Reputation: 682
I am new to Airflow. I want to schedule a job that checks whether the record counts of two tables from different databases match. One source is GCP (BigQuery), the other is Salesforce.
I found BigQueryOperator to run the query on the GCP side and return the count result, but I couldn't find any operator that looks like a SalesforceQueryOperator which I could assign to an Airflow task.
So basically, I was talking about something like this, which we can use to get the count result:
t1 = BigQueryOperator(
    task_id='bigquery_test',
    bql='SELECT COUNT(userId) FROM [events:EVENTS_20160501]',
    destination_dataset_table=False,
    bigquery_conn_id='bigquery_default',
    google_cloud_storage_conn_id='bigquery_default',
    delegate_to=False,
    udf_config=False,
    dag=dag,
)
I know that we can create a function, import the library, create a connection to Salesforce, and run the query to get the count result, but I don't want to follow the approach below (part of the code), which I have already tried.
def salesforcequery_count():
    from simple_salesforce import Salesforce
    import requests

    session = requests.Session()
    # manipulate the session instance (optional)
    sf = Salesforce(
        username='[email protected]', password='password', organizationId='OrgId',
        session=session)
    count_record = sf.query("SELECT count(id) FROM Contact")
    # for row in data:
    #     process(row)
    return count_record
I want to create a custom operator that looks like a SalesforceQueryOperator and works like BigQueryOperator, so it can run the query against a Salesforce table and return the result.
Here is the reference: https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html
Any help will be really appreciated.
Upvotes: 2
Views: 1429
Reputation: 32660
You can use the existing SalesforceHook to create your own custom operator.
Here is an example:
from airflow.contrib.hooks.salesforce_hook import SalesforceHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class SalesforceQueryOperator(BaseOperator):
    """
    Make a query against Salesforce.
    Return the result as a dict.
    """
    template_fields = ("query",)

    @apply_defaults
    def __init__(self,
                 conn_id,
                 query=None,
                 *args,
                 **kwargs):
        super(SalesforceQueryOperator, self).__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.query = query

    def execute(self, context):
        sf_hook = SalesforceHook(conn_id=self.conn_id)
        results = sf_hook.make_query(self.query)
        return results
Then use it in your DAG:
t2 = SalesforceQueryOperator(
    task_id='salesforce_test',
    query='SELECT count(id) FROM Contact',
    conn_id='salesforce_default',
    dag=dag,
)
where salesforce_default is a connection that you add in Airflow. You can see how to add it here: Salesforce Connection
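To finish the original use case (comparing the two counts), one option is a small PythonOperator task that pulls the Salesforce result from XCom (the operator above returns it, so Airflow pushes it automatically) and fetches the BigQuery count through BigQueryHook, since the classic BigQueryOperator does not push its query result to XCom. This is only a sketch under assumptions: the task name compare_counts is mine, the 'expr0' key is the default alias SOQL gives an unaliased aggregate, and the counts are compared with a simple equality check, so adjust it to your setup.

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.operators.python_operator import PythonOperator


def compare_counts(**context):
    # Salesforce result pushed to XCom by the SalesforceQueryOperator task above.
    sf_result = context['ti'].xcom_pull(task_ids='salesforce_test')
    # SOQL usually returns an unaliased aggregate under the key 'expr0' (assumption, check your payload).
    sf_count = int(sf_result['records'][0]['expr0'])

    # The BigQueryOperator task does not return its result, so run the count query through the hook.
    bq_hook = BigQueryHook(bigquery_conn_id='bigquery_default')
    bq_count = int(bq_hook.get_first('SELECT COUNT(userId) FROM [events:EVENTS_20160501]')[0])

    if sf_count != bq_count:
        raise ValueError(
            'Counts do not match: Salesforce=%s, BigQuery=%s' % (sf_count, bq_count))


t3 = PythonOperator(
    task_id='compare_counts',
    python_callable=compare_counts,
    provide_context=True,
    dag=dag,
)

t3.set_upstream([t1, t2])

Failing the task on a mismatch keeps the check visible in the Airflow UI; you could instead log the difference or trigger an alerting task, depending on what you want the DAG to do when the counts diverge.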
Upvotes: 1