nish

Reputation: 7280

Unable to write avro data to kafka using python

I'm using Kafka kafka_2.11-0.11.0.2 and Confluent 3.3.0 for the Schema Registry.

I have defined an Avro schema as follows:

{
"namespace": "com.myntra.search",
"type": "record",
"name": "SearchDataIngestionObject",
"fields": [
  {"name": "timestamp","type":"long"},
  {"name": "brandList", "type":{ "type" : "array", "items" : "string" }},
  {"name": "articleTypeList", "type":{ "type" : "array", "items" : "string" }},
  {"name": "gender", "type":{ "type" : "array", "items" : "string" }},
  {"name": "masterCategoryList", "type":{ "type" : "array", "items" : "string" }},
  {"name": "subCategoryList", "type":{ "type" : "array", "items" : "string" }},
  {"name": "quAlgo","type":{ "type" : "array", "items" : "string" }},
  {"name": "colours", "type":{ "type" : "array", "items" : "string" }},
  {"name": "isLandingPage", "type": "boolean"},
  {"name": "isUserQuery", "type": "boolean"},
  {"name": "isAutoSuggest", "type": "boolean"},
  {"name": "userQuery", "type": "string"},
  {"name": "correctedQuery", "type": "string"},
  {"name": "completeSolrQuery", "type": "string"},
  {"name": "atsaList", "type":{"type": "map", "values":{ "type" : "array", "items" : "string" }}},
  {"name": "quMeta", "type": {"type": "map", "values": "string"}},
  {"name": "requestId", "type": "string"}
]
}

And I'm trying to write some data to Kafka as follows:

value = {
    "timestamp": 1597399323000,
    "brandList": ["brand_value"],
    "articleTypeList": ["articleType_value"],
    "gender": ["gender_value"],
    "masterCategoryList": ["masterCategory_value"],
    "subCategoryList": ["subCategory_value"],
    "quAlgo": ["quAlgo_value"],
    "colours": ["colours_value"],
    "isLandingPage": False,
    "isUserQuery": False,
    "isAutoSuggest": False,
    "userQuery": "userQuery_value",
    "correctedQuery": "correctedQuery_value",
    "completeSolrQuery": "completeSolrQuery_value",
    "atsaList": {
        "atsa_key1": ["atsa_value1"],
        "atsa_key2": ["atsa_value2"],
        "atsa_key3": ["atsa_value3"]
    },
    "quMeta": {
        "quMeta_key1": "quMeta_value1",
        "quMeta_key2": "quMeta_value2",
        "quMeta_key3": "quMeta_value3"
    },
    "requestId": "requestId_value"
}

topic = "search"
key = str(uuid.uuid4())

producer.produce(topic=topic, key=key, value=value)
producer.flush()
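
As a local sanity check (a sketch using only the standard library; the `missing_fields` helper is illustrative, not part of Avro's API), you can confirm the record supplies every field the schema declares, which is essentially what Avro's writer validates before raising `AvroTypeException`:

```python
import json

# Abbreviated version of the schema from the question (three of the
# seventeen fields shown, for brevity).
SCHEMA_JSON = """
{
  "namespace": "com.myntra.search",
  "type": "record",
  "name": "SearchDataIngestionObject",
  "fields": [
    {"name": "timestamp", "type": "long"},
    {"name": "userQuery", "type": "string"},
    {"name": "requestId", "type": "string"}
  ]
}
"""

def missing_fields(schema_str, record):
    """Return the names of schema fields the record does not supply.

    None of these fields declare a default, so any missing one makes
    Avro's datum writer reject the record.
    """
    schema = json.loads(schema_str)
    return sorted(f["name"] for f in schema["fields"] if f["name"] not in record)

record = {"timestamp": 1597399323000, "userQuery": "userQuery_value"}
print(missing_fields(SCHEMA_JSON, record))  # -> ['requestId']
```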

But I'm getting the following error:

Traceback (most recent call last):
File "producer.py", line 61, in <module>
  producer.produce(topic=topic, key=key, value=value)
File "/Library/Python/2.7/site-packages/confluent_kafka/avro/__init__.py", line 99, in produce
  value = self._serializer.encode_record_with_schema(topic, value_schema, value)
File "/Library/Python/2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 118, in encode_record_with_schema
  return self.encode_record_with_schema_id(schema_id, record, is_key=is_key)
File "/Library/Python/2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 152, in encode_record_with_schema_id
  writer(record, outf)
File "/Library/Python/2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 86, in <lambda>
  return lambda record, fp: writer.write(record, avro.io.BinaryEncoder(fp))
File "/Library/Python/2.7/site-packages/avro/io.py", line 979, in write
  raise AvroTypeException(self.writers_schema, datum)
avro.io.AvroTypeException: The datum {'quAlgo': ['quAlgo_value'], 'userQuery': 'userQuery_value', 'isAutoSuggest': False, 'isLandingPage': False, 'timestamp': 1597399323000, 'articleTypeList': ['articleType_value'], 'colours': ['colours_value'], 'correctedQuery': 'correctedQuery_value', 'quMeta': {'quMeta_key1': 'quMeta_value1', 'quMeta_key2': 'quMeta_value2', 'quMeta_key3': 'quMeta_value3'}, 'requestId': 'requestId_value', 'gender': ['gender_value'], 'isUserQuery': False, 'brandList': ['brand_value'], 'masterCategoryList': ['masterCategory_value'], 'subCategoryList': ['subCategory_value'], 'completeSolrQuery': 'completeSolrQuery_value', 'atsaList': {'atsa_key1': ['atsa_value1'], 'atsa_key2': ['atsa_value2'], 'atsa_key3': ['atsa_value3']}} is not an example of the schema {
"namespace": "com.myntra.search", 
"type": "record", 
"name": "SearchDataIngestionObject", 
"fields": [
  {
    "type": "long", 
    "name": "timestamp"
  }, 
  {
    "type": {
      "items": "string", 
      "type": "array"
    }, 
    "name": "brandList"
  }, 
  {
    "type": {
      "items": "string", 
      "type": "array"
    }, 
    "name": "articleTypeList"
  }, 
  {
    "type": {
      "items": "string", 
      "type": "array"
    }, 
    "name": "gender"
  }, 
  {
    "type": {
      "items": "string", 
      "type": "array"
    }, 
    "name": "masterCategoryList"
  }, 
  {
    "type": {
      "items": "string", 
      "type": "array"
    }, 
    "name": "subCategoryList"
  }, 
  {
    "type": {
      "items": "string", 
      "type": "array"
    }, 
    "name": "quAlgo"
  }, 
  {
    "type": {
      "items": "string", 
      "type": "array"
    }, 
    "name": "colours"
  }, 
  {
    "type": "boolean", 
    "name": "isLandingPage"
  }, 
  {
    "type": "boolean", 
    "name": "isUserQuery"
  }, 
  {
    "type": "boolean", 
    "name": "isAutoSuggest"
  }, 
  {
    "type": "string", 
    "name": "userQuery"
  }, 
  {
    "type": "string", 
    "name": "correctedQuery"
  }, 
  {
    "type": "string", 
    "name": "completeSolrQuery"
  }, 
  {
    "type": {
      "values": {
        "items": "string", 
        "type": "array"
      }, 
      "type": "map"
    }, 
    "name": "atsaList"
  }, 
  {
    "type": {
      "values": "string", 
      "type": "map"
    }, 
    "name": "quMeta"
  }, 
  {
    "type": "string", 
    "name": "requestId"
  }
]
}

I even tried the same example as given here, but it doesn't work and throws the same error.

Upvotes: 1

Views: 895

Answers (1)

Scott

Reputation: 2074

In your exception, the error says that the data you are actually providing is the following:

{'userQuery': 'userQuery_value',
 'isAutoSuggest': False,
 'isLandingPage': False,
 'correctedQuery': 'correctedQuery_value',
 'isUserQuery': False,
 'timestamp': 1597399323000,
 'completeSolrQuery': 'completeSolrQuery_value',
 'requestId': 'requestId_value'}

This is much less than what you claim to be providing in your example.

Can you go back to your original code and, on line 60, just before producer.produce(topic=topic, key=key, value=value), add a simple print(value) to make sure you are sending the right value and that it hasn't been overwritten by some other line of code?
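
That comparison can also be automated: diff the keys of the record you built against the keys of the datum quoted in the exception to see exactly which fields were lost before serialization (the two dicts below are trimmed examples, not the full records):

```python
# Diff the record you intended to send against the datum the exception
# reports, to see which fields disappeared before produce() was called.
intended = {
    "timestamp": 1597399323000,
    "userQuery": "userQuery_value",
    "brandList": ["brand_value"],
    "requestId": "requestId_value",
}
datum_from_error = {
    "timestamp": 1597399323000,
    "userQuery": "userQuery_value",
    "requestId": "requestId_value",
}

# dict.keys() views support set arithmetic directly.
dropped = sorted(intended.keys() - datum_from_error.keys())
print("fields lost before produce():", dropped)  # -> ['brandList']
```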

Upvotes: 1
