Is there any guidance available on using Google Cloud SQL as a Dataflow read source and/or sink?
The Apache Beam Python SDK 2.1.0 documentation doesn't have a chapter on Google Cloud SQL, though it does cover BigQuery.
And in the tutorial Performing ETL from a Relational Database into BigQuery, the data is first exported to a file, which is then used as the source. That means there has to be an export step in between, and that's not ideal.
Are there specific issues to take care of when using Cloud SQL specifically, both as a source and as a sink?
Upvotes: 5
Views: 4752
Reputation: 2561
There is a good library for SQL ingestion, https://github.com/pysql-beam/pysql-beam; please go through the examples. It supports RDBMSes like MySQL and PostgreSQL.
It provides read and write options; for example, we can read data from Google Cloud SQL like this:
from pysql_beam.sql_io.sql import ReadFromSQL
....
ReadFromSQL(host=options.host, port=options.port,
            username=options.username, password=options.password,
            database=options.database,
            query=options.source_query,
            wrapper=PostgresWrapper,
            batch=100000)
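The options object in that snippet is just a set of custom pipeline options. As a minimal sketch of how they might be defined (the class name SQLOptions and the option names here simply mirror the snippet; they are assumptions, not part of the library):

from apache_beam.options.pipeline_options import PipelineOptions

class SQLOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Option names mirror the ReadFromSQL snippet above; adjust to your setup.
        parser.add_argument('--host', default='localhost')
        parser.add_argument('--port', type=int, default=5432)
        parser.add_argument('--username')
        parser.add_argument('--password')
        parser.add_argument('--database')
        parser.add_argument('--source_query')

options = PipelineOptions().view_as(SQLOptions)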
Upvotes: 0
Reputation: 11041
The Beam Python SDK does not have a built-in transform to read data from a MySQL/Postgres database. Nonetheless, it should not be too troublesome to write a custom transform to do this. You can do something like this:
with beam.Pipeline() as p:
    query_result_pc = (p
                       | beam.Create(['select a,b,c from table1'])
                       | beam.ParDo(QueryMySqlFn(host='...', user='...'))
                       | beam.Reshuffle())
To connect to MySQL, we'll use the MySQL-specific library mysql.connector, but you can use the appropriate library for Postgres, etc.
Your querying function is:
import apache_beam as beam
import mysql.connector

class QueryMySqlFn(beam.DoFn):

    def __init__(self, **server_configuration):
        self.config = server_configuration

    def start_bundle(self):
        # Open one connection per bundle and reuse it for every query in it.
        self.mydb = mysql.connector.connect(**self.config)
        self.cursor = self.mydb.cursor()

    def process(self, query):
        self.cursor.execute(query)
        for result in self.cursor:
            yield result
For Postgres, you would use psycopg2 or any other library that allows you to connect to it:
import psycopg2

class QueryPostgresFn(beam.DoFn):

    def __init__(self, **server_config):
        self.config = server_config

    def process(self, query):
        con = psycopg2.connect(**self.config)
        cur = con.cursor()
        cur.execute(query)
        return cur.fetchall()
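Note that QueryPostgresFn opens a new connection for every element it processes. If that becomes costly, one variant holds a single connection per DoFn instance instead. This is a sketch, assuming a Beam release that supports DoFn.setup/teardown; the class name is hypothetical:

import psycopg2
import apache_beam as beam

class PooledQueryPostgresFn(beam.DoFn):

    def __init__(self, **server_config):
        self.config = server_config

    def setup(self):
        # Called once per DoFn instance; the connection is reused across bundles.
        self.con = psycopg2.connect(**self.config)

    def process(self, query):
        with self.con.cursor() as cur:
            cur.execute(query)
            for row in cur:
                yield row

    def teardown(self):
        self.con.close()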
FAQ
Why is there a beam.Reshuffle transform there? - Because the QueryMySqlFn does not parallelize reading data from the database. The reshuffle will ensure that our data is parallelized downstream for further processing.
Upvotes: 4
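Building on that FAQ point: the DoFn only parallelizes across queries, so if a single query is too large for one worker, you can split it into key ranges up front and let each range be read in parallel. A rough sketch using the QueryPostgresFn above, assuming an integer id column whose maximum is known; the range_queries helper is hypothetical:

import apache_beam as beam

def range_queries(table, id_column, max_id, shards):
    # Build one bounded query per shard; each shard is then read independently.
    step = max_id // shards + 1
    return ['select * from %s where %s >= %d and %s < %d'
            % (table, id_column, lo, id_column, lo + step)
            for lo in range(0, max_id, step)]

with beam.Pipeline() as p:
    rows = (p
            | beam.Create(range_queries('table1', 'id', 1000000, 10))
            | beam.ParDo(QueryPostgresFn(host='...', user='...'))
            | beam.Reshuffle())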