Alexandru
Alexandru

Reputation: 21

Passing xcom value to JiraOperator in Airflow

def f_get_value_via_xcom(**context):
    var = context['ti'].xcom_pull(dag_id='bash_execute', key='return_value')  
    
bash_execute = BashOperator(
    task_id='bash_execute',
    bash_command='some bash command'
    xcom_push=True,
    priority_weight=100,
    start_date = datetime(2021, 1, 21))
    
def setUp(self):
    configuration.load_test_config()
    args = {
         'owner': 'airflow',
         'start_date': DEFAULT_DATE
    }
dag = DAG(dag_id='test_jira_operator',
          default_args=default_args,
          schedule_interval='15 9 * * *',
          dagrun_timeout=timedelta(seconds=120))
db.merge_conn(
    Connection(
        conn_id='jira username', conn_type='jira',
        host='jira server', port=443,
        extra='{"verify": "False", "project": "AIRFLOW"}'))          
        
        
jira_ticket_search_operator = JiraOperator(task_id='test_jira_operator',
               jira_conn_id='airflow_jira',
                                           jira_method="create_issue",
                                           python_callable=f_get_value_via_xcom,
                                           provide_context=True,
                                           jira_method_args={
                                                   'project': {'id': 10000},
                                                   'summary': 'New test issue',
                                                   'description': "{{ ti.xcom_pull('php_thelia') }}",
                                                   'issuetype': {'name': 'Bug'},
                                           },
                                           dag=dag)         
                                                  
                                                   
bash_execute >> jira_ticket_search_operator

I'm trying to make a DAG where it creates a new Jira task with the description being the console output from a bash command but i'm getting the following error

*[2021-01-22 12:57:28,109] {taskinstance.py:1152} ERROR - Object of type 'Issue' is not JSON serializable*
                 

and i don't understand why. I have posted above my current airflow script.Could you please help me .

Upvotes: 2

Views: 4155

Answers (1)

kaxil
kaxil

Reputation: 18824

This is because from Airflow 2.0, the default value for [core] enable_xcom_pickling has been changed to False for security reasons.

The pickle type for XCom messages has been replaced to JSON by default to prevent RCE attacks. Note that JSON serialization is stricter than pickling, so for example if you want to pass raw bytes through XCom you must encode them using an encoding like base64. If you understand the risk and still want to use pickling, set enable_xcom_pickling = True in your Airflow config's core section.

Like it is mentioned in the docs, if you are happy with the risk and know that no one is going to infiltrate your environment, you can set enable_xcom_pickling = True to use Pickling to store XComs instead of JSON. And then you won't get that error. JSON serialization does not support arbitrary objects -- which is why Object of type Issue is not supported.

https://github.com/apache/airflow/blob/master/UPDATING.md#the-default-value-for-core-enable_xcom_pickling-has-been-changed-to-false

Upvotes: 4

Related Questions