Mihai.Mehe

Reputation: 504

Apache Kafka setup consumer error: "JSON Decoder Error: Extra data line 1 column 4 char 4"

The console consumer bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic my_topic works, and I can see log lines such as:

2021-12-11T22:40:13.800Z {"ts":1639262395.220755,"uid":"CiaUp427FXwzqySsOh","id.orig_h":"fe80::f816:3eff:fef4:a877","id.orig_p":5353,"id.resp_h":"ff02::fb","id.resp_p":5353,"proto":"udp","service":"dns","duration":0.40987586975097659,"orig_bytes":1437,"resp_bytes":0,"conn_state":"S0","missed_bytes":0,"history":"D","orig_pkts":4,"orig_ip_bytes":1629,"resp_pkts":0,"resp_ip_bytes":0}

However, with the consumer code listed below I am getting "JSON Decoder Error: Extra data line 1 column 4 char 4", which looks like a simple parsing problem: each log line starts with the date and time, as shown above, so the consumer receives the first record but cannot parse it.
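The failure is easy to reproduce outside Kafka: feeding one of those timestamp-prefixed lines straight to json.loads raises exactly this kind of error (a minimal standalone check, using a shortened version of the sample record):

```python
import json

# One record as it arrives from the topic: timestamp, a space, then the JSON body.
raw = '2021-12-11T22:40:13.800Z {"ts":1639262395.220755,"proto":"udp","service":"dns"}'

try:
    json.loads(raw)
except json.JSONDecodeError as e:
    # The parser reads "2021" as a number, then chokes on the rest of the timestamp.
    print(e)  # Extra data: line 1 column 5 (char 4)
```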

It looks easy, yet I cannot get around it because the deserialization happens inside the KafkaConsumer object. If anyone can give a hint or show how to do it, that would be great. Thanks and regards, M

from json import loads
from kafka import KafkaConsumer
import threading


KAFKA_SERVER = '10.10.10.10:9092'
TOPIC = 'my_topic'
AUTO_OFFSET_RESET = 'earliest'
CONSUMER_TIME_OUT = 1000  # milliseconds
MAXIMUM_SECONDS = 0.01  # seconds

class TrafficConsumer(threading.Thread):
    def __init__(self, offset=AUTO_OFFSET_RESET, verbose=False, close=True):
        threading.Thread.__init__(self)
        self.stop_event = threading.Event()
        try:
            self.__traffic_consumer = KafkaConsumer(
                 TOPIC,  # passing the topic here already subscribes the consumer
                 bootstrap_servers = [KAFKA_SERVER],
                 auto_offset_reset = offset,
                 enable_auto_commit = True,
                 #group_id = GROUP_ID,
                 # this is where decoding fails: each message starts with a
                 # timestamp, so the value is not valid JSON on its own
                 value_deserializer = lambda x : loads(x.decode('utf-8')),
                 consumer_timeout_ms = CONSUMER_TIME_OUT,
                #on_commit = self.commit_completed(),
                 )
        except Exception as e:
            print("Consumer is not accessible. Check the connection and the settings in attributes_kafka.", e)
        self.set_conn_log_traffic(verbose=verbose, close=close)

    def stop(self):
        self.stop_event.set()

    def get_consumer(self):
        return self.__traffic_consumer

    def set_conn_log_traffic(self, verbose=False, close=True):
        while not self.stop_event.is_set():
            # poll() returns {TopicPartition: [records]}; iterate the record lists
            for tp, records in self.__traffic_consumer.poll(timeout_ms=2).items():
                for ind_flow in records:
                    print(ind_flow.value)
                if self.stop_event.is_set():
                    break
        if close:
            self.__traffic_consumer.close()

Upvotes: 0

Views: 747

Answers (1)

OneCricketeer

Reputation: 191728

Your data isn't valid JSON. It includes a timestamp before the JSON object, so json.loads cannot decode it.

You should verify how the producer is sending data, since the timestamp is part of the value rather than the Kafka record timestamp.

Or, you can handle the problem in the consumer by using a different deserializer function.

For example

import json

def safe_deserialize(value):
    # split off the leading timestamp; the remainder is the JSON body
    _, data = value.decode('utf-8').split(" ", 1)
    return json.loads(data)

...

KafkaConsumer(
   ...
   value_deserializer = safe_deserialize,
)
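Applied to the record from the question, the deserializer returns a plain dict (a quick standalone check; the byte string below is a shortened version of the sample log line):

```python
import json

def safe_deserialize(value):
    # split off the leading timestamp; the remainder is the JSON body
    _, data = value.decode('utf-8').split(" ", 1)
    return json.loads(data)

# A shortened version of the sample record, as raw bytes off the wire:
raw = b'2021-12-11T22:40:13.800Z {"ts":1639262395.220755,"proto":"udp","service":"dns"}'

record = safe_deserialize(raw)
print(record["service"])  # dns
```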

Upvotes: 1

Related Questions