Adt

Reputation: 333

Airflow Exception: DataFlow failed with return code 1

I am trying to execute a Dataflow jar through an Airflow script, using the DataFlowJavaOperator. In the jar param I am passing the path of the executable jar file present on the local system. But when I try to run this job I get the following error:

{gcp_dataflow_hook.py:108} INFO - Start waiting for DataFlow process to complete.
[2017-09-12 16:59:38,225] {models.py:1417} ERROR - DataFlow failed with return code 1
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1374, in run
    result = task_copy.execute(context=context)
  File "/usr/lib/python2.7/site-packages/airflow/contrib/operators/dataflow_operator.py", line 116, in execute
    hook.start_java_dataflow(self.task_id, dataflow_options, self.jar)
  File "/usr/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 146, in start_java_dataflow
    task_id, variables, dataflow, name, ["java", "-jar"])
  File "/usr/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 138, in _start_dataflow
    _Dataflow(cmd).wait_for_done()
  File "/usr/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 119, in wait_for_done
    self._proc.returncode))
Exception: DataFlow failed with return code 1

My Airflow script is:

from airflow.models import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 3, 16),
    'email': [<EmailID>],
    'dataflow_default_options': {
        'project': '<ProjectId>',
        # 'zone': 'europe-west1-d', (I am not sure what I should pass here)
        'stagingLocation': 'gs://spark_3/staging/'
    }
}

dag = DAG('Dataflow', schedule_interval=timedelta(minutes=2),
          default_args=default_args)

dataflow1 = DataFlowJavaOperator(
    task_id='dataflow_example',
    jar='/root/airflow_scripts/csvwriter.jar',
    gcp_conn_id='GCP_smoke',
    dag=dag)

I am not sure what mistake I am making. Can anybody please help me figure this out?

Note: I am creating this jar by selecting the Runnable JAR file option and packaging all the external dependencies.

Upvotes: 2

Views: 3140

Answers (1)

Adt

Reputation: 333

The problem was with the jar that I was using. Before using a jar in Airflow, make sure that it executes as expected on its own.

Example: if your jar is dataflow_job_1.jar, execute it using

java -jar dataflow_job_1.jar --parameters_if_any

Once your jar runs successfully, proceed with using it in the jar param of the Airflow DataFlowJavaOperator.
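
For instance, here is a hypothetical smoke test for the jar from the question (the flags are assumptions: the operator passes each dataflow_default_options entry to the jar as a --key=value argument, and --runner=DataflowRunner is the Beam flag for selecting the Dataflow runner):

java -jar /root/airflow_scripts/csvwriter.jar --project=<ProjectId> --stagingLocation=gs://spark_3/staging/ --runner=DataflowRunner

If the pipeline fails here, it will fail the same way under Airflow, just with a less readable stack trace.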

Furthermore, if you encounter errors related to coders, you may have to write your own coder. For instance, I had a problem with the TableRow class, as it did not have a default coder, so I had to write one myself:

TableRowCoder:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;

public class TableRowCoder extends Coder<TableRow> {
    private static final long serialVersionUID = 1L;
    // Delegate the actual (de)serialization to the built-in JSON coder.
    private static final Coder<TableRow> tableRow = TableRowJsonCoder.of();

    @Override
    public void encode(TableRow value, OutputStream outStream) throws CoderException, IOException {
        tableRow.encode(value, outStream);
    }

    @Override
    public TableRow decode(InputStream inStream) throws CoderException, IOException {
        // Wrap the decoded row under field "F1", as in the original job.
        return new TableRow().set("F1", tableRow.decode(inStream));
    }

    @Override
    public List<? extends Coder<?>> getCoderArguments() {
        // This coder has no component coders.
        return null;
    }

    @Override
    public void verifyDeterministic() throws Coder.NonDeterministicException {
        // Intentionally empty: the coder declares itself deterministic.
    }
}

Then register this coder in your pipeline code using

pipeline.getCoderRegistry().registerCoderForClass(TableRow.class, new TableRowCoder());
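
Alternatively, you can attach the coder directly to the PCollection whose coder inference fails. Below is a minimal sketch, assuming a Beam 2.x pipeline; the class name, read location, and transform are hypothetical placeholders, not from the original job:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;

public class SetCoderExample {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        PCollection<TableRow> rows = pipeline
            // Hypothetical input location.
            .apply("ReadLines", TextIO.read().from("gs://spark_3/input/*.csv"))
            .apply("ToTableRow", MapElements.via(new SimpleFunction<String, TableRow>() {
                @Override
                public TableRow apply(String line) {
                    return new TableRow().set("line", line);
                }
            }))
            // Setting the coder explicitly means Beam never has to infer
            // a coder for TableRow on this collection.
            .setCoder(new TableRowCoder());

        pipeline.run();
    }
}

Setting the coder on the one collection that needs it is a narrower fix than a registry-wide registration; either approach works for the error described here.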

If there are still errors (which are not related to coders), navigate to:

*.jar\META-INF\services\org.apache.beam.sdk.io.FileSystemRegistrar

and add any registrar entries that are missing (the service file is named after the fully qualified FileSystemRegistrar interface).

For example, there might be a staging error such as:

Unable to find registrar for gs

I had to add the following line to make it work:

org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystemRegistrar
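
To confirm that the entry actually made it into the packaged jar, you can print the service file from the command line (a quick sanity check; substitute your own jar name):

unzip -p csvwriter.jar META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar

It should print one fully qualified registrar class per line, including the GcsFileSystemRegistrar entry above.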

Upvotes: 1
