Reputation: 504
The console consumer

bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic my_topic

works, and I can see the logs, for example:

2021-12-11T22:40:13.800Z {"ts":1639262395.220755,"uid":"CiaUp427FXwzqySsOh","id.orig_h":"fe80::f816:3eff:fef4:a877","id.orig_p":5353,"id.resp_h":"ff02::fb","id.resp_p":5353,"proto":"udp","service":"dns","duration":0.40987586975097659,"orig_bytes":1437,"resp_bytes":0,"conn_state":"S0","missed_bytes":0,"history":"D","orig_pkts":4,"orig_ip_bytes":1629,"resp_pkts":0,"resp_ip_bytes":0}
However, with the consumer code listed below I am getting: json.decoder.JSONDecodeError: Extra data: line 1 column 4 (char 4).
This seems to be a simple parsing error: each log starts with the date/time as shown above, so the consumer receives the first log but cannot parse it.
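The same error can be reproduced on a single line of the data, since loads parses the leading 2021 as a number and then stops at the rest of the timestamp:

from json import loads
loads('2021-12-11T22:40:13.800Z {"proto": "udp"}')
# json.decoder.JSONDecodeError: Extra data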
Easy enough, yet it seems I cannot get around it, as the deserialization happens inside the KafkaConsumer object. If anyone can give a hint or show how to do it, that would be great. Thanks and regards, M
from json import loads
from kafka import KafkaConsumer, TopicPartition
import threading, time
from IPython.display import clear_output

KAFKA_SERVER = '10.10.10.10:9092'
TOPIC = 'my_topic'
AUTO_OFFSET_RESET = 'earliest'
CONSUMER_TIME_OUT = 1000  # milliseconds
MAXIMUM_SECONDS = 0.01  # seconds

class TrafficConsumer(threading.Thread):
    def __init__(self, offset=AUTO_OFFSET_RESET, verbose=False, close=True):
        try:
            self.__traffic_consumer = KafkaConsumer(
                TOPIC,
                bootstrap_servers = [KAFKA_SERVER],
                auto_offset_reset = offset,
                enable_auto_commit = True,
                #group_id = GROUP_ID,
                value_deserializer = lambda x: loads(x.decode('utf-8')),
                consumer_timeout_ms = CONSUMER_TIME_OUT,
                #on_commit = self.commit_completed(),
            )
            self.__traffic_consumer.subscribe([TOPIC])
            threading.Thread.__init__(self)
            self.stop_event = threading.Event()
        except Exception as e:
            print("Consumer is not accessible. Check the connection and the settings in attributes_kafka.", e)
        self.set_conn_log_traffic(verbose=verbose, close=close)

    def stop(self):
        self.stop_event.set()

    def get_consumer(self):
        return self.__traffic_consumer

    def set_conn_log_traffic(self, verbose=False, close=True):
        while not self.stop_event.is_set():
            # poll() returns a dict of {TopicPartition: [records]}
            for tp, records in self.__traffic_consumer.poll(timeout_ms=2).items():
                for ind_flow in records:
                    print(ind_flow.value)
            if self.stop_event.is_set():
                break
        if close:
            self.__traffic_consumer.close()
Upvotes: 0
Views: 747
Reputation: 191728
Your data isn't valid JSON. It includes a timestamp before the JSON object, which cannot be decoded using json.loads.
You should verify how the producer is sending data, since the timestamp is part of the value rather than the Kafka record timestamp.
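For instance, if the producer were also kafka-python (an assumption; the question doesn't say what produces these logs), a minimal sketch of the producer-side fix would send only the JSON document as the value:

from json import dumps
from kafka import KafkaProducer

# Hypothetical producer-side fix: serialize only the JSON document as the value;
# Kafka attaches its own timestamp to each record's metadata.
producer = KafkaProducer(
    bootstrap_servers=['10.10.10.10:9092'],
    value_serializer=lambda d: dumps(d).encode('utf-8'),
)
producer.send('my_topic', {"proto": "udp", "service": "dns"})
producer.flush()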
Or, you can handle the problem in the consumer by using a different deserializer function. For example:
import json

def safe_deserialize(value):
    # Split on the first space: drop the timestamp, parse the rest as JSON
    _, data = value.decode('utf-8').split(" ", 1)
    return json.loads(data)

...

KafkaConsumer(
    ...
    value_deserializer = safe_deserialize,
)
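If some values might arrive without the timestamp prefix (an assumption; the console output above suggests they all have one), a slightly more defensive variant tries the whole value first and only then strips the prefix:

import json

def safe_deserialize(value):
    text = value.decode('utf-8')
    try:
        # Value may already be plain JSON
        return json.loads(text)
    except json.JSONDecodeError:
        # Otherwise strip the leading timestamp and retry
        _, data = text.split(" ", 1)
        return json.loads(data)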
Upvotes: 1