Reputation: 573
I receive the result of an API call, apply some transformations, and store it in S3. Right now it stores 1 file for each API call, which results in a LOT of files. The flow is:
InvokeHTTP -> SplitJson -> JoltTransformJSON (I don't need all the data) -> EvaluateJsonPath -> InferAvroSchema (500 samples) -> ConvertJSONToAvro -> PutS3Object
The JSON format is:
{
  "data": {"value1": "test", "value2": "test2"},
  "actions": [{"buy": 5, "sell": 6}, {"buyAgain": 5, "sellAgain": 6}],
  "Reactions": [{"buy": 5, "sell": 6}],
  "otherValue": "1",
  "otherValue2": "2"
}
Sometimes actions has values inside; in other cases it is "actions": []. I drop Reactions using JoltTransformJSON with the remove operation, because it has a LOT of data I don't need.
To join the values I tried MergeContent, but it DROPs a lot of records. First I read about the possible configurations, then I started modifying parameters just to see how the output changes. It always DROPs a lot of records.
So now I'm storing 1 file per JSON in S3. That's a lot of files, and you can feel it when querying the data.
How can I improve the flow to store fewer files? Thank you!
---- EDIT: image added ----
This is my current MergeContent configuration. I don't quite understand the Attribute Strategy property. Can it fix the changes in schema (actions with values vs. "actions": [])?
---- EDIT 2 ---- I can now confirm that it is grouping by state as expected, but it is dropping the JSON flowfiles that have "actions": []. They have the same state as some of the flowfiles where that field is populated. Any ideas? Thanks!
Upvotes: 1
Views: 432
Reputation: 70406
The MergeContent processor is the correct solution here. Set Merge Format to Avro and the Avro contents of the flowfiles will be concatenated together into a single flowfile. Your problem of dropped data is related to the Metadata Strategy property:
For FlowFiles whose input format supports metadata (Avro, e.g.), this property determines which metadata should be added to the bundle. If 'Use First Metadata' is selected, the metadata keys/values from the first FlowFile to be bundled will be used. If 'Keep Only Common Metadata' is selected, only the metadata that exists on all FlowFiles in the bundle, with the same value, will be preserved. If 'Ignore Metadata' is selected, no metadata is transferred to the outgoing bundled FlowFile. If 'Do Not Merge Uncommon Metadata' is selected, any FlowFile whose metadata values do not match those of the first bundled FlowFile will not be merged.
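As a rough illustration of how that plays out for the data in the question, here is a small Python sketch. It is not NiFi code: `infer_shape` is a hypothetical stand-in for schema inference, and `merge_like_first` only imitates the idea of rejecting anything that does not match the first bundled flowfile. It assumes, as the symptoms in the question suggest, that an empty "actions" array leads to a different inferred schema than a populated one.

```python
def infer_shape(value):
    """Return a crude, hashable description of a JSON value's structure.

    Hypothetical stand-in for schema inference, not what NiFi actually runs.
    """
    if isinstance(value, dict):
        return ("record", tuple(sorted((k, infer_shape(v)) for k, v in value.items())))
    if isinstance(value, list):
        # An empty list gives inference nothing to look at, so its item
        # shape stays empty and differs from a populated list.
        item_shapes = tuple(sorted({infer_shape(v) for v in value}, key=repr))
        return ("array", item_shapes)
    return type(value).__name__


def merge_like_first(flowfiles):
    """Keep only the flowfiles whose shape equals that of the first one."""
    first = infer_shape(flowfiles[0])
    merged = [f for f in flowfiles if infer_shape(f) == first]
    rejected = [f for f in flowfiles if infer_shape(f) != first]
    return merged, rejected


flowfiles = [
    {"data": {"value1": "test", "value2": "test2"}, "actions": [{"buy": 5, "sell": 6}]},
    {"data": {"value1": "a", "value2": "b"}, "actions": []},
    {"data": {"value1": "c", "value2": "d"}, "actions": [{"buy": 1, "sell": 2}]},
]

merged, rejected = merge_like_first(flowfiles)
print(len(merged), "merged,", len(rejected), "rejected")
```

In this toy run the record with "actions": [] is the one left out, which matches the behaviour described in EDIT 2 of the question.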
Flowfiles whose schema is not equal to the schema of the first bundled flowfile will be dropped. I can think of two possible solutions to prevent that:
Use Correlation Attribute Name to merge Avro flowfiles that share the same schema. You have to ensure that only files with the same schema get merged. So if you can put some attribute on the flowfile, like "type=CAR" or "type=BIKE", you can set Correlation Attribute Name to "type". MergeContent will then make bundles based on type. Since the schema of the files in a bundle is the same, no records will be dropped.
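The binning idea can be sketched in a few lines of Python. This is only a model of the concept, not NiFi's implementation: the flowfile dictionaries, their "attributes"/"content" keys, and the `bin_by_attribute` helper are all made up for illustration.

```python
from collections import defaultdict


def bin_by_attribute(flowfiles, attribute):
    """Group flowfile contents into bins keyed by the value of one attribute."""
    bins = defaultdict(list)
    for flowfile in flowfiles:
        key = flowfile["attributes"].get(attribute)
        bins[key].append(flowfile["content"])
    return dict(bins)


# Hypothetical flowfiles carrying a "type" attribute, as in the example above.
flowfiles = [
    {"attributes": {"type": "CAR"}, "content": {"wheels": 4}},
    {"attributes": {"type": "BIKE"}, "content": {"gears": 21}},
    {"attributes": {"type": "CAR"}, "content": {"wheels": 4}},
]

bins = bin_by_attribute(flowfiles, "type")
for key, contents in bins.items():
    print(key, len(contents))
```

Each bin then holds only records of one kind, so a merge performed per bin never has to reconcile two different schemas.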
Replace InferAvroSchema and ConvertJSONToAvro with a single processor: ConvertRecord. Configure a JsonTreeReader as the reader and leave its default properties. Configure an AvroRecordSetWriter as the writer and set the following Schema Text in it:
{
  "name": "MyClass",
  "type": "record",
  "namespace": "com.acme.avro",
  "fields": [
    {
      "name": "data",
      "type": {
        "name": "data",
        "type": "record",
        "fields": [
          {"name": "value1", "type": "string"},
          {"name": "value2", "type": "string"}
        ]
      }
    },
    {
      "name": "actions",
      "type": {
        "type": "array",
        "items": {
          "name": "actions_record",
          "type": "record",
          "fields": [
            {"name": "buyAgain", "type": ["int", "null"]},
            {"name": "sellAgain", "type": ["int", "null"]},
            {"name": "buy", "type": ["int", "null"]},
            {"name": "sell", "type": ["int", "null"]}
          ]
        }
      }
    },
    {
      "name": "Reactions",
      "type": {
        "type": "array",
        "items": {
          "name": "Reactions_record",
          "type": "record",
          "fields": [
            {"name": "buy", "type": "int"},
            {"name": "sell", "type": "int"}
          ]
        }
      }
    }
  ]
}
Notice that actions now includes all the fields. If you need help converting JSON to an Avro schema, use this schema generator.
PS: if you need more information on how to control the number of records per merge, please click here.
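To see why one explicit schema covers both shapes of actions, here is a small Python check. It is a sketch only: `conforms` is a hypothetical helper that understands just the handful of Avro constructs used above (record, array, union, int, string, null), `ACTIONS_SCHEMA` is a trimmed copy of the schema containing only the actions field, and treating a missing field as null is an assumption about how the record reader fills gaps.

```python
def conforms(value, schema):
    """Check a parsed JSON value against a small subset of Avro schema syntax."""
    if isinstance(schema, list):  # union: any branch may match
        return any(conforms(value, branch) for branch in schema)
    if isinstance(schema, dict):
        if schema["type"] == "record":
            # Assumption: a field missing from the JSON is treated as null.
            return isinstance(value, dict) and all(
                conforms(value.get(field["name"]), field["type"])
                for field in schema["fields"]
            )
        if schema["type"] == "array":
            # An empty list trivially satisfies any item schema.
            return isinstance(value, list) and all(
                conforms(item, schema["items"]) for item in value
            )
        return conforms(value, schema["type"])
    if schema == "null":
        return value is None
    if schema == "int":
        return isinstance(value, int) and not isinstance(value, bool)
    if schema == "string":
        return isinstance(value, str)
    return False


# Trimmed copy of the schema above: only the "actions" field.
ACTIONS_SCHEMA = {
    "name": "MyClass",
    "type": "record",
    "fields": [
        {"name": "actions", "type": {"type": "array", "items": {
            "name": "actions_record",
            "type": "record",
            "fields": [
                {"name": "buyAgain", "type": ["int", "null"]},
                {"name": "sellAgain", "type": ["int", "null"]},
                {"name": "buy", "type": ["int", "null"]},
                {"name": "sell", "type": ["int", "null"]},
            ],
        }}},
    ],
}

empty = {"actions": []}
full = {"actions": [{"buy": 5, "sell": 6}, {"buyAgain": 5, "sellAgain": 6}]}
print(conforms(empty, ACTIONS_SCHEMA), conforms(full, ACTIONS_SCHEMA))
```

Both the empty and the populated record pass against the same schema, so every flowfile written with it carries identical schema metadata and there is nothing left for the merge to reject.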
Upvotes: 1