Reputation: 65
I am new to Avro and Python. I have a use case where I want to convert a JSON file into an Avro file. I have stored my schema in .avsc format and the JSON data in .json format. Now I want to put the JSON file and the .avsc file together and serialize the JSON data into Avro. Below is my code; I am getting the error "avro.io.AvroTypeException: The datum file.json is not an example of the schema", and I am not sure what I am doing wrong. In my writer.append statement I want the script to take data from the JSON file and append it to the .avro file in serialized form. I am not sure how to handle this; any help will be highly appreciated.
Here is my code:
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
from avro import schema, datafile, io
import json
from avro import schema, datafile, io
def json_to_avro():
    """Serialize the record in laird.json into laird.avro using the
    Avro schema stored in laird.avsc.

    Also parses laird.txt (first line: comma-separated header, remaining
    lines: whitespace-separated values) into a dict; that result is
    currently unused in the Avro output and is kept only for parity with
    the original flow.
    """
    # Read the raw text file; the with-block guarantees the handle is
    # closed (the original leaked it).
    with open("laird.txt", "r") as fo:
        data = fo.readlines()

    final_header = []
    final_rec = []
    for header in data[0:1]:
        # First line: comma-separated column names.
        final_header = header.strip("\n").split(",")
    for rec in data[1:]:
        # split() with no argument collapses runs of whitespace, which is
        # what the original split/join/split dance achieved.
        final_rec = rec.strip("\n").split()
    final_dict = dict(zip(final_header, final_rec))
    # NOTE(review): json_dumps is never used below — presumably left over
    # from debugging; kept to preserve the original flow.
    json_dumps = json.dumps(final_dict, ensure_ascii=False)

    # Parse the Avro schema. Named avro_schema so it does not shadow the
    # `from avro import schema` import at the top of the file.
    with open("laird.avsc", "rb") as schema_file:
        avro_schema = avro.schema.parse(schema_file.read())

    # writer.append() takes a Python object (dict), not a filename;
    # json.load() produces that object from laird.json.
    with open("laird.json") as fp:
        contents = json.load(fp)
    print(contents)

    # DataFileWriter is a context manager: it closes itself (and the
    # underlying file) even if append() raises mid-write.
    with DataFileWriter(open("laird.avro", "wb"), DatumWriter(), avro_schema) as writer:
        writer.append(contents)

json_to_avro()
Data in laird.avsc file:
{
"name": "MyClass",
"type": "record",
"namespace": "com.acme.avro",
"fields": [
{
"name": "event_type",
"type": "string"
},
{
"name": "event_data",
"type": {
"name": "event_data",
"type": "record",
"fields": [
{
"name": "device_id",
"type": "string"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "payload",
"type": {
"type": "array",
"items": {
"name": "payload_record",
"type": "record",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "sensor_id",
"type": "string"
},
{
"name": "type",
"type": "string"
},
{
"name": "unit",
"type": "string"
},
{
"name": "value",
"type": "float"
},
{
"name": "channel",
"type": "int"
},
{
"name": "timestamp",
"type": "long"
}
]
}
}
},
{
"name": "client_id",
"type": "string"
},
{
"name": "hardware_id",
"type": "string"
},
{
"name": "timestamp",
"type": "long"
},
{
"name": "application_id",
"type": "string"
},
{
"name": "device_type_id",
"type": "string"
}
]
}
},
{
"name": "company",
"type": {
"name": "company",
"type": "record",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "address",
"type": "string"
},
{
"name": "city",
"type": "string"
},
{
"name": "country",
"type": "string"
},
{
"name": "created_at",
"type": "int",
"logicalType": "date"
},
{
"name": "industry",
"type": "string"
},
{
"name": "latitude",
"type": "float"
},
{
"name": "longitude",
"type": "float"
},
{
"name": "name",
"type": "string"
},
{
"name": "state",
"type": "string"
},
{
"name": "status",
"type": "int"
},
{
"name": "timezone",
"type": "string"
},
{
"name": "updated_at",
"type": "int",
"logicalType": "date"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "zip",
"type": "string"
}
]
}
},
{
"name": "location",
"type": {
"name": "location",
"type": "record",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "address",
"type": "string"
},
{
"name": "city",
"type": "string"
},
{
"name": "country",
"type": "string"
},
{
"name": "created_at",
"type": "int",
"logicalType": "date"
},
{
"name": "industry",
"type": "string"
},
{
"name": "latitude",
"type": "float"
},
{
"name": "longitude",
"type": "float"
},
{
"name": "name",
"type": "string"
},
{
"name": "state",
"type": "string"
},
{
"name": "status",
"type": "int"
},
{
"name": "timezone",
"type": "string"
},
{
"name": "updated_at",
"type": "int",
"logicalType": "date"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "zip",
"type": "string"
},
{
"name": "company_id",
"type": "int"
}
]
}
},
{
"name": "device_type",
"type": {
"name": "device_type",
"type": "record",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "application_id",
"type": "string"
},
{
"name": "category",
"type": "string"
},
{
"name": "codec",
"type": "string"
},
{
"name": "data_type",
"type": "string"
},
{
"name": "description",
"type": "string"
},
{
"name": "manufacturer",
"type": "string"
},
{
"name": "model",
"type": "string"
},
{
"name": "name",
"type": "string"
},
{
"name": "parent_constraint",
"type": "string"
},
{
"name": "proxy_handler",
"type": "string"
},
{
"name": "subcategory",
"type": "string"
},
{
"name": "transport_protocol",
"type": "string"
},
{
"name": "version",
"type": "string"
},
{
"name": "created_at",
"type": "int",
"logicalType": "date"
},
{
"name": "updated_at",
"type": "int",
"logicalType": "date"
}
]
}
},
{
"name": "device",
"type": {
"name": "device",
"type": "record",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "thing_name",
"type": "string"
},
{
"name": "created_at",
"type": "int",
"logicalType": "date"
},
{
"name": "updated_at",
"type": "int",
"logicalType": "date"
},
{
"name": "status",
"type": "int"
}
]
}
}
]
}
Data in laird.json
{
"event_type": "uplink",
"event_data": {
"device_id": "fec5d310-88bf-11ea-aadf-2bd85a1038fa",
"user_id": "ba5e224b-fcc2-4e1b-b2b3-97ef57b9ba60",
"payload": [
{
"name": "Temperature",
"sensor_id": "fef87bd0-88bf-11ea-ae96-6fae9a4d5562",
"type": "temp",
"unit": "c",
"value": 21.9,
"channel": 3,
"timestamp": 1603300033446
},
{
"name": "Humidity",
"sensor_id": "fef399d0-88bf-11ea-a424-59b141c8d9bf",
"type": "rel_hum",
"unit": "p",
"value": 74,
"channel": 4,
"timestamp": 1603300033446
},
{
"name": "Battery",
"sensor_id": "feef2d00-88bf-11ea-aadf-2bd85a1038fa",
"type": "batt",
"unit": "p",
"value": 100,
"channel": 5,
"timestamp": 1603300033446
},
{
"name": "RSSI",
"sensor_id": "fef658f0-88bf-11ea-ae96-6fae9a4d5562",
"type": "rssi",
"unit": "dbm",
"value": -42,
"channel": 100,
"timestamp": 1603300033446
},
{
"name": "SNR",
"sensor_id": "",
"type": "snr",
"unit": "db",
"value": 10.2,
"channel": 101,
"timestamp": 1603300033446
}
],
"client_id": "1c46a2e0-88b9-11ea-90ae-77c0364d6e36",
"hardware_id": "0025ca0a00008612",
"timestamp": 1603300033446,
"application_id": "shipcomwireless",
"device_type_id": "16a4d0c0-3edf-11e9-bf5e-a180edbfa9bb"
},
"company": {
"id": 6854,
"address": "10500 University Center Dr",
"city": "Tampa",
"country": "United States",
"created_at": "2020-04-27T19:44:46Z",
"industry": "[\"Health Care\"]",
"latitude": 28.045853,
"longitude": -82.421,
"name": "Dermpath Diagnostics Bay Area",
"state": "FL",
"status": 0,
"timezone": "America/New_York",
"updated_at": "2020-04-27T19:44:46Z",
"user_id": "ba5e224b-fcc2-4e1b-b2b3-97ef57b9ba60",
"zip": "33612"
},
"location": {
"id": 8138,
"address": "10500 University Center Dr",
"city": "Tampa",
"country": "United States",
"created_at": "2020-04-27T19:44:46Z",
"industry": "[\"Health Care\"]",
"latitude": 28.045853,
"longitude": -82.421,
"name": "Dermpath Diagnostics Bay Area",
"state": "FL",
"status": 0,
"timezone": "America/New_York",
"updated_at": "2020-10-19T18:22:19Z",
"user_id": "ba5e224b-fcc2-4e1b-b2b3-97ef57b9ba60",
"zip": "33612",
"company_id": 6854
},
"device_type": {
"id": "16a4d0c0-3edf-11e9-bf5e-a180edbfa9bb",
"application_id": "",
"category": "module",
"codec": "lorawan.laird.rs1xx",
"data_type": "",
"description": "Temp Sensor",
"manufacturer": "Laird",
"model": "RS1xx",
"name": "Laird Temp & Humidity RS1xx Sensor - IoT in a Box",
"parent_constraint": "NOT_ALLOWED",
"proxy_handler": "PrometheusClient",
"subcategory": "lora",
"transport_protocol": "lorawan",
"version": "",
"created_at": "2019-03-05T00:39:09Z",
"updated_at": "2019-03-05T00:39:09Z"
},
"device": {
"id": 217200,
"thing_name": "Slide/Block Warehouse STE250",
"created_at": "2020-04-27T19:47:58Z",
"updated_at": "2020-04-28T12:58:08Z",
"status": 0
}
}
Upvotes: 1
Views: 5295
Reputation: 2074
UPDATE:
There are two problems. The first is in the schema. Anywhere that you currently have the following:
{
"name": "created_at",
"type": "int",
"logicalType": "date"
},
Should instead be this:
{
"name": "created_at",
"type": {"type": "int", "logicalType": "date"}
},
After fixing that, the other problem is in your data. When you read in the JSON file, the created_at and updated_at fields are strings, but they need to be datetime.date objects. You can do this for each of them with something like the following:
from datetime import date
contents["company"]["created_at"] = date.fromisoformat(contents["company"]["created_at"].split("T")[0])
ORIGINAL:
The writer.append function does not take in the name of a JSON file. Instead, it takes in the JSON object itself. So instead of:
writer.append("laird.json")
You will want something like this:
with open("laird.json") as fp:
contents = json.load(fp)
writer.append(contents)
Upvotes: 0
Reputation: 31
You can try the fastavro and rec_avro modules; here's an example:
from fastavro import writer, reader, schema
from rec_avro import to_rec_avro_destructive, from_rec_avro_destructive, rec_avro_schema
def json_objects():
    """Sample JSON records to round-trip through Avro."""
    return [{'a': 'a'}, {'b': 'b'}]


# For efficiency, to_rec_avro_destructive() destroys each input record and
# reuses its data structures to build the Avro-ready objects.
avro_records = (to_rec_avro_destructive(obj) for obj in json_objects())

# Store the records in an Avro container file.
with open('json_in_avro.avro', 'wb') as out_file:
    writer(out_file, schema.parse_schema(rec_avro_schema()), avro_records)

# Load the records back out of Avro.
with open('json_in_avro.avro', 'rb') as f_in:
    # from_rec_avro_destructive() likewise destroys each Avro record and
    # reuses its structures when reconstructing the JSON objects.
    round_tripped = [from_rec_avro_destructive(rec) for rec in reader(f_in)]

assert round_tripped == json_objects()
Upvotes: 1