Reputation: 1
I am trying to make an external call to the Google Natural Language API from a DoFn class:
class RequestAPI(beam.DoFn):
    def setup(self):
        self._client = language_v1.LanguageServiceClient()

    def process(self, element):
        classify = False
        if element['lang'] == 'en':
            classify = True
        document = {"document": {
                        "type_": "PLAIN_TEXT", "content": element['text']},
                    "features": {
                        "extract_entities": True,
                        "extract_document_sentiment": True,
                        "extract_entity_sentiment": True,
                        "classify_text": classify,
                    },
                    "encoding_type": "UTF8"
                    }
        response = self._client.annotate_text(request=document)
        new_element = element
        new_element['analysis'] = {'sentences': response.sentences,
                                   'score': response.document_sentiment.score,
                                   'magnitude': response.document_sentiment.magnitude,
                                   'tokens': response.tokens,
                                   'lang_api': response.language,
                                   'entities': response.entities
                                   }
        print(new_element)
        return new_element
I call this class from:
data_enrich: PCollection = (
    tweets_data | beam.ParDo(RequestAPI())
    | 'write data' >> beam.io.WriteToText('data.txt')
)
The problem is that when I receive the response and return the new element, I get the following error: TypeError: cannot pickle 'google.protobuf.pyext._message.RepeatedCompositeContainer' object [while running 'ParDo(RequestAPI)']
I have already checked that the problem is not the API response or the type of the new element (it is a dict).
I am using Python 3.7 and Apache Beam 2.43.
I hope you can help me or give me a clue about what is happening.
I tried this before with a Map transform and got the same error. I saw a few examples of external calls with Apache Beam that use DoFn classes, so I tried that as well, but it fails with the same error.
Upvotes: 0
Views: 285
Reputation: 1166
The output of a DoFn needs to be an iterable. According to the documentation (chapter 4.2.1.2):
"Your process method should accept an argument element, which is the input element, and return an iterable with its output values. You can accomplish this by emitting individual elements with yield statements. You can also use a return statement with an iterable, like a list or a generator."
So you should change the last line of RequestAPI.process from return new_element to
return [new_element]
or even better
yield new_element
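To make the difference concrete, here is a minimal sketch in plain Python (no Beam required). It assumes only what the documentation quoted above says, namely that the value returned by process is treated as an iterable of outputs. The function names, the collect_outputs helper, and the placeholder 'analysis' value are hypothetical and only stand in for the real DoFn and API response.

```python
def process_return_dict(element):
    # Mirrors the original code: returns the dict itself.
    new_element = dict(element)
    new_element['analysis'] = {'score': 0.5}  # placeholder, not a real API response
    return new_element


def process_return_list(element):
    # First suggested fix: wrap the single output in a list.
    new_element = dict(element)
    new_element['analysis'] = {'score': 0.5}
    return [new_element]


def process_yield(element):
    # Second suggested fix: emit the output with yield.
    new_element = dict(element)
    new_element['analysis'] = {'score': 0.5}
    yield new_element


def collect_outputs(process_fn, element):
    # Hypothetical stand-in for the runner: iterate over whatever process returns.
    return list(process_fn(element))


tweet = {'lang': 'en', 'text': 'hello'}

# Iterating over a dict yields its keys, so each key becomes a separate output.
print(collect_outputs(process_return_dict, tweet))
# Both fixes produce exactly one output: the enriched dict.
print(collect_outputs(process_return_list, tweet))
print(collect_outputs(process_yield, tweet))
```

With a bare return of the dict, the downstream step would receive the keys of the dict rather than the dict itself, which is why wrapping it in a list or using yield is needed.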
Upvotes: 0