banditKing

Reputation: 9579

Confluent Kafka Python client: AvroProducer produce() executes without error but no data in topic

My producer isn't throwing any errors, but no data is arriving in the destination topic. Can you recommend any techniques to debug this situation?

I call a Confluent Python AvroProducer inside a synchronous loop to send data to a topic, like so:

self.producer.produce(topic="test2", value=msg_dict)

After this call, I flush the queue like so:

num_messages_in_queue = self.producer.flush(timeout=2.0)
print(f"{num_messages_in_queue} messages still in producer queue after flush in iteration {num_iterations}")

This executes without any error, but no delivery callback fires after it runs either. My producer is initialized as follows:

def __init__(self, broker_url=None, topic=None, schema_registry_url=None, schema_path=None):
    try:
        with open(schema_path, 'r') as content_file:
            schema = avro.loads(content_file.read())
    except Exception:
        print(f"Error when trying to read avro schema file: {schema_path}")
        raise  # without the schema the producer cannot be built

    self.conf = {
        'bootstrap.servers': broker_url,
        'on_delivery': self.delivery_report,
        'schema.registry.url': schema_registry_url,
        'acks': -1,  # record is not lost as long as at least one in-sync replica remains alive
        'enable.idempotence': False,
        'error_cb': self.error_cb,
    }
    self.topic = topic
    self.schema_path = schema_path
    self.producer = AvroProducer(self.conf, default_key_schema=schema, default_value_schema=schema)
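For context, a defensive variant of this constructor could fail fast instead of failing silently; this is a sketch, and the ValueError is my illustration, not code from my application:

# Sketch: validate required settings before building the producer.
# The required-parameter list mirrors the constructor arguments above;
# raising ValueError here is an illustrative choice.
required = {
    'broker_url': broker_url,
    'topic': topic,
    'schema_registry_url': schema_registry_url,
    'schema_path': schema_path,
}
missing = [name for name, value in required.items() if not value]
if missing:
    raise ValueError(f"missing required producer settings: {missing}")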

My callback method is as follows:

def delivery_report(self, err, msg):
    print("began delivery_report")
    if err is None:
        print(f"delivery_report --> Delivered msg.value = {msg.value()} to topic = {msg.topic()} offset = {msg.offset()} without err.")
    else:
        print(f"conf_worker AvroProducer failed to deliver message {msg.value()} to topic {self.topic}. got error = {err}")

After this code is executed, I inspect the topic from the schema registry container like so:

docker exec schema_registry_container kafka-avro-console-consumer --bootstrap-server kafka:29092 --topic test2 --from-beginning

I see this output:

[2020-04-03 15:48:38,064] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2020-04-03 15:48:38,742] INFO ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [kafka:29092]
    check.crcs = true
    client.dns.lookup = default
    client.id =
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = console-consumer-49056
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig)
[2020-04-03 15:48:38,887] INFO Kafka version : 2.1.0-cp1 (org.apache.kafka.common.utils.AppInfoParser)
[2020-04-03 15:48:38,887] INFO Kafka commitId : bda8715f42a1a3db (org.apache.kafka.common.utils.AppInfoParser)
[2020-04-03 15:48:39,221] INFO Cluster ID: KHKziPBvRKiozobbwvP1Fw (org.apache.kafka.clients.Metadata)
[2020-04-03 15:48:39,224] INFO [Consumer clientId=consumer-1, groupId=console-consumer-49056] Discovered group coordinator kafka:29092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-04-03 15:48:39,231] INFO [Consumer clientId=consumer-1, groupId=console-consumer-49056] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-04-03 15:48:39,231] INFO [Consumer clientId=consumer-1, groupId=console-consumer-49056] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-04-03 15:48:42,264] INFO [Consumer clientId=consumer-1, groupId=console-consumer-49056] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-04-03 15:48:42,267] INFO [Consumer clientId=consumer-1, groupId=console-consumer-49056] Setting newly assigned partitions [test2-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-04-03 15:48:42,293] INFO [Consumer clientId=consumer-1, groupId=console-consumer-49056] Resetting offset for partition test2-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher)
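For completeness, the same check can also be made from Python with an AvroConsumer; this is a sketch, and the schema registry URL, group.id, and timeout below are assumptions, not values from my setup:

from confluent_kafka.avro import AvroConsumer

# Sketch: read back from the topic the producer should have written to.
# The schema registry URL and group.id below are assumptions.
consumer = AvroConsumer({
    'bootstrap.servers': 'kafka:29092',
    'schema.registry.url': 'http://schema_registry:8081',
    'group.id': 'debug-readback',
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['test2'])

msg = consumer.poll(timeout=10.0)  # None means nothing arrived in time
print('no records in topic' if msg is None else msg.value())
consumer.close()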

Upvotes: 0

Views: 1838

Answers (1)

banditKing

Reputation: 9579

So the answer is so trivial that it's embarrassing! But it does point to the fact that in a multilayered infrastructure, a single incorrectly set value can result in a silent failure that is very tedious to track down.

The issue came from an incorrect parameter setting in my docker-compose.yml file: the environment variable for broker_url was never set. The application code needed this variable to reach the Kafka broker, but no exception was thrown for the missing parameter, so the producer failed silently.
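One technique that would have surfaced this earlier (a sketch, assuming the producer config from the question; the chosen debug contexts and the error_cb body are illustrative): enable librdkafka's debug output and an error callback, which log broker connection problems that produce() alone never raises.

# Sketch: configuration that would have made the failure visible.
# 'debug' is a standard librdkafka property; the contexts chosen
# ('broker,topic,msg') and the error_cb body are illustrative.
def error_cb(err):
    print(f"kafka error_cb: {err}")

conf = {
    'bootstrap.servers': broker_url,      # was unset/empty in my case
    'schema.registry.url': schema_registry_url,
    'debug': 'broker,topic,msg',          # verbose librdkafka connection logging
    'error_cb': error_cb,
}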

Upvotes: 0
