Eypros
Eypros

Reputation: 5723

How to post a kafka schema using python

I am trying to post a kafka schema using python.

From the CLI I would use a syntax like:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\":\"record\",\"name\":\"VisualDetections\",\"namespace\":\"com.namespace.something\",\"fields\":[{\"name\":\"vehicle_id\",\"type\":\"int\"},{\"name\":\"source\",\"type\":\"string\"},{\"name\":\"width\",\"type\":\"int\"},{\"name\":\"height\",\"type\":\"int\"},{\"name\":\"annotated_frame\",\"type\":[\"string\",\"null\"]},{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"fps\",\"type\":\"int\"},{\"name\":\"mission_id\",\"type\":\"int\"},{\"name\":\"sequence\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"sequence_record\",\"fields\":[{\"name\":\"frame_id\",\"type\":\"int\"},{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"localization\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"localization_record\",\"fields\":[{\"name\":\"latitude\",\"type\":\"double\"},{\"name\":\"longitude\",\"type\":\"double\"},{\"name\":\"class\",\"type\":\"string\"},{\"name\":\"object_id\",\"type\":\"int\"},{\"name\":\"confidence\",\"type\":\"double\"},{\"name\":\"bbox\",\"type\":{\"type\":\"record\",\"name\":\"bbox\",\"fields\":[{\"name\":\"x_min\",\"type\":\"int\"},{\"name\":\"y_min\",\"type\":\"int\"},{\"name\":\"x_max\",\"type\":\"int\"},{\"name\":\"y_max\",\"type\":\"int\"}]}}]}}}]}}}]}"}' http://server_ip:8081/subjects/VisualDetections-value/versions/

When I tried to tranfer this function to python I tried something like:

import requests
import json

topic = 'VisualDetections'
headers = {'Content-Type':  'application/vnd.schemaregistry.v1+json'}
with open(avro_path) as fp:
     data = {'schema': json.load(fp)}
data_json = json.dumps(data)
cmd = 'http://server_ip:8081/subjects/{}-value/versions/'.format(topic)
response = requests.post(cmd, headers=headers, data=data_json)

The above returns a code {"error_code":500,"message":"Internal Server Error"}. I have tried other options like:

with open(avro_path) as fp:
    data = json.load(fp)

with error code:

"error_code":422,"message":"Unrecognized field: name"
    

In the above the avro_path just contains the avro schema in a json file (can be uploaded if useful also).

I am not sure how I could post this data exactly. Also, I did not take into consideration the -H argument of post in CLI since I couldn't find a equivalent python argument (not sure it plays any role though). Can anyone provide a solution to this issue.

Upvotes: 0

Views: 1105

Answers (1)

OneCricketeer
OneCricketeer

Reputation: 191728

For the second error, the payload needs to be {'schema': "schema string"}

For the first, I think its a matter of the encoding; json.load will read the file to a dict rather than just a string.

Notice

>>> import json
>>> schema = {"type":"record"}  # example when using json.load() ... other data excluded
>>> json.dumps({'schema': schema})
'{"schema": {"type": "record"}}'  # the schema value is not a string
>>> json.dumps({'schema': json.dumps(schema)})
'{"schema": "{\\"type\\": \\"record\\"}"}'  # here it is

Try just reading the file

url = 'http://server_ip:8081/subjects/{}-value/versions/'.format(topic)
with open(avro_path) as fp:
     data = {'schema': fp.read().strip()}
     response = requests.post(cmd, headers=headers, data=json.dumps(data))

Otherwise, you would json.load then use json.dumps twice as shown above

You may also try json=data rather than data=json.dumps(data)

Upvotes: 4

Related Questions