Reputation: 1456
I'm currently trying to run a Dataflow (Apache Beam, Python SDK) job to import a >100 GB tweet file into BigQuery, but I'm running into the error: "Too many sources provided: 15285. Limit is 10000."
The job takes the tweets (JSON), extracts 5 relevant fields, transforms/sanitizes them a bit with a few transforms, and then writes those values into BigQuery, where they will be used for further processing.
There's Cloud Dataflow to BigQuery - too many sources, but that one seems to be caused by having many different input files, whereas I have a single input file, so it doesn't seem relevant. Also, the solutions mentioned there are rather cryptic and I'm not sure if/how I could apply them to my problem.
My guess is that BigQuery writes temporary files for each row or something before persisting them, and that's what's meant by "too many sources"?
How can I fix this?
[Edit]
Code:
import argparse
import json
import logging

import apache_beam as beam


class JsonCoder(object):
    """A JSON coder interpreting each line as a JSON string."""

    def encode(self, x):
        return json.dumps(x)

    def decode(self, x):
        return json.loads(x)


def filter_by_nonempty_county(record):
    if 'county_fips' in record and record['county_fips'] is not None:
        yield record
def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        default='...',
                        help=('Input twitter json file specified as: '
                              'gs://path/to/tweets.json'))
    parser.add_argument(
        '--output',
        required=True,
        help=('Output BigQuery table for results specified as: '
              'PROJECT:DATASET.TABLE or DATASET.TABLE.'))
    known_args, pipeline_args = parser.parse_known_args(argv)

    p = beam.Pipeline(argv=pipeline_args)

    # Read all tweets from the given source file.
    read_tweets = "Read Tweet File" >> beam.io.ReadFromText(known_args.input,
                                                            coder=JsonCoder())

    # Extract the relevant fields of the source file.
    extract_fields = "Project relevant fields" >> beam.Map(
        lambda row: {'text': row['text'],
                     'user_id': row['user']['id'],
                     'location': row['user']['location'] if 'location' in row['user'] else None,
                     'geo': row['geo'] if 'geo' in row else None,
                     'tweet_id': row['id'],
                     'time': row['created_at']})

    # Check what type of geo-location the tweet has.
    has_geo_location_or_not = "partition by has geo or not" >> beam.Partition(
        lambda element, partitions: 0 if element['geo'] is None else 1, 2)

    check_county_not_empty = lambda element, partitions: (
        1 if 'county_fips' in element and element['county_fips'] is not None else 0)

    # Partition tweets by whether they carry coordinates or not.
    # TimeConversion, BeamGeoLocator and ComputeLocationFromProfile are
    # custom DoFns defined elsewhere (not shown here).
    coordinate_partition = (p
                            | read_tweets
                            | extract_fields
                            | beam.ParDo(TimeConversion())
                            | has_geo_location_or_not)

    # Lookup by coordinates.
    geo_lookup = (coordinate_partition[1]
                  | "geo coordinates mapping" >> beam.ParDo(BeamGeoLocator())
                  | "filter successful geo coords" >> beam.Partition(check_county_not_empty, 2))

    # Lookup by profile.
    profile_lookup = ((coordinate_partition[0], geo_lookup[0])
                      | "join streams" >> beam.Flatten()
                      | "Lookup from profile location" >> beam.ParDo(ComputeLocationFromProfile()))

    bigquery_output = "write output to BigQuery" >> beam.io.Write(
        beam.io.BigQuerySink(
            known_args.output,
            schema='text:STRING, user_id:INTEGER, county_fips:STRING, tweet_id:INTEGER, time:TIMESTAMP, county_source:STRING',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))

    # file_output = "write output" >> beam.io.WriteToText(known_args.output, coder=JsonCoder())

    output = ((profile_lookup, geo_lookup[1])
              | "merge streams" >> beam.Flatten()
              | "Filter entries without location" >> beam.FlatMap(filter_by_nonempty_county)
              | "project relevant fields" >> beam.Map(
                  lambda row: {'text': row['text'],
                               'user_id': row['user_id'],
                               'county_fips': row['county_fips'],
                               'tweet_id': row['tweet_id'],
                               'time': row['time'],
                               'county_source': row['county_source']})
              | bigquery_output)

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.DEBUG)
    run()
It's a little bit complicated, so it would probably take too much time to do it in BigQuery directly. The code reads the tweet JSON, splits the PCollection by whether a tweet is geotagged or not, and if not, tries to look the location up via the profile location; it then maps the location to what's relevant for our GIS analysis and writes the result to BigQuery.
Upvotes: 3
Views: 1556
Reputation: 6130
The number of files corresponds to the number of shards the elements were processed in.
One trick to reducing this is to generate some random keys, and group the elements based on that before writing them out.
For example, you could use the following DoFn and PTransform in your pipeline:
import random

import apache_beam as beam


class _RoundRobinKeyFn(beam.DoFn):
    def __init__(self, count):
        self.count = count

    def start_bundle(self):
        # Start each bundle at a random key so the keys are spread evenly.
        self.counter = random.randint(0, self.count - 1)

    def process(self, element):
        self.counter += 1
        if self.counter >= self.count:
            self.counter -= self.count
        yield self.counter, element


class LimitBundles(beam.PTransform):
    def __init__(self, count):
        self.count = count

    def expand(self, input):
        return (input
                | beam.ParDo(_RoundRobinKeyFn(self.count))
                | beam.GroupByKey()
                | beam.FlatMap(lambda kv: kv[1]))
You would just use this before the bigquery_output:
output = (# ...
          | LimitBundles(10000)
          | bigquery_output)
(Note that I just typed this in without testing it, so there are likely some Python typos.)
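As a quick local sanity check (a minimal sketch, assuming _RoundRobinKeyFn and LimitBundles above are in scope; the dummy dict elements and the count of 2 are made up for illustration, not the real tweet schema), something like this should show the elements coming out of LimitBundles unchanged when run on the default DirectRunner:

import logging

import apache_beam as beam


def check_limit_bundles():
    # 20 dummy elements get round-robin keys, are regrouped under 2 keys by
    # LimitBundles, and should come out the other side unchanged.
    logging.basicConfig(level=logging.INFO)
    p = beam.Pipeline()
    (p
     | beam.Create([{'tweet_id': i} for i in range(20)])  # hypothetical elements
     | LimitBundles(2)
     | beam.Map(lambda element: logging.info(element)))
    p.run().wait_until_finish()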
Upvotes: 5