pete

Reputation: 1

Key-value pairs in Apache Beam

My output in Apache Beam is (['key'], {'id': name})

The expected output is ('key', {'id': name})

How can I use Map in Apache Beam to transform my output into the expected form?

Upvotes: 0

Views: 828

Answers (2)

abhish_gl

Reputation: 401

You can use MapElements to transform your input into the desired output. The sample code below does this transformation (see the inline comments for details):

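import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

public class FlattenKeyExample {
    // Prints each element (for demonstration purposes only)
    static class PrintElements extends DoFn<KV<String, Map<String, String>>, Void> {
        @ProcessElement
        public void processElement(@Element KV<String, Map<String, String>> element) {
            System.out.println(element);
        }
    }

    public static void main(String[] args) {
        // Create the pipeline
        Pipeline p = Pipeline.create();
        // Example input data: each key is wrapped in a single-element list
        List<KV<List<String>, Map<String, String>>> inputData = Arrays.asList(
                KV.of(Arrays.asList("key1"), Collections.singletonMap("id", "name1")),
                KV.of(Arrays.asList("key2"), Collections.singletonMap("id", "name2")));
        // Create a PCollection from the input data, with an explicit coder
        p.apply("CreateInput", Create.of(inputData).withCoder(KvCoder.of(
                ListCoder.of(StringUtf8Coder.of()),
                MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))))
         // Apply the transformation using MapElements
         .apply("TransformElements", MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.maps(TypeDescriptors.strings(), TypeDescriptors.strings())))
                 .via((KV<List<String>, Map<String, String>> element) -> {
                     String key = element.getKey().get(0); // Extract the single key from the list
                     Map<String, String> value = element.getValue();
                     return KV.of(key, value);
                 }))
         // Print the output (for demonstration purposes)
         .apply("PrintOutput", ParDo.of(new PrintElements()));

        // Run the pipeline
        p.run().waitUntilFinish();
    }
}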

Upvotes: 0

Peter McArthur

Reputation: 63

Here is a test pipeline with a lambda function that reformats your tuple using Map:

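import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.testing.test_pipeline import TestPipeline

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with TestPipeline(options=options) as p:

    bad_record = (['key'], {'id': 'name'})

    records = (
        p
        | beam.Create([bad_record])
        # Take the first (only) item of the key list and keep the value as-is
        | beam.Map(lambda e: (e[0][0], e[1]))
        | beam.Map(print)
    )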

The output is:

('key', {'id': 'name'})
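The lambda's e[0][0] silently keeps only the first key if the list ever holds more than one, and raises IndexError if the list is empty. As a minimal sketch (the name unwrap_key is hypothetical, not part of Beam), a named function using tuple unpacking does the same reshaping but fails loudly unless there is exactly one key:

```python
from typing import Any, List, Tuple


def unwrap_key(element: Tuple[List[str], Any]) -> Tuple[str, Any]:
    """Turn (['key'], value) into ('key', value).

    Tuple unpacking raises ValueError when the list does not hold
    exactly one key, instead of silently dropping extra keys.
    """
    key_list, value = element
    (key,) = key_list  # ValueError on [] or ['a', 'b']
    return key, value


# Hypothetical usage in the pipeline above, in place of the lambda:
#     | beam.Map(unwrap_key)
print(unwrap_key((['key'], {'id': 'name'})))  # ('key', {'id': 'name'})
```

Passing it to beam.Map works the same way as the lambda; the only difference is the stricter check on the key list.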

I'm guessing that your key ends up wrapped in a list (['key']) because it is being yielded instead of returned in a Map earlier in the pipeline. If you fix it there, you won't need this step.
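On the yield-versus-return point: if the list can genuinely hold more than one key (an assumption; the question only shows one), a generator function used with beam.FlatMap, rather than beam.Map, emits one (key, value) pair per key. With beam.Map, the generator object itself would be treated as the single output element. A minimal sketch, with the hypothetical name explode_keys:

```python
from typing import Any, Iterator, List, Tuple


def explode_keys(element: Tuple[List[str], Any]) -> Iterator[Tuple[str, Any]]:
    """Yield one (key, value) pair for every key in the list."""
    key_list, value = element
    for key in key_list:
        yield key, value


# Hypothetical usage: FlatMap iterates over the generator and emits
# each yielded pair as its own element.
#     | beam.FlatMap(explode_keys)
print(list(explode_keys((['a', 'b'], {'id': 'name'}))))
```

For a single-key list this produces exactly the expected ('key', {'id': 'name'}) pair, so it also covers the case in the question.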

Upvotes: 2
