eml

Reputation: 439

.partitionBy('id') when writing to BigQuery tables in PySpark

I'm reading a table from BigQuery into Spark. The table has the columns: id(int), string1(string), string2(string), string3(string), date(date), timestamp(int). Now I want to write the rows of the resulting dataframe to separate tables in BigQuery according to their id. For example, if there are 700 rows with id 1, I want to create a table project.dataset.id1 and write all 700 rows (all columns) to it; all rows with id 2 should then go to a table project.dataset.id2, and so on.

If I were writing to text files I would use write.partitionBy('id'); what can I do when writing to BigQuery?
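With file outputs that would look something like this (the bucket path is just an example):

# partitionBy on a file sink creates one subdirectory per id value,
# e.g. .../output/id=1/, .../output/id=2/, ...
df.write.partitionBy('id').parquet('gs://my-bucket/output/')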

I've tried to solve it in Python, but it's very slow and I'm wondering if there's a better way to do it.

Here's what I've tried so far:

#!/usr/bin/python

from pyspark.sql import SparkSession
from google.cloud import bigquery

spark = SparkSession \
    .builder \
    .appName("example-spark") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

df = spark.read.format('bigquery').option('table', 'project.dataset.input_table').load()

# Collect the distinct ids, then build one filtered dataframe per id
ids = df.select("id").distinct().rdd.flatMap(lambda x: x).collect()
dfs = [(i, df.where(df["id"] == i)) for i in ids]

client = bigquery.Client()
dataset_ref = client.dataset('dataset')

for i, sub_df in dfs:
    table_name = "source_table_%s" % i
    table_ref = dataset_ref.table(table_name)
    # Create the destination table with the same schema as the dataframe
    schema = [
        bigquery.SchemaField('id', 'INT64', mode='REQUIRED'),
        bigquery.SchemaField('string1', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('string2', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('string3', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('date', 'DATE', mode='NULLABLE'),
        bigquery.SchemaField('timestamp', 'INT64', mode='NULLABLE')
    ]
    table = bigquery.Table(table_ref, schema=schema)
    table = client.create_table(table)
    print('table {} created.'.format(table.table_id))
    # Write this id's rows to its own table
    table_path = "project.dataset.%s" % table_name
    sub_df.write.format('bigquery').option('table', table_path).mode("overwrite").save()

My idea is to group/partition by id, create a new table per group named after its id using the Python API for BigQuery, and then use .write.format('bigquery') to write the rows to that table.

What function can I use to "split" my dataframe into portions by id and then iterate over each portion to create the table and perform the write?

Also, what datatype would I be working with after applying that function (how can I access the id for naming the table)?

Upvotes: 1

Views: 1774

Answers (1)

Michael Entin

Reputation: 7764

Sorry, this does not directly answer the question, but I want to suggest a different approach. Instead of creating and managing numerous tables in BigQuery, you can create a single table partitioned by the id column.

Read more about integer partitioning: https://cloud.google.com/bigquery/docs/creating-integer-range-partitions
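For example, with the BigQuery Python client the partitioned table could be created roughly like this (the table name and the id range are placeholders you'd adjust to your data):

from google.cloud import bigquery

client = bigquery.Client()

table = bigquery.Table(
    'project.dataset.partitioned_by_id',  # placeholder table name
    schema=[
        bigquery.SchemaField('id', 'INT64', mode='REQUIRED'),
        bigquery.SchemaField('string1', 'STRING'),
        bigquery.SchemaField('string2', 'STRING'),
        bigquery.SchemaField('string3', 'STRING'),
        bigquery.SchemaField('date', 'DATE'),
        bigquery.SchemaField('timestamp', 'INT64'),
    ],
)
# Integer-range partitioning on id: one partition per id value
# between 0 and 4000 (pick a range that covers your ids)
table.range_partitioning = bigquery.RangePartitioning(
    field='id',
    range_=bigquery.PartitionRange(start=0, end=4000, interval=1),
)
table = client.create_table(table)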

Once it's set up, you just write to this single table and BigQuery does the partitioning for you. On the read side, a filtered query, FROM Table WHERE id = 12345, then takes the place of reading FROM TableId12345. This is usually more manageable and easier to implement.
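In Spark that turns the per-id loop into a single write, something along these lines (the table and staging bucket names are placeholders; temporaryGcsBucket is only needed for the connector's indirect write path):

# One write for all ids; BigQuery routes each row to its id partition
df.write.format('bigquery') \
    .option('table', 'project.dataset.partitioned_by_id') \
    .option('temporaryGcsBucket', 'my-staging-bucket') \
    .mode('append') \
    .save()

# Reading one "portion" back is just a filter; the connector pushes
# the predicate down so only that partition is scanned
df_id1 = spark.read.format('bigquery') \
    .option('table', 'project.dataset.partitioned_by_id') \
    .load() \
    .where('id = 1')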

Upvotes: 1
