Bhaskar Das

Reputation: 682

Custom Airflow operator to run a query on a Salesforce table

I am new to Airflow. I want to schedule a job that checks whether the record counts of two tables in different databases match. One source is GCP (BigQuery) and the other is Salesforce.

I found BigQueryOperator, which runs the count query on the GCP side, but I couldn't find an equivalent operator, something like a SalesforceQueryOperator, that I can assign to an Airflow task.

This is the BigQuery task I use for the count query:

from airflow.contrib.operators.bigquery_operator import BigQueryOperator

t1 = BigQueryOperator(
    task_id='bigquery_test',
    # `sql` replaces the deprecated `bql` argument; the bracketed table
    # reference uses legacy SQL syntax, hence use_legacy_sql=True.
    sql='SELECT COUNT(userId) FROM [events:EVENTS_20160501]',
    use_legacy_sql=True,
    bigquery_conn_id='bigquery_default',
    dag=dag,
)

I know I could write a function that imports a library, connects to Salesforce, and runs the query to get the count, but I don't want to take the approach below (part of the code I already tried):

def salesforcequery_count():
    from simple_salesforce import Salesforce
    import requests

    session = requests.Session()
    # manipulate the session instance (optional)
    # placeholder credentials
    sf = Salesforce(
        username='myemail@example.com', password='password', organizationId='OrgId',
        session=session)
    count_record = sf.query("SELECT count(id) FROM Contact")
    # for row in data:
    #     process(row)
    return count_record
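`sf.query` returns the whole REST query response, not a bare number. A minimal sketch of pulling the integer out, assuming the response shape of the Salesforce REST query endpoint: an aggregate query such as `SELECT count(id) FROM Contact` returns one record whose unaliased value is under `expr0`, while `SELECT COUNT() FROM Contact` returns no records and reports the count in `totalSize`. The helper name `soql_count` and the sample dictionaries are hypothetical.

```python
def soql_count(result):
    """Return the integer count from a simple_salesforce query() response.

    Hypothetical helper: handles both `SELECT COUNT() ...` (count in totalSize)
    and `SELECT COUNT(Id) ...` (count in the first record under 'expr0').
    """
    records = result.get("records") or []
    if records:
        # Aggregate query: one AggregateResult row; the unaliased value is 'expr0'.
        return int(records[0]["expr0"])
    # COUNT() without a field: Salesforce reports the row count in totalSize.
    return int(result["totalSize"])


# Hypothetical responses shaped like simple_salesforce's sf.query() output.
aggregate = {
    "totalSize": 1,
    "done": True,
    "records": [{"attributes": {"type": "AggregateResult"}, "expr0": 42}],
}
plain_count = {"totalSize": 42, "done": True, "records": []}

print(soql_count(aggregate))    # 42
print(soql_count(plain_count))  # 42
```

In the function above, this would mean returning `soql_count(count_record)` instead of the raw response.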

Instead, I want to create a custom operator, a SalesforceQueryOperator, that works like BigQueryOperator: it runs the query against a Salesforce object (table) and returns the result.

Here is the reference: https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html

Any help would be appreciated.

Upvotes: 2

Views: 1429

Answers (1)

blackbishop

Reputation: 32660

You can use the existing SalesforceHook to build your own custom operator.

Here is an example:

from airflow.contrib.hooks.salesforce_hook import SalesforceHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class SalesforceQueryOperator(BaseOperator):
    """
    Make a query against Salesforce
    Return result as dict.
    """
    template_fields = ("query",)

    @apply_defaults
    def __init__(self,
                 conn_id,
                 query=None,
                 *args,
                 **kwargs
                 ):
        super(SalesforceQueryOperator, self).__init__(*args, **kwargs)

        self.conn_id = conn_id
        self.query = query

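    def execute(self, context):
        # Build the hook from the Airflow connection id passed to the operator.
        sf_hook = SalesforceHook(conn_id=self.conn_id)

        # make_query returns the Salesforce query response as a dict.
        results = sf_hook.make_query(self.query)

        # The return value is pushed to XCom, so downstream tasks can pull it.
        return results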

Then use it in your DAG:

t2 = SalesforceQueryOperator(
    task_id='salesforce_test',
    query='SELECT count(id) FROM Contact',
    conn_id='salesforce_default',
    dag=dag,
)

Here, salesforce_default is a connection that you add in Airflow. See the Salesforce Connection page of the Airflow documentation for how to add it.
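For the original goal of checking that the two counts match, one option is a downstream PythonOperator that pulls both values from XCom and fails the task on a mismatch. A minimal sketch, assuming the Salesforce task is the SalesforceQueryOperator above running `SELECT count(id) FROM Contact` (so its XCom value is the raw aggregate response), and that the BigQuery side is a task that pushes a plain integer. BigQueryOperator runs the query but does not push the result rows to XCom, so the task id `bigquery_count` stands for a hypothetical task that does; the callable name and `FakeTaskInstance` are hypothetical too.

```python
def check_counts_match(ti, bq_task_id="bigquery_count", sf_task_id="salesforce_test", **context):
    """PythonOperator callable: fail the task if the two record counts differ.

    Hypothetical: assumes `bq_task_id` pushed an int and `sf_task_id` pushed
    the response of `SELECT count(id) FROM Contact`.
    """
    bq_count = int(ti.xcom_pull(task_ids=bq_task_id))
    sf_result = ti.xcom_pull(task_ids=sf_task_id)
    # Aggregate SOQL result: a single record; the unaliased count(id) is 'expr0'.
    sf_count = int(sf_result["records"][0]["expr0"])
    if bq_count != sf_count:
        # Any exception raised here marks the Airflow task as failed.
        raise ValueError(f"Count mismatch: BigQuery={bq_count}, Salesforce={sf_count}")
    return bq_count


class FakeTaskInstance:
    """Stand-in for Airflow's TaskInstance so the sketch runs without Airflow."""

    def __init__(self, xcoms):
        self.xcoms = xcoms

    def xcom_pull(self, task_ids):
        return self.xcoms[task_ids]


ti = FakeTaskInstance({
    "bigquery_count": 42,
    "salesforce_test": {"totalSize": 1, "records": [{"expr0": 42}]},
})
print(check_counts_match(ti))  # 42
```

In a DAG this could be wired as `PythonOperator(task_id='compare_counts', python_callable=check_counts_match, dag=dag)` downstream of both count tasks; on Airflow 1.10 also pass `provide_context=True` so that `ti` is supplied.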

Upvotes: 1
