Reputation: 293
I'm trying to load some tweets via twint into a Kafka topic with the confluent-kafka[avro] producer. I don't get any errors, but my topic won't receive any events from twint. I even get success messages when debugging (with try and except).
My code:
import twint
import sys
import json
from time import sleep
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
# Avro schemas for the Kafka record value (one tweet) and the record key.
# The literals below are parsed verbatim by avro.loads; keep their text intact.
value_schema_str = """
{
"namespace": "my.test",
"name": "value",
"type": "record",
"fields" : [
{ "name": "id", "type": "long" },
{ "name": "tweet", "type": "string" },
{ "name": "datetime", "type": "string" },
{ "name": "username", "type": "string" },
{ "name": "user_id", "type": "long" },
{ "name": "name", "type": "string" }
]
}
"""
key_schema_str = """
{
"namespace": "my.test",
"name": "key",
"type": "record",
"fields" : [
{
"name" : "name",
"type" : "string"
}
]
}
"""
value_schema, key_schema = avro.loads(value_schema_str), avro.loads(key_schema_str)

# NOTE(review): these hostnames presumably assume the script runs inside a
# Docker container; if it runs on the host, or the broker advertises a
# different listener, records may be queued but never delivered — confirm.
kafka_broker = 'host.docker.internal:9092'
schema_registry = 'http://host.docker.internal:8081'

# Default key/value schemas apply to produce() calls that don't pass their own.
producer = AvroProducer(
    {
        'schema.registry.url': schema_registry,
        'bootstrap.servers': kafka_broker,
    },
    default_value_schema=value_schema,
    default_key_schema=key_schema,
)
# Load twint's writer module explicitly rather than relying on `import twint`
# having pulled it into sys.modules as a side effect (the old lookup raised
# KeyError otherwise). Python's import cache makes this the same module object
# twint itself uses, so replacing its attributes affects twint's calls.
import twint.storage.write as module
# Tweet attributes copied into each Kafka record; keep in sync with the fields
# of the Avro value schema (value_schema_str).
_TWEET_FIELDS = ("id", "tweet", "datetime", "username", "user_id", "name")


def _report_delivery(err, msg):
    """Print the broker's verdict for one produced record.

    Registered as ``on_delivery`` on every ``producer.produce`` call and run
    from inside ``producer.flush()``/``producer.poll()``. A ``produce()`` call
    that does not raise has only queued the record locally; delivery
    failures (e.g. an unreachable broker) are reported here, not as exceptions.

    Args:
        err: ``None`` on success, otherwise the delivery error.
        msg: the delivered (or failed) message.
    """
    if err is not None:
        print(f"Delivery failed for record to topic - tweets_test: {err}")
    else:
        print(
            f"Delivered record to topic - {msg.topic()} "
            f"[partition {msg.partition()}, offset {msg.offset()}]"
        )


def Json(obj, config):
    """Send one twint tweet to the ``tweets_test`` Kafka topic as an Avro record.

    Replaces twint's ``storage.write.Json`` (assigned via ``module.Json = Json``),
    presumably invoked by twint for each tweet it would otherwise write as JSON.

    Args:
        obj: twint tweet object; its attributes named in ``_TWEET_FIELDS`` are read.
        config: twint config object passed by twint; not used here.
    """
    tweet = vars(obj)
    tweet_new = {field: tweet[field] for field in _TWEET_FIELDS}
    print(tweet_new)
    try:
        producer.produce(
            topic='tweets_test',
            key={"name": "Key"},
            value=tweet_new,
            # Without this, broker-side delivery failures are never reported.
            on_delivery=_report_delivery,
        )
    except Exception as e:
        # Only errors raised synchronously by produce() land here.
        print(f"Exception while producing record value - {tweet_new} to topic - tweets_test: {e}")
    else:
        print(f"Queued record value - {tweet_new} for topic - tweets_test (awaiting delivery report)")
    try:
        # flush() serves delivery callbacks (_report_delivery) and returns the
        # number of records still waiting in the local queue.
        remaining = producer.flush()
    except Exception as e:
        print(f"Exception while flush record value - {tweet_new} to topic - tweets_test: {e}")
    else:
        if remaining:
            print(f"{remaining} record(s) still queued after flush for topic - tweets_test")
        else:
            print(f"Flushed producer queue after record value - {tweet_new} to topic - tweets_test")
# Route twint's JSON writer to the Kafka-producing Json function defined above.
module.Json = Json


def _build_search_config():
    """Return the twint Config for the tweet search this script runs."""
    cfg = twint.Config()
    cfg.Search = 'corona OR regen OR "stark regen" OR "sturm" OR überschwemmung OR landunter OR @hagel OR @regen OR @sturm OR flut'
    cfg.Store_json = True
    # NOTE(review): with write.Json replaced, Custom/Output may no longer have
    # any effect on what is stored — confirm against twint's writer code.
    cfg.Custom["user"] = ["id", "tweet", "user_id", "username", "hashtags"]
    cfg.User_full = True
    cfg.Output = "tweets.json"
    cfg.Since = '2019-05-20'
    cfg.Hide_output = True
    return cfg


c2 = _build_search_config()
twint.run.Search(c2)
When I run it, I get the following output:
{'id': 1513818741057937408, 'tweet': 'RKI: Bundesweite Sieben-Tage-Inzidenz steigt leicht auf 1087 | #corona #rki #test ', 'datetime': '2022-04-12 09:58:07 UTC', 'username': 'flashupde', 'user_id': 1179376986516606978, 'name': 'FLASH UP'}
Successfully producing record value - {'id': 1513818741057937408, 'tweet': 'RKI: Bundesweite Sieben-Tage-Inzidenz steigt leicht auf 1087 | #corona #rki #test ', 'datetime': '2022-04-12 09:58:07 UTC', 'username': 'flashupde', 'user_id': 1179376986516606978, 'name': 'FLASH UP'} to topic - tweets_test
Successfully flushed record value - {'id': 1513818741057937408, 'tweet': 'RKI: Bundesweite Sieben-Tage-Inzidenz steigt leicht auf 1087 | #corona #rki #test ', 'datetime': '2022-04-12 09:58:07 UTC', 'username': 'flashupde', 'user_id': 1179376986516606978, 'name': 'FLASH UP'} to topic - tweets_test
Any help on how I can debug this better, or any other advice, would be awesome.
Upvotes: 0
Views: 97