Reputation: 156
What ways do we have available to connect to a Google Cloud SQL (MySQL) instance from the newly introduced Google Cloud Composer? The intention is to get data from a Cloud SQL instance into BigQuery (perhaps with an intermediary step through Cloud Storage).
Can the Cloud SQL proxy be exposed in some way on pods part the Kubernetes cluster hosting Composer?
If not can the Cloud SQL Proxy be brought in by using the Kubernetes Service Broker? -> https://cloud.google.com/kubernetes-engine/docs/concepts/add-on/service-broker
Should Airflow be used to schedule and call GCP API commands like 1) export mysql table to cloud storage 2) read mysql export into bigquery?
Perhaps there are other methods that I am missing to get this done
Upvotes: 11
Views: 9604
Reputation: 1561
Now we can connect to Cloud SQL without creating a cloud proxy ourselves. The operator will create it automatically. The code look like this:
from airflow.models import DAG
from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceExportOperator
export_body = {
'exportContext': {
'fileType': 'CSV',
'uri': EXPORT_URI,
'databases': [DB_NAME],
'csvExportOptions': {
'selectQuery': SQL
}
}
}
default_dag_args = {}
with DAG(
'postgres_test',
schedule_interval='@once',
default_args=default_dag_args) as dag:
sql_export_task = CloudSqlInstanceExportOperator(
project_id=GCP_PROJECT_ID,
body=export_body,
instance=INSTANCE_NAME,
task_id='sql_export_task'
)
Upvotes: 0
Reputation: 193
Adding the medium post in the comments from @Leo to the top level https://medium.com/@ariklevliber/connecting-to-gcp-composer-tasks-to-cloud-sql-7566350c5f53 . Once you follow that article and have the service setup you can connect from your DAG using SQLAlchemy like this:
import os
from datetime import datetime, timedelta
import logging
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
logger = logging.getLogger(os.path.basename(__file__))
INSTANCE_CONNECTION_NAME = "phil-new:us-east1:phil-db"
default_args = {
'start_date': datetime(2019, 7, 16)
}
def connect_to_cloud_sql():
'''
Create a connection to CloudSQL
:return:
'''
import sqlalchemy
try:
PROXY_DB_URL = "mysql+pymysql://<user>:<password>@<cluster_ip>:3306/<dbname>"
logger.info("DB URL", PROXY_DB_URL)
engine = sqlalchemy.create_engine(PROXY_DB_URL, echo=True)
for result in engine.execute("SELECT NOW() as now"):
logger.info(dict(result))
except Exception:
logger.exception("Unable to interact with CloudSQL")
dag = DAG(
dag_id="example_sqlalchemy",
default_args=default_args,
# schedule_interval=timedelta(minutes=5),
catchup=False # If you don't set this then the dag will run according to start date
)
t1 = PythonOperator(
task_id="example_sqlalchemy",
python_callable=connect_to_cloud_sql,
dag=dag
)
if __name__ == "__main__":
connect_to_cloud_sql()
Upvotes: 1
Reputation: 71
"The Cloud SQL Proxy provides secure access to your Cloud SQL Second Generation instances without having to whitelist IP addresses or configure SSL." -Google CloudSQL-Proxy Docs
CloudSQL Proxy seems to be the recommended way to connect to CloudSQL above all others. So in Composer, as of release 1.6.1, we can create a new Kubernetes Pod to run the gcr.io/cloudsql-docker/gce-proxy:latest image, expose it through a service, then create a Connection in Composer to use in the operator.
To get set up:
Follow Google's documentation
Test the connection using info from Arik's Medium Post
Check that the pod was created kubectl get pods --all-namespaces
Check that the service was created kubectl get services --all-namespaces
Jump into a worker node kubectl --namespace=composer-1-6-1-airflow-1-10-1-<some-uid> exec -it airflow-worker-<some-uid> bash
mysql -u composer -p --host <service-name>.default.svc.cluster.local
Notes:
Composer now uses namespaces to organize pods
Pods in different namespaces don't talk to each other unless you give them the full path <k8-service-name>.<k8-namespace-name>.svc.cluster.local
Creating a new Composer Connection with the full path will enable successful connection
Upvotes: 7
Reputation: 61
We had the same problem but with a Postgres instance. This is what we did, and got it to work:
create a sqlproxy deployment in the Kubernetes cluster where airflow runs. This was a copy of the existing airflow-sqlproxy used by the default airflow_db connection with the following changes to the deployment file:
create a kubernetes service, again as a copy of the existing airflow-sqlproxy-service with the following changes:
in the airflow UI, add a connection of type Postgres with host set to the newly created service name.
Upvotes: 4
Reputation: 750
Here, in Hoffa's answer to a similar question, you can find a reference on how Wepay keeps it synchronized every 15 minutes using an Airflow operator.
From said answer:
Take a look at how WePay does this:
The MySQL to GCS operator executes a SELECT query against a MySQL table. The SELECT pulls all data greater than (or equal to) the last high watermark. The high watermark is either the primary key of the table (if the table is append-only), or a modification timestamp column (if the table receives updates). Again, the SELECT statement also goes back a bit in time (or rows) to catch potentially dropped rows from the last query (due to the issues mentioned above).
With Airflow they manage to keep BigQuery synchronized to their MySQL database every 15 minutes.
Upvotes: 0
Reputation: 614
You can follow these instructions to launch a new Cloud SQL proxy instance in the cluster.
re #3: That sounds like a good plan. There isn't a Cloud SQL to BigQuery operator to my knowledge, so you'd have to do it in two phases like you described.
Upvotes: 2