I'm using Kafka (kafka_2.11-0.11.0.2) and Confluent 3.3.0 for the Schema Registry.
I have defined an Avro schema as follows:
{
    "namespace": "com.myntra.search",
    "type": "record",
    "name": "SearchDataIngestionObject",
    "fields": [
        {"name": "timestamp", "type": "long"},
        {"name": "brandList", "type": {"type": "array", "items": "string"}},
        {"name": "articleTypeList", "type": {"type": "array", "items": "string"}},
        {"name": "gender", "type": {"type": "array", "items": "string"}},
        {"name": "masterCategoryList", "type": {"type": "array", "items": "string"}},
        {"name": "subCategoryList", "type": {"type": "array", "items": "string"}},
        {"name": "quAlgo", "type": {"type": "array", "items": "string"}},
        {"name": "colours", "type": {"type": "array", "items": "string"}},
        {"name": "isLandingPage", "type": "boolean"},
        {"name": "isUserQuery", "type": "boolean"},
        {"name": "isAutoSuggest", "type": "boolean"},
        {"name": "userQuery", "type": "string"},
        {"name": "correctedQuery", "type": "string"},
        {"name": "completeSolrQuery", "type": "string"},
        {"name": "atsaList", "type": {"type": "map", "values": {"type": "array", "items": "string"}}},
        {"name": "quMeta", "type": {"type": "map", "values": "string"}},
        {"name": "requestId", "type": "string"}
    ]
}
I'm trying to write some data to Kafka as follows:
value = {
    "timestamp": 1597399323000,
    "brandList": ["brand_value"],
    "articleTypeList": ["articleType_value"],
    "gender": ["gender_value"],
    "masterCategoryList": ["masterCategory_value"],
    "subCategoryList": ["subCategory_value"],
    "quAlgo": ["quAlgo_value"],
    "colours": ["colours_value"],
    "isLandingPage": False,
    "isUserQuery": False,
    "isAutoSuggest": False,
    "userQuery": "userQuery_value",
    "correctedQuery": "correctedQuery_value",
    "completeSolrQuery": "completeSolrQuery_value",
    "atsaList": {
        "atsa_key1": ["atsa_value1"],
        "atsa_key2": ["atsa_value2"],
        "atsa_key3": ["atsa_value3"]
    },
    "quMeta": {
        "quMeta_key1": "quMeta_value1",
        "quMeta_key2": "quMeta_value2",
        "quMeta_key3": "quMeta_value3"
    },
    "requestId": "requestId_value"
}
topic = "search"
key = str(uuid.uuid4())
producer.produce(topic=topic, key=key, value=value)
producer.flush()
But I'm getting the following error:
Traceback (most recent call last):
File "producer.py", line 61, in <module>
producer.produce(topic=topic, key=key, value=value)
File "/Library/Python/2.7/site-packages/confluent_kafka/avro/__init__.py", line 99, in produce
value = self._serializer.encode_record_with_schema(topic, value_schema, value)
File "/Library/Python/2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 118, in encode_record_with_schema
return self.encode_record_with_schema_id(schema_id, record, is_key=is_key)
File "/Library/Python/2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 152, in encode_record_with_schema_id
writer(record, outf)
File "/Library/Python/2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 86, in <lambda>
return lambda record, fp: writer.write(record, avro.io.BinaryEncoder(fp))
File "/Library/Python/2.7/site-packages/avro/io.py", line 979, in write
raise AvroTypeException(self.writers_schema, datum)
avro.io.AvroTypeException: The datum {'quAlgo': ['quAlgo_value'], 'userQuery': 'userQuery_value', 'isAutoSuggest': False, 'isLandingPage': False, 'timestamp': 1597399323000, 'articleTypeList': ['articleType_value'], 'colours': ['colours_value'], 'correctedQuery': 'correctedQuery_value', 'quMeta': {'quMeta_key1': 'quMeta_value1', 'quMeta_key2': 'quMeta_value2', 'quMeta_key3': 'quMeta_value3'}, 'requestId': 'requestId_value', 'gender': ['gender_value'], 'isUserQuery': False, 'brandList': ['brand_value'], 'masterCategoryList': ['masterCategory_value'], 'subCategoryList': ['subCategory_value'], 'completeSolrQuery': 'completeSolrQuery_value', 'atsaList': {'atsa_key1': ['atsa_value1'], 'atsa_key2': ['atsa_value2'], 'atsa_key3': ['atsa_value3']}} is not an example of the schema {
"namespace": "com.myntra.search",
"type": "record",
"name": "SearchDataIngestionObject",
"fields": [
{
"type": "long",
"name": "timestamp"
},
{
"type": {
"items": "string",
"type": "array"
},
"name": "brandList"
},
{
"type": {
"items": "string",
"type": "array"
},
"name": "articleTypeList"
},
{
"type": {
"items": "string",
"type": "array"
},
"name": "gender"
},
{
"type": {
"items": "string",
"type": "array"
},
"name": "masterCategoryList"
},
{
"type": {
"items": "string",
"type": "array"
},
"name": "subCategoryList"
},
{
"type": {
"items": "string",
"type": "array"
},
"name": "quAlgo"
},
{
"type": {
"items": "string",
"type": "array"
},
"name": "colours"
},
{
"type": "boolean",
"name": "isLandingPage"
},
{
"type": "boolean",
"name": "isUserQuery"
},
{
"type": "boolean",
"name": "isAutoSuggest"
},
{
"type": "string",
"name": "userQuery"
},
{
"type": "string",
"name": "correctedQuery"
},
{
"type": "string",
"name": "completeSolrQuery"
},
{
"type": {
"values": {
"items": "string",
"type": "array"
},
"type": "map"
},
"name": "atsaList"
},
{
"type": {
"values": "string",
"type": "map"
},
"name": "quMeta"
},
{
"type": "string",
"name": "requestId"
}
]
}
I even tried the same example as given here, but it doesn't work and throws the same error.
Upvotes: 1
Reputation: 2074
The error in your exception says that the data you are providing is the following:
{'userQuery': 'userQuery_value',
'isAutoSuggest': False,
'isLandingPage': False,
'correctedQuery': 'correctedQuery_value',
'isUserQuery': False,
'timestamp': 1597399323000,
'completeSolrQuery': 'completeSolrQuery_value',
'requestId': 'requestId_value'}
This is much less than what you claim to be providing in your example.
Can you go back to your original code and, on line 60, just before you call producer.produce(topic=topic, key=key, value=value),
add a simple print(value)?
That will confirm you are sending the right value and that it
hasn't been overwritten by some other line of code.
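Beyond a plain print, it can help to compare the value against the schema field by field. The following is only a rough sketch of that idea, not the avro library's own validation: `matches` and `find_problems` are hypothetical helper names, the schema is a trimmed copy of the one in the question, and the type checks assume Python 3 (`str`, `int`) and cover only the types used there.

```python
import json

# Assumption: a trimmed copy of the schema from the question, kept short
# for illustration. Paste the full schema here when checking real data.
SCHEMA_JSON = """
{
  "namespace": "com.myntra.search",
  "type": "record",
  "name": "SearchDataIngestionObject",
  "fields": [
    {"name": "timestamp", "type": "long"},
    {"name": "brandList", "type": {"type": "array", "items": "string"}},
    {"name": "isUserQuery", "type": "boolean"},
    {"name": "quMeta", "type": {"type": "map", "values": "string"}},
    {"name": "requestId", "type": "string"}
  ]
}
"""


def matches(avro_type, datum):
    """Return True if datum fits the (simplified) Avro type description."""
    if isinstance(avro_type, dict):
        kind = avro_type["type"]
        if kind == "array":
            return isinstance(datum, list) and all(
                matches(avro_type["items"], item) for item in datum)
        if kind == "map":
            return isinstance(datum, dict) and all(
                isinstance(k, str) and matches(avro_type["values"], v)
                for k, v in datum.items())
        return False  # other complex types are out of scope for this sketch
    if avro_type == "long":
        # bool is a subclass of int in Python, so exclude it explicitly
        return isinstance(datum, int) and not isinstance(datum, bool)
    if avro_type == "boolean":
        return isinstance(datum, bool)
    if avro_type == "string":
        return isinstance(datum, str)
    return False  # primitive types not used in the question's schema


def find_problems(schema, value):
    """List (field, reason) pairs for missing or wrongly typed fields."""
    problems = []
    for field in schema["fields"]:
        name = field["name"]
        if name not in value:
            problems.append((name, "missing"))
        elif not matches(field["type"], value[name]):
            problems.append((name, "wrong type"))
    return problems


schema = json.loads(SCHEMA_JSON)

# A value that has lost some of its fields, as in the reduced datum above.
value = {
    "timestamp": 1597399323000,
    "isUserQuery": False,
    "requestId": "requestId_value",
}
print(value)
print(find_problems(schema, value))
```

With the reduced value above, the check reports brandList and quMeta as missing. Running it on the real value right before producer.produce(...) should show whether fields were dropped or changed type somewhere earlier in the script.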
Upvotes: 1