Can't process an element returned from an external API call in an Apache Beam DoFn

I am trying to call the Google Natural Language API from a DoFn class:

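import apache_beam as beam
from apache_beam.pvalue import PCollection
from google.cloud import language_v1


class RequestAPI(beam.DoFn):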
    def setup(self):
        self._client = language_v1.LanguageServiceClient()

    def process(self, element):
        # Request text classification only when the element's language is English.
        classify = element['lang'] == 'en'
        document = {"document": {
            "type_": "PLAIN_TEXT", "content": element['text']},
            "features": {
                "extract_entities": True,
                "extract_document_sentiment": True,
                "extract_entity_sentiment": True,
                "classify_text": classify,
            },
            "encoding_type": "UTF8"
        }

        response = self._client.annotate_text(request=document)
        new_element = element
        new_element['analysis'] = {'sentences': response.sentences,
                                   'score': response.document_sentiment.score,
                                   'magnitude': response.document_sentiment.magnitude,
                                   'tokens': response.tokens,
                                   'lang_api': response.language,
                                   'entities': response.entities
                                   }
        print(new_element)
        return new_element

I call this class from:

data_enrich: PCollection = (
    tweets_data
    | beam.ParDo(RequestAPI())
    | 'write data' >> beam.io.WriteToText('data.txt')
)

The problem is that when I receive the response and return the new element, I get the following error: TypeError: cannot pickle 'google.protobuf.pyext._message.RepeatedCompositeContainer' object [while running 'ParDo(RequestAPI)']

I have already checked that the problem is not the API response or the type of the new element (it is a dict).

I am using Python 3.7 and Apache Beam 2.43.

I hope you can help me or give me a clue about what is happening.

I tried it before with a Map transform and got the same error. I also saw a few examples of external calls with Apache Beam that use DoFn classes, so I tried that as well, but it gives me the same error.

Upvotes: 0

Views: 285

Answers (1)

CaptainNabla

Reputation: 1166

The output of a DoFn's process method needs to be an iterable. According to the documentation (section 4.2.1.2):

Your process method should accept an argument element, which is the input element, and return an iterable with its output values. You can accomplish this by emitting individual elements with yield statements. You can also use a return statement with an iterable, like a list or a generator.

So you should change the return statement of RequestAPI.process to

return [new_element]

or, even better,

yield new_element
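
To illustrate why this matters, here is a minimal plain-Python sketch that does not use Beam at all. It assumes only what the quoted documentation says: the value returned by process is iterated over to obtain the output elements. The `emit` helper is a hypothetical stand-in for that iteration, and the `analysis` value is a made-up placeholder rather than a real API response.

```python
def process_return_dict(element):
    # Mirrors the original code: the dict itself is returned.
    new_element = dict(element)
    new_element["analysis"] = {"score": 0.5}  # placeholder, not real API output
    return new_element


def process_return_list(element):
    # Wraps the dict in a list, so the dict is the single output element.
    new_element = dict(element)
    new_element["analysis"] = {"score": 0.5}
    return [new_element]


def process_yield(element):
    # Generator variant: each yield emits one output element.
    new_element = dict(element)
    new_element["analysis"] = {"score": 0.5}
    yield new_element


def emit(process_fn, element):
    """Hypothetical stand-in for a runner: iterate over whatever process returns."""
    return list(process_fn(element))


tweet = {"lang": "en", "text": "hello"}
print(emit(process_return_dict, tweet))  # ['lang', 'text', 'analysis']
print(emit(process_return_list, tweet))
print(emit(process_yield, tweet))
```

Iterating over a dict produces its keys, so the first variant emits three strings instead of one enriched element. The list and generator variants each emit the dict as a single element.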

Upvotes: 0
