Reputation: 1
My output in Apache Beam is (['key'],{'id':name})
Expected: ('key',{'id':name})
How can I do this transformation using Map in Apache Beam to get the expected output?
Upvotes: 0
Views: 828
Reputation: 401
You can use MapElements to transform your input elements into the desired output. For example, the sample code below does this transformation (see the inline code comments for details):
// Create the pipeline
Pipeline p = Pipeline.create();

// Example input data
List<KV<List<String>, Map<String, String>>> inputData = Arrays.asList(
    KV.of(Arrays.asList("key1"), new HashMap<String, String>() {{
        put("id", "name1");
    }}),
    KV.of(Arrays.asList("key2"), new HashMap<String, String>() {{
        put("id", "name2");
    }})
);

// Create a PCollection from the input data
p.apply("CreateInput", Create.of(inputData))
    // Apply the transformation using MapElements
    .apply("TransformElements", MapElements
        .into(TypeDescriptors.kvs(
            TypeDescriptors.strings(),
            TypeDescriptors.maps(TypeDescriptors.strings(), TypeDescriptors.strings())))
        .via((KV<List<String>, Map<String, String>> element) -> {
            String key = element.getKey().get(0); // Extract the single key from the list
            Map<String, String> value = element.getValue();
            return KV.of(key, value);
        }))
    // Print the output (for demonstration purposes).
    // PrintElements is a user-defined DoFn that is not shown here.
    .apply("PrintOutput", ParDo.of(new PrintElements()));

// Run the pipeline
p.run().waitUntilFinish();
Upvotes: 0
Reputation: 63
Here is a test pipeline with a lambda function that reformats your tuple using Map:
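import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.testing.test_pipeline import TestPipeline

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
    bad_record = (['key'], {'id': 'name'})
    records = (
        p
        | beam.Create([bad_record])
        # Take the first item of the key list and keep the value unchanged
        | beam.Map(lambda e: (e[0][0], e[1]))
        | beam.Map(print)
    )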
The output is:
('key', {'id': 'name'})
I'm guessing that your [key] is probably being yielded instead of returned in a Map earlier in the pipeline. If you fix it there you won't need this step.
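If you do keep the reshaping step, one option is to write it as a named function instead of a lambda, so it can be checked without running a pipeline. This is only a sketch: unwrap_key is a hypothetical name, and the check that the list holds exactly one key is an assumption about your data, not something Beam requires.

```python
def unwrap_key(element):
    """Turn (['key'], value) into ('key', value)."""
    keys, value = element
    # Assumption: the key list always holds exactly one entry.
    if len(keys) != 1:
        raise ValueError(f"expected exactly one key, got {keys!r}")
    return keys[0], value


print(unwrap_key((['key'], {'id': 'name'})))
```

In the pipeline above it would take the place of the lambda, i.e. beam.Map(unwrap_key).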
Upvotes: 2