mateus leao

Reputation: 1

Airflow/Composer - Python Module running all the time over and over

I was responsible for implementing Google Composer (same as Airflow) on my team, and I think I did something fundamentally wrong: all of my DAGs import a .py file where I have classes and functions that work with a StreamSets API, with BigQuery, and with PostgreSQL. I did that so I wouldn't have to rewrite a lot of code every time I create a new pipeline/DAG.

Checking my logs, I discovered that even though my DAGs are scheduled to run only once a day, log entries are being generated ALL THE TIME, many times per minute, and I can see the code from my module .py file being executed (but not the operators). I think the reason is that, since this .py file sits inside the Airflow DAGs folder (actually inside a subfolder), it is being run constantly, which I suppose wastes a lot of resources and also causes a real problem: one of my functions makes an API call, and if the returned values are 0 the last operator of the DAG is supposed to send an alert email. As soon as I deploy this DAG to Airflow, even before it reaches the last operator, it starts sending the email (MANY TIMES). Again, I suppose this happens because the API request keeps being executed, which keeps triggering the email over and over.

I need help because I think I did something fundamentally wrong with this module file...

Thanks.

Some snippets of code - the DAG first:

# here's where I import my module where I have all of the code
from dependencies.classes_functions import BigQuery, Streamsets_API
dag = DAG(
    '67776TTTTEST33copyindtest00email',
    default_args=default_args,
    description='activating streamsets pipeline with airflow',
    schedule_interval='0 10 * * *', # Running at 10:00 AM UTC everyday
    catchup=False,
    dagrun_timeout=timedelta(minutes=260))
#only showing the last operator
t4 = BigQueryOperator(
    task_id='finish_load_logs',
    provide_context=True,
    sql=BigQuery.update_logs_finish(
        'http://xxx/rest/v1/pipeline/dxx4/history', # making the API request
        ),
    use_legacy_sql=False,
    bigquery_conn_id=BQ_CONN_ID,
    allow_large_results=True, 
    dag=dag)

# Setting the order of execution of our pipeline
t1 >> t2 >> t3 >> t4

Now part of my module code. It performs the request coming from the DAG, parses the JSON to get some metrics, sends an email if the metrics are 0, and then updates a BigQuery table. The problem is that this part of the code (and almost everything in my module file) keeps running over and over again. Keep in mind I don't want to rewrite everything... I'm not so sure about what's happening under the hood, but I feel it's not good...

class BigQuery:
    #Initializing our class where we'll have different functions working with the BigQuery Operator
    def method(self):
        return 'instance method called', self

    @classmethod
    def update_logs_finish(cls, api_url_hist, table_to_update, pipeline_name):
        # This function will be utilized to automate our pipelines in a daily manner
        api_url_hist = str(api_url_hist)
        get_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        result_timestamp_finished = get_timestamp + ' UTC'
        user_streamsets = Variable.get('user_streamsets') 
        pass_streamsets = Variable.get('pass_streamsets') 
        auth = (user_streamsets, pass_streamsets)
        headers = {
            'X-Requested-By': 'sdc',
            'Content-Type': 'application/json'
        }
        # If our API_URL coming from the dag contains system/info/serverTime, then we won't get historical metrics
        # This is being used only to get a heartbeat from Streamsets and to record that it was successfuly or not in BQ

        if 'http://3' in api_url_hist:
            try: 
                time.sleep(3)
                response = requests.get(api_url_hist, headers=headers, auth=auth)
                time.sleep(3)
            except requests.exceptions.ConnectionError:
                return "Connection refused"
            try:
                count_input=1
                count_output=1
                results = response.json()[0]['metrics']
                json_results = json.loads(results) # Have to parse from string to dict/json again
                json_results = json_results['counters']
                count_input = json_results['pipeline.batchInputRecords.counter']['count']
                count_output = json_results['pipeline.batchOutputRecords.counter']['count']
                count_errors = json_results['pipeline.batchErrorRecords.counter']['count']
                print("The collection of the logs worked")
                print("Count of Input Records:", count_input)
                print("Count of Output Records:", count_output)
                print("Count of Error Records:", count_errors)
            except (KeyError, ValueError, IndexError):  # bad status or unexpected payload
                print("streamsets_status_request call failed (response is not 200): ", response)
                return 'this task failed' 
            # If metric count_input or count_output coming from the streamsets job -
            # requested by the API is 0, then send email to data team
            try:
                if count_input == 0 or count_output == 0:
                    message = Mail(
                        from_email='[email protected]',
                        to_emails='[email protected]',
                        subject=('streamsets failed to insert 0 data in {}').format(pipeline_name),
                        html_content=('<h1>Streamsets Ingested ZERO (0) DATA in {}</h1>').format(pipeline_name))
                    time.sleep(3)
                    sg = SendGridAPIClient(os.environ.get(
                        'SENDGRID_API_KEY'))
                    response_mail = sg.send(message)
                    print(response_mail.status_code)
                    print(response_mail.body)
                    print(response_mail.headers)
                    time.sleep(3)
            except Exception as e:
                print("something failed while sending the email:", e)
                return "something failed while sending the email"

            # Update our Bigquery composer_execution_logs table, after getting our metrics above
            query_sp = """
                UPDATE `{}`
                SET timestamp_end = TIMESTAMP('{}'),
                status = 'finished',
                count_input = {},
                count_output = {},
                count_errors = {}
                WHERE pipeline_name = '{}'
                AND timestamp_start = (select max(timestamp_start) from `{}` where pipeline_name = '{}')
                """.format(table_to_update, result_timestamp_finished, count_input, count_output, count_errors, pipeline_name, table_to_update, pipeline_name)
            print("this is the query: ", query_sp)
            return query_sp # Return concatenated query to be executed in our BigQuery Operator in our DAG

Upvotes: 0

Views: 1133

Answers (1)

Philipp Johannis

Reputation: 2956

This happens because the Airflow scheduler parses the DAG folder every few seconds to discover code changes and new DAGs; see Airflow Best Practices:

In general, you should not write any code outside the tasks. The code outside the tasks runs every time Airflow parses the DAG, which happens every second by default.
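To see the distinction concretely, here is a minimal Python sketch (no Airflow imports; all names are illustrative stand-ins for your code) of what runs at parse time versus at task run time:

```python
calls = []

def update_logs_finish(api_url_hist):
    """Stand-in for the real method that performs the HTTP request."""
    calls.append(api_url_hist)
    return "UPDATE ..."  # the SQL string handed to the operator

# This mirrors sql=BigQuery.update_logs_finish(...) in your DAG: the function
# is CALLED while the DAG file is being parsed, so the request (and any email
# it triggers) fires on every scheduler parse, not once per day.
sql = update_logs_finish("http://example/rest/v1/pipeline/history")
print(len(calls))  # already 1 before any task has run

# Deferred variant: hand the operator a callable (e.g. a PythonOperator's
# python_callable) so the work happens only when the task actually executes.
deferred = lambda: update_logs_finish("http://example/rest/v1/pipeline/history")
print(len(calls))  # still 1 - defining the callable did not run it
```

That is why the email keeps going out: the call happens while the file is parsed, independent of your `schedule_interval`.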

I see two options to solve your issue:

  • Option 1 - Move your dependencies folder out of the DAG folder
  • Option 2 - Create an .airflowignore file:

A .airflowignore file specifies the directories or files in DAG_FOLDER that Airflow should intentionally ignore. Each line in .airflowignore specifies a regular expression pattern, and directories or files whose names (not DAG id) match any of the patterns would be ignored (under the hood, re.findall() is used to match the pattern). Overall it works like a .gitignore file. Use the # character to indicate a comment; all characters on a line following a # will be ignored.
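Assuming your shared helper code lives in a `dependencies/` subfolder of the DAG folder (as in your imports), a minimal `.airflowignore` placed at the root of the DAG folder could look like this:

```
# Each line is a regular expression pattern matched against file paths.
# Skip the shared helper package when the scheduler scans for DAGs:
dependencies
```

Note that `.airflowignore` only stops the scheduler from trying to parse those files as DAG definitions; top-level code that the DAG file itself executes (such as calling `BigQuery.update_logs_finish(...)` when building the `sql` argument) still runs on every parse, as the best-practices quote above explains.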

Upvotes: 1
