Reputation: 143
I have an issue where the BashOperator is not logging all of the output from wget. It'll log only the first 1-5 lines of the output.
I have tried this with only wget as the bash command:
tester = BashOperator(
task_id = 'testing',
bash_command = "wget -N -r -nd --directory-prefix='/tmp/' http://apache.cs.utah.edu/httpcomponents/httpclient/source/httpcomponents-client-4.5.3-src.zip",
dag = dag)
I've also tried this as part of a longer bash script that has other commands that follow wget. Airflow does wait for the script to complete before firing downstream tasks. Here's an example bash script:
#!/bin/bash
echo "Starting up..."
wget -N -r -nd --directory-prefix='/tmp/' http://apache.cs.utah.edu/httpcomponents/httpclient/source/httpcomponents-client-4.5.3-src.zip
echo "Download complete..."
unzip /tmp/httpcomponents-client-4.5.3-src.zip -o -d /tmp/test_airflow
echo "Archive unzipped..."
Last few lines of a log file:
[2017-04-13 18:33:34,214] {base_task_runner.py:95} INFO - Subtask: --------------------------------------------------------------------------------
[2017-04-13 18:33:34,214] {base_task_runner.py:95} INFO - Subtask: Starting attempt 1 of 1
[2017-04-13 18:33:34,215] {base_task_runner.py:95} INFO - Subtask: --------------------------------------------------------------------------------
[2017-04-13 18:33:34,215] {base_task_runner.py:95} INFO - Subtask:
[2017-04-13 18:33:35,068] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:33:35,068] {models.py:1342} INFO - Executing <Task(BashOperator): testing> on 2017-04-13 18:33:08
[2017-04-13 18:33:37,569] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:33:37,569] {bash_operator.py:71} INFO - tmp dir root location:
[2017-04-13 18:33:37,569] {base_task_runner.py:95} INFO - Subtask: /tmp
[2017-04-13 18:33:37,571] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:33:37,571] {bash_operator.py:81} INFO - Temporary script location :/tmp/airflowtmpqZhPjB//tmp/airflowtmpqZhPjB/testingCkJgDE
[2017-04-13 18:14:54,943] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:54,942] {bash_operator.py:82} INFO - Running command: /var/www/upstream/xtractor/scripts/Temp_test.sh
[2017-04-13 18:14:54,951] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:54,950] {bash_operator.py:91} INFO - Output:
[2017-04-13 18:14:54,955] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:54,954] {bash_operator.py:96} INFO - Starting up...
[2017-04-13 18:14:54,958] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:54,957] {bash_operator.py:96} INFO - --2017-04-13 18:14:54-- http://apache.cs.utah.edu/httpcomponents/httpclient/source/httpcomponents-client-4.5.3-src.zip
[2017-04-13 18:14:55,106] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:55,105] {bash_operator.py:96} INFO - Resolving apache.cs.utah.edu (apache.cs.utah.edu)... 155.98.64.87
[2017-04-13 18:14:55,186] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:55,186] {bash_operator.py:96} INFO - Connecting to apache.cs.utah.edu (apache.cs.utah.edu)|155.98.64.87|:80... connected.
[2017-04-13 18:14:55,284] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:55,284] {bash_operator.py:96} INFO - HTTP request sent, awaiting response... 200 OK
[2017-04-13 18:14:55,285] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:55,284] {bash_operator.py:96} INFO - Length: 1662639 (1.6M) [application/zip]
[2017-04-13 18:15:01,485] {jobs.py:2083} INFO - Task exited with return code 0
Edit: More testing suggests that it's a problem logging the output of wget.
Upvotes: 4
Views: 8872
Reputation: 143
This isn't a complete answer, but it's a big step forward. The problem seems to be an issue with Python's logging function and the output wget
produces. Turns out the airflow scheduler was throwing an error: UnicodeEncodeError: 'ascii' codec can't encode character u'\u2018' in position....
I modified bash_operator.py in the airflow code base so that the bash output is encoded (on line 95):
loging.info(line.encode('utf-8'))
An error is still happening but at least it is appearing in the log file along with the output of the rest of the bash script. The error that appears in the log file now is: UnicodeDecodeError: 'ascii' codec can't decode byte 0xe2 in position...
Even though there is still a Python error happening, output is getting logged, so I'm satisfied for now. If someone has an idea of how to resolve this issue in a better manner, I'm open to ideas.
Upvotes: 0
Reputation: 3081
Its because in the default operator only last line is printed. Please replace the code with the following inside airflow/operators/bash_operator.py
where ever your airflow is installed. Usually, you need to look in where your python is and then go to site-packages
from builtins import bytes
import os
import signal
import logging
from subprocess import Popen, STDOUT, PIPE
from tempfile import gettempdir, NamedTemporaryFile
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.file import TemporaryDirectory
class BashOperator(BaseOperator):
"""
Execute a Bash script, command or set of commands.
:param bash_command: The command, set of commands or reference to a
bash script (must be '.sh') to be executed.
:type bash_command: string
:param xcom_push: If xcom_push is True, the last line written to stdout
will also be pushed to an XCom when the bash command completes.
:type xcom_push: bool
:param env: If env is not None, it must be a mapping that defines the
environment variables for the new process; these are used instead
of inheriting the current process environment, which is the default
behavior. (templated)
:type env: dict
:type output_encoding: output encoding of bash command
"""
template_fields = ('bash_command', 'env')
template_ext = ('.sh', '.bash',)
ui_color = '#f0ede4'
@apply_defaults
def __init__(
self,
bash_command,
xcom_push=False,
env=None,
output_encoding='utf-8',
*args, **kwargs):
super(BashOperator, self).__init__(*args, **kwargs)
self.bash_command = bash_command
self.env = env
self.xcom_push_flag = xcom_push
self.output_encoding = output_encoding
def execute(self, context):
"""
Execute the bash command in a temporary directory
which will be cleaned afterwards
"""
bash_command = self.bash_command
logging.info("tmp dir root location: \n" + gettempdir())
line_buffer = []
with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:
f.write(bytes(bash_command, 'utf_8'))
f.flush()
fname = f.name
script_location = tmp_dir + "/" + fname
logging.info("Temporary script "
"location :{0}".format(script_location))
logging.info("Running command: " + bash_command)
sp = Popen(
['bash', fname],
stdout=PIPE, stderr=STDOUT,
cwd=tmp_dir, env=self.env,
preexec_fn=os.setsid)
self.sp = sp
logging.info("Output:")
line = ''
for line in iter(sp.stdout.readline, b''):
line = line.decode(self.output_encoding).strip()
line_buffer.append(line)
logging.info(line)
sp.wait()
logging.info("Command exited with "
"return code {0}".format(sp.returncode))
if sp.returncode:
raise AirflowException("Bash command failed")
logging.info("\n".join(line_buffer))
if self.xcom_push_flag:
return "\n".join(line_buffer)
def on_kill(self):
logging.info('Sending SIGTERM signal to bash process group')
os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM)
Upvotes: 2