Reputation: 6005
I have access to an Apache Kafka cluster and I've been given a file describing the Apache Avro serialisation format for the messages. I'm writing a small test consumer in Python and I'm getting the following error when trying to parse the schema:
SchemaParseException: Type property "{u'items': u'com.myapp.avromsg.common.MilestoneField', u'type': u'array'}" not a valid Avro schema: Items schema (com.myapp.avromsg.common.MilestoneField) not a valid Avro schema: Could not make an Avro Schema object from com.myapp.avromsg.common.MilestoneField. (known names: [u'com.myapp.avromsg.runstatus.RunStatusMessage'])
It looks to me like the error is coming from not knowing about the custom field type MilestoneField. How would I go about describing this field to my script so that the serialisation format will parse properly?
Here is the my_msg.avsc Avro file:
{
"type": "record",
"name": "RunStatusMessage",
"namespace": "com.myapp.avromsg.runstatus",
"fields": [
{
"name": "datasetID",
"type": "string"
},
{
"name": "runID",
"type": ["string", "null"]
},
{
"name": "registryRunID",
"type": ["string", "null"]
},
{
"name": "status",
"type": "string"
},
{
"name": "logs",
"type": ["string", "null"]
},
{
"name": "jobID",
"type": ["string", "null"]
},
{
"name": "validationsJson",
"type": ["string", "null"]
},
{
"name": "zone",
"type": "string"
},
{
"name": "milestoneFields",
"type": {
"type": "array",
"items": "com.myapp.avromsg.common.MilestoneField"
}
},
{
"name": "ingestionParams",
"type": {
"type": "array",
"items": "com.myapp.avromsg.common.MilestoneField"
},
"default": []
},
{
"name": "timestamp",
"type": [
{
"type": "long",
"logicalType": "timestamp-millis"
},
{
"type": "bytes",
"logicalType": "decimal",
"precision": 38,
"scale": 0
},
"string",
"int",
"null"
]
}
]
}
Here is the code I am using so far:
import avro.schema
schema = avro.schema.parse(open('my_msg.avsc', 'rb').read())
Upvotes: 0
Views: 2454
Reputation: 6005
Assuming I have avsc files defining both my custom field and my message schema, here's how I can do this using the Python avro library:
import avro.schema
import json
schema_list = []
# First add the custom field to the schema list
custom_json = json.loads(open('custom_field.avsc', 'rb').read())
schema_list.append(custom_json)
# Then add the main message schema
main_json = json.loads(open('main_msg.avsc', 'rb').read())
schema_list.append(main_json)
# Convert the schema json to a JSON string
schema_json = json.dumps(schema_list)
# Parse the schema
full_msg_schema = avro.schema.parse(schema_json)
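The same idea can be tried without any files on disk. This is only a sketch under assumptions: Python 3 is assumed, the fields of MilestoneField below are made up for illustration (the real definition lives in its own avsc file), and combine_schemas is a hypothetical helper name. The point it shows is the ordering: the custom type has to come first in the list so that it is already a known name by the time the parser reaches the message schema.

```python
import json

# Hypothetical stand-in for custom_field.avsc; the real field list is unknown.
milestone_field = {
    "type": "record",
    "name": "MilestoneField",
    "namespace": "com.myapp.avromsg.common",
    "fields": [
        {"name": "key", "type": "string"},
        {"name": "value", "type": "string"},
    ],
}

# Trimmed-down version of the message schema from the question.
run_status_message = {
    "type": "record",
    "name": "RunStatusMessage",
    "namespace": "com.myapp.avromsg.runstatus",
    "fields": [
        {"name": "datasetID", "type": "string"},
        {
            "name": "milestoneFields",
            "type": {
                "type": "array",
                "items": "com.myapp.avromsg.common.MilestoneField",
            },
        },
    ],
}


def combine_schemas(*schemas):
    """Return one JSON array string holding the schemas in the given order."""
    return json.dumps(list(schemas))


# Dependencies first, the schema that uses them last.
schema_json = combine_schemas(milestone_field, run_status_message)
print(schema_json)
```

The resulting string is what gets passed to avro.schema.parse in the snippet above. Because the top level is a JSON array, the parser treats it as a union, so the message schema should be the last member of the result. I believe it is reachable as full_msg_schema.schemas[-1], but check that against your avro version.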
Upvotes: 1
Reputation: 7947
I'm not sure how to code it in Python, but I can provide the Java version (I expect it to be almost the same). You have two alternatives: include the definition of your MilestoneField object as part of the schema (not clean at all if you are using it in several places), or add the extra types to the Schema.Parser. In the example I hardcoded the schemas, but the idea is the same when reading them from a file:
public static void main(String [] args){
Schema.Parser parser = new Schema.Parser();
Schema pojo = new Schema.Parser().parse("{\n" +
" \"namespace\": \"io.fama.pubsub.schema\",\n" +
" \"type\": \"record\",\n" +
" \"name\": \"Pojo\",\n" +
" \"fields\": [\n" +
" {\n" +
" \"name\": \"field\",\n" +
" \"type\": \"string\"\n" +
" }\n" +
" ]\n" +
"}");
HashMap<String, Schema> extraTypes = new HashMap<>();
extraTypes.put("Pojo", pojo);
parser.addTypes(extraTypes);
Schema schema = parser.parse("{\n" +
" \"namespace\": \"io.fama.pubsub.schema\",\n" +
" \"type\": \"record\",\n" +
" \"name\": \"PojoCollection\",\n" +
" \"fields\": [\n" +
" {\n" +
" \"name\": \"pojosCollection\",\n" +
" \"type\": {\n" +
" \"type\": \"array\",\n" +
" \"items\": \"Pojo\"\n" +
" }\n" +
" }, {\n" +
" \"name\": \"additionaField\",\n" +
" \"type\": [\"null\", \"string\"]\n" +
" }\n" +
" ]\n" +
"}");
}
As you can see, you can use the addTypes method to include additional custom objects in your schema. The method argument is a Map<String, Schema>, so you will need to parse your custom object's schema first. If you have the class version of your schema (generated by Avro), you should be able to add it like this:
extraTypes.put("MilestoneField", MilestoneField.SCHEMA$);
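The first alternative (putting the MilestoneField definition inside the message schema) does not need any Avro API, since it is only a rewrite of the schema JSON, so a rough Python sketch is possible. Assumptions: Python 3, the helper name inline_named_types is hypothetical, and the single field given to MilestoneField is invented because the real definition is not shown in the question. Only the first reference is replaced, because Avro expects a named type to be defined once and referred to by name afterwards.

```python
import copy
import json


def inline_named_types(schema, definitions):
    """Return a copy of schema in which the first reference to each name in
    definitions is replaced by its full definition."""
    pending = dict(definitions)  # names that have not been inlined yet

    def visit(node):
        if isinstance(node, str):
            # A bare string is either a primitive or a reference to a named type.
            if node in pending:
                return copy.deepcopy(pending.pop(node))
            return node
        if isinstance(node, list):  # a union: visit every branch
            return [visit(branch) for branch in node]
        if isinstance(node, dict):
            out = dict(node)
            # "type", "items" and "values" can all hold nested schemas.
            for key in ("type", "items", "values"):
                if key in out:
                    out[key] = visit(out[key])
            if isinstance(out.get("fields"), list):
                out["fields"] = [visit(field) for field in out["fields"]]
            return out
        return node

    return visit(schema)


definitions = {
    "com.myapp.avromsg.common.MilestoneField": {
        "type": "record",
        "name": "MilestoneField",
        "namespace": "com.myapp.avromsg.common",
        # Hypothetical field; the real ones come from the custom type's avsc file.
        "fields": [{"name": "key", "type": "string"}],
    }
}

# Trimmed-down version of the message schema from the question.
message = {
    "type": "record",
    "name": "RunStatusMessage",
    "namespace": "com.myapp.avromsg.runstatus",
    "fields": [
        {"name": "milestoneFields",
         "type": {"type": "array",
                  "items": "com.myapp.avromsg.common.MilestoneField"}},
        {"name": "ingestionParams",
         "type": {"type": "array",
                  "items": "com.myapp.avromsg.common.MilestoneField"},
         "default": []},
    ],
}

inlined = inline_named_types(message, definitions)
print(json.dumps(inlined, indent=2))
```

The printed JSON is a single self-contained schema that can be handed to the parser as one string. The downside is the one mentioned above: if several message schemas use MilestoneField, each of them ends up carrying its own copy of the definition.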
Upvotes: 0