Reputation: 158
I am trying to build a simple pipeline that processes a JSON file. My pipeline looks like this:
input_files = 'message.json'
options = PipelineOptions()
with beam.Pipeline(options=options) as p:
    # reading json and fetching its contents
    lines = (
        p | 'Read JSON' >> beam.io.ReadFromText(input_files)
        | 'Fetch Data and File Name' >> beam.ParDo(FetchFileName('start'))
    )
    sdata = beam.pvalue.AsSingleton(lines['sdata'])
    transform = (
        lines
        | 'Parallel Transform of Data' >> beam.ParDo(AdjustObject(), service_id=sdata)
        | 'PRINT' >> beam.Map(print)
    )
And the FetchFileName looks like this
class FetchFileName(beam.DoFn):
    def process(self, element):
        element = json.loads(element)
        gdata = element['data']
        for elm in gdata:
            yield elm
        yield beam.pvalue.TaggedOutput('sdata', element['id'])
I am trying to pass the 'id = 999' value as a variable called 'sdata' to the next pipeline step called 'Parallel Transform of Data' but keep getting the following error
TypeError: 'PCollection' object is not subscriptable
My JSON file looks like this:
{"id":999,"data":[{"Emp_Id":495},{"Emp_Id":494}]}
Upvotes: 0
Views: 1822
Reputation: 6023
According to the code you have shared, the step ParDo(FetchFileName('start')) should produce two output PCollections:
gdata, which contains the items of element['data'] across all the JSON values in all the files.
sdata, which contains element['id'] across all the JSON values in all the files.
To produce these two PCollections you need to call the .with_outputs method on your ParDo, like this:
results = (
    p | 'Read JSON' >> beam.io.ReadFromText(input_files)
    # with_outputs is a method of the ParDo transform, not of the DoFn
    | 'Fetch Data and File Name' >> beam.ParDo(FetchFileName('start')).with_outputs(
        'sdata',
        main='gdata')
)
So if the input files contained (one JSON object per line, since ReadFromText emits one element per line)
{"id": 1, "data": ["a", "b", "c"]}
{"id": 2, "data": ["q"]}
{"id": 3, "data": ["y", "z"]}
Then the output PCollections would be:
results['sdata'] would contain 1, 2, 3
results['gdata'] would contain "a", "b", "c", "q", "y", "z"
See the full documentation at https://beam.apache.org/documentation/programming-guide/#additional-outputs.
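Now I see another problem: the next line would probably not work as expected. AsSingleton requires the PCollection to hold exactly one element, so it only works while the input has a single JSON record, as in your sample file. Once the collection results['sdata'] contains many values, you cannot apply AsSingleton. I do not know your full desired result, but I hope this helps to explain the first part to get you further toward your goal.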
Upvotes: 1