Reputation: 439
I'm reading a table from BigQuery into Spark. The table has the columns: id(int), string1(string), string2(string), string3(string), date(date), timestamp(int). Now I want to write the rows of the resulting dataframe to separate tables in BigQuery according to their id. For example, if there are 700 rows with id 1, I want to create a table project.dataset.id1 and write all 700 of those rows (all columns) to it; all rows with id 2 should then go to a table project.dataset.id2, and so on.
If I were writing to text files I would use write.partitionBy('id'); what can I do when writing to BigQuery?
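For files, that would be roughly this (the output path is just an example):

df.write.partitionBy('id').csv('gs://some-bucket/output')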
I've tried to solve it in Python, but it's very slow, and I'm wondering if there's a better way to do it.
Here's what I've tried so far:
#!/usr/bin/python
from pyspark.sql import SparkSession
from google.cloud import bigquery

spark = SparkSession \
    .builder \
    .appName("example-spark") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Read the source table from BigQuery.
df = spark.read.format('bigquery').option('table', 'project.dataset.input_table').load()

# Collect the distinct ids on the driver and build one dataframe per id.
ids = df.select("id").distinct().rdd.flatMap(lambda x: x).collect()
dfs = [df.where(df["id"] == i) for i in ids]

client = bigquery.Client()
dataset_ref = client.dataset('test2')

for sub_df in dfs:
    id = str(sub_df.select("id").distinct().rdd.flatMap(lambda x: x).collect()[0])
    table_name = "source_table_%s" % (id)
    table_ref = dataset_ref.table(table_name)
    schema = [
        bigquery.SchemaField('source_id', 'INT64', mode='REQUIRED'),
        bigquery.SchemaField('string1', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('string2', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('string3', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('date', 'DATE', mode='NULLABLE'),
        bigquery.SchemaField('timestamp', 'INT64', mode='NULLABLE')
    ]
    # Create the per-id table, then write the matching rows into it.
    table = bigquery.Table(table_ref, schema=schema)
    table = client.create_table(table)
    print('table {} created.'.format(table.table_id))
    table_path = "project.dataset.%s" % (table_name)
    sub_df.write.format('bigquery').option('table', table_path).mode("overwrite").save()
I'm thinking of grouping/partitioning by id, then creating a new table per group named after its id using the Python API for BigQuery, and then using .write.format('bigquery') to write each group's rows to its table.
What function can I use to "split" my dataframe into portions by id and then iterate over each portion to create the table and perform the write?
Also, what datatype would I be working with after applying that function (how can I access the id for naming the table)?
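To be concrete, the split-and-iterate step I have in mind would look roughly like the loop below (assuming the distinct ids are few enough to collect to the driver; the project.dataset.id naming is just the scheme described above):

ids = [row['id'] for row in df.select('id').distinct().collect()]
for i in ids:
    df.where(df['id'] == i) \
        .write.format('bigquery') \
        .option('table', 'project.dataset.id%s' % i) \
        .mode('overwrite') \
        .save()

This is essentially what my code above already does, though, which is why I'm asking whether there's a better way.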
Upvotes: 1
Views: 1774
Reputation: 7764
Sorry this does not directly answer the question, but I want to suggest a different approach. Instead of creating and managing numerous tables in BigQuery, you can create a single table partitioned by the id column.
Read more about integer partitioning: https://cloud.google.com/bigquery/docs/creating-integer-range-partitions
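For example, with the same Python BigQuery client already used in the question, creating such a table could look roughly like this (project.dataset.single_table and the range bounds are placeholders you'd adjust to your actual ids):

from google.cloud import bigquery

client = bigquery.Client()
table = bigquery.Table(
    'project.dataset.single_table',
    schema=[
        bigquery.SchemaField('id', 'INT64', mode='REQUIRED'),
        bigquery.SchemaField('string1', 'STRING'),
        bigquery.SchemaField('string2', 'STRING'),
        bigquery.SchemaField('string3', 'STRING'),
        bigquery.SchemaField('date', 'DATE'),
        bigquery.SchemaField('timestamp', 'INT64'),
    ],
)
# Partition on the integer id column; start/end/interval must cover the real id range.
table.range_partitioning = bigquery.RangePartitioning(
    field='id',
    range_=bigquery.PartitionRange(start=0, end=10000, interval=1),
)
table = client.create_table(table)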
Once you've set it up, you can just write to this single table and BigQuery will handle the partitioning for you. When you read, a filtered query (FROM Table WHERE id = 12345) then replaces reading from a per-id table (FROM TableId12345). This is usually more manageable and easier to implement.
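On the Spark side, a minimal sketch of the write and the filtered read might then be (same placeholder table name; the connector should push the id filter down to BigQuery):

df.write.format('bigquery') \
    .option('table', 'project.dataset.single_table') \
    .mode('append') \
    .save()

one_id = spark.read.format('bigquery') \
    .option('table', 'project.dataset.single_table') \
    .load() \
    .where('id = 12345')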
Upvotes: 1