Reputation: 179
I'm trying to learn my way around Cloud Dataflow. For the purpose of learning, I've reduced the basic Word Count example to a simple strip function. I want to create a PCollection of file names that are GCS objects, but I get a message saying that the function ReadFromText() is not iterable.
As I understand it, a PCollection is a list of objects that are to be worked on. I could write a loop that feeds in each object one by one, but that's not what I want to do. I want to keep that part dynamic and let Apache Beam handle the rest; I only want to supply a list of files in GCS.
So far, I have been successful at processing single-element PCollections. I also do not want to use a glob pattern like 'gs://dataflow-samples/shakespeare/*'.
I've also looked at the gcsio module and ReadAllFromText(). With those I also get a message saying that the function is not iterable. Please guide.
Here's what I've done until now:
"""A word-counting workflow."""
from __future__ import absolute_import
import argparse
import logging
import re
from past.builtins import unicode
import apache_beam as beam
from apache_beam.io import ReadFromText, ReadAllFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.gcp import gcsio


class WordExtractingDoFn(beam.DoFn):
    """Parse each line of input text into words."""

    def __init__(self):
        super(WordExtractingDoFn, self).__init__()

    def process(self, element):
        text_line = element.strip()
        return text_line


def run(argv=None):
    """Main entry point; defines and runs the wordcount pipeline."""
    p = beam.Pipeline(options=PipelineOptions())

    # Read the text file[pattern] into a PCollection.
    elements = ['gs://dataflow-samples/shakespeare/1kinghenryiv.txt',
                'gs://dataflow-samples/shakespeare/1kinghenryvi.txt',
                'gs://dataflow-samples/shakespeare/2kinghenryiv.txt',
                'gs://dataflow-samples/shakespeare/2kinghenryvi.txt',
                'gs://dataflow-samples/shakespeare/3kinghenryvi.txt',
                'gs://dataflow-samples/shakespeare/allswellthatendswell.txt',
                'gs://dataflow-samples/shakespeare/antonyandcleopatra.txt',
                'gs://dataflow-samples/shakespeare/asyoulikeit.txt',
                'gs://dataflow-samples/shakespeare/comedyoferrors.txt',
                'gs://dataflow-samples/shakespeare/coriolanus.txt',
                'gs://dataflow-samples/shakespeare/cymbeline.txt',
                'gs://dataflow-samples/shakespeare/hamlet.txt',
                'gs://dataflow-samples/shakespeare/juliuscaesar.txt',
                'gs://dataflow-samples/shakespeare/kinghenryv.txt',
                'gs://dataflow-samples/shakespeare/kinghenryviii.txt',
                'gs://dataflow-samples/shakespeare/kingjohn.txt',
                'gs://dataflow-samples/shakespeare/kinglear.txt',
                'gs://dataflow-samples/shakespeare/kingrichardii.txt',
                'gs://dataflow-samples/shakespeare/kingrichardiii.txt',
                'gs://dataflow-samples/shakespeare/loverscomplaint.txt',
                'gs://dataflow-samples/shakespeare/loveslabourslost.txt',
                'gs://dataflow-samples/shakespeare/macbeth.txt',
                'gs://dataflow-samples/shakespeare/measureforemeasure.txt',
                'gs://dataflow-samples/shakespeare/merchantofvenice.txt',
                'gs://dataflow-samples/shakespeare/merrywivesofwindsor.txt',
                'gs://dataflow-samples/shakespeare/midsummersnightsdream.txt',
                'gs://dataflow-samples/shakespeare/muchadoaboutnothing.txt',
                'gs://dataflow-samples/shakespeare/othello.txt',
                'gs://dataflow-samples/shakespeare/periclesprinceoftyre.txt',
                'gs://dataflow-samples/shakespeare/rapeoflucrece.txt',
                'gs://dataflow-samples/shakespeare/romeoandjuliet.txt',
                'gs://dataflow-samples/shakespeare/sonnets.txt',
                'gs://dataflow-samples/shakespeare/tamingoftheshrew.txt',
                'gs://dataflow-samples/shakespeare/tempest.txt',
                'gs://dataflow-samples/shakespeare/timonofathens.txt',
                'gs://dataflow-samples/shakespeare/titusandronicus.txt',
                'gs://dataflow-samples/shakespeare/troilusandcressida.txt',
                'gs://dataflow-samples/shakespeare/twelfthnight.txt',
                'gs://dataflow-samples/shakespeare/twogentlemenofverona.txt',
                'gs://dataflow-samples/shakespeare/various.txt',
                'gs://dataflow-samples/shakespeare/venusandadonis.txt',
                'gs://dataflow-samples/shakespeare/winterstale.txt']

    books = p | beam.Create((elements))
    #print (books)
    lines = p | 'read' >> ReadFromText(books)

    counts = (lines
              | 'split' >> (beam.ParDo(WordExtractingDoFn())
                            .with_output_types(unicode)))

    output = counts | 'write' >> WriteToText('gs://ihopeitworks/Users/see.txt',shard_name_template='')

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
Upvotes: 1
Views: 2769
Reputation: 1525
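You were pretty close. Rather than passing books as a parameter to ReadFromText, apply ReadAllFromText to the books PCollection by piping it in, as below. ReadFromText expects a file pattern string when the pipeline is built, not a PCollection, whereas ReadAllFromText is a transform that takes a PCollection of file names or patterns as its input. Hope that helps.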
books = p | beam.Create((elements))
lines = books | 'read' >> ReadAllFromText()
Upvotes: 3