Reputation: 175
I am running Airflow v1.10.15 on Cloud Composer v1.16.16.
My DAG looks like this:
import logging
from datetime import datetime, timedelta

# imports
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from scripts import workday_extract, workday_config_large

# logger used inside the task callable
logger = logging.getLogger(__name__)

default_args = {
    'owner': 'xxxx',
    'depends_on_past': False,
    'start_date': datetime(2021, 9, 14),
    'email_on_failure': True,
    'email': ['xxxx'],
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
    'catchup': False
}

# Define the DAG with parameters
dag = DAG(
    dag_id='xxxx_v1',
    schedule_interval='0 20 * * *',
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
    concurrency=1
)


def wd_to_bq(key, val, **kwargs):
    logger.info("workday to BQ ingestion")
    workday_extract.fetch_wd_load_bq(key, val)


start_load = DummyOperator(task_id='start', dag=dag)
end_load = DummyOperator(task_id='end', dag=dag)

for key, val in workday_config_large.endpoint_tbl_mapping.items():
    # Task 1: Process the unmatched records from the view
    workday_to_bq = PythonOperator(
        dag=dag,
        task_id=f'{key}',
        execution_timeout=timedelta(minutes=60),
        provide_context=True,
        python_callable=wd_to_bq,
        op_kwargs={'key': key, 'val': val}
    )
    start_load >> workday_to_bq >> end_load
The task fails with the error "Task exited with return code Negsignal.SIGKILL". The Python script runs fine on my local machine and completes in 15 minutes. The reports are extracted from multiple endpoints. However, only the one that takes the longest (~15 minutes) fails with this error; the others succeed.
I have tried a lot of options, but none seem to work. Can someone help with this?
Upvotes: 16
Views: 53845
Reputation: 1
I ran Airflow in Kubernetes and allocated a separate server with 25 GB of RAM for the worker, with no resource restrictions. After launch, the DAG crashed within a few minutes, by which time the Airflow worker had taken all available memory. The problem was the large amount of data (a database table of 4 GB and 17 million rows) that the task was trying to work with. After I manually deleted half of the data from the database, the DAG ran successfully.
Upvotes: 0
Reputation: 8814
A message like the one below in your Airflow task logs suggests that the kernel/OS killed your process. SIGKILL (signal 9) is a directive to kill the process immediately.
{{local_task_job.py:102}} INFO - Task exited with return code Negsignal.SIGKILL
It is very likely that the task you are performing (in this case the function wd_to_bq) was using a lot of resources on the worker container. I'm assuming that you are ingesting and processing some data, which can be memory intensive.
You've mentioned that it's working locally but failing in Cloud Composer. This could be because either you have a lot of RAM on your local system, or your Cloud Composer Airflow workers are processing other DAGs that are hogging the worker memory. To confirm that this is a memory issue, you can check the monitoring dashboard provided by the cloud service.
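If you also want a rough number from inside the task itself, here is a minimal sketch using only the standard library. The names peak_memory_mb and log_peak_memory are hypothetical helpers, not part of Airflow, and the resource module is only available on Unix-like systems. Note that if the process is SIGKILLed, nothing gets logged, so this is mainly useful for comparing the endpoints that succeed, or for measuring a local run.

```python
import functools
import logging
import resource
import sys

logger = logging.getLogger(__name__)


def peak_memory_mb():
    """Return the peak resident set size of the current process in MiB."""
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # ru_maxrss is reported in kilobytes on Linux and in bytes on macOS
    divisor = 1024 * 1024 if sys.platform == "darwin" else 1024
    return peak / divisor


def log_peak_memory(func):
    """Hypothetical decorator: log peak memory once the wrapped callable finishes."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        finally:
            # Runs on normal return and on exceptions, but not after a SIGKILL
            logger.info("peak memory after %s: %.1f MiB",
                        func.__name__, peak_memory_mb())
    return wrapper
```

You could wrap the function passed as python_callable with log_peak_memory and compare the logged figure against the memory available on your workers.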
Airflow runs its tasks on workers, so you will have to upgrade the workers with better hardware. Try increasing the RAM.
Note that the purpose of Airflow is to schedule ETL tasks and orchestrate the pipeline. You shouldn't load high volumes of data into the Airflow workers and use up all of their CPU/memory. Doing so will slow down your entire Airflow environment or cause your DAGs to be SIGKILLed at random. In most cases only the DAG/process that is using too much memory will be killed by the OOM killer, but sometimes it can kill other DAGs/processes on the same worker at the same time.
For loading/processing/writing large amounts of data, use ETL tools such as Fivetran, Airbyte, Databricks, NiFi, Azure Data Factory, etc., and use Airflow for scheduling and orchestration.
Upvotes: 6
Reputation: 6432
I had this happen when I was using a ThreadPoolExecutor, which doesn't release any resources until all the futures are done. To prevent the errors, I switched to processing four elements at a time:
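import itertools
from concurrent.futures import ThreadPoolExecutor

documents = iter(documents)  # make sure each element is consumed only once
while True:
    # take the next four elements; an empty list means nothing is left
    chunk = list(itertools.islice(documents, 4))
    if not chunk:
        break
    with ThreadPoolExecutor(max_workers=4) as executor:
        for each in executor.map(TextScraper(), chunk):
            pass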
Upvotes: 0
Reputation: 367
I had this issue too, but took a different approach.
Have you considered how your script could use less memory, or use memory better, instead of simply increasing the available memory?
with db_connector_warehouse.create_session() as session:
    query = session.query(offers_table)\
        .yield_per(chunk_size).enable_eagerloads(False)
    for df in pd.read_sql(query.statement, session.bind, chunksize=chunk_size):
        yield df
In the example above, passing chunksize to pandas (the bottom part) makes it return the DataFrame in smaller chunks. However, for read_sql the full result set can still be loaded into memory first, with pandas then handing you only the part you requested (this likely applies to other loading functions such as the CSV / XLSX readers too, but I haven't looked into it).
So you must ensure that you don't load the entire dataset. If you are using SQLAlchemy's ORM, you need to use yield_per. For normal connections, you can set the connection to stream the results.
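To illustrate the idea of pulling rows in batches from a plain DB-API connection, here is a minimal sketch. It uses the standard library's sqlite3 with an in-memory database as a stand-in for a real warehouse, and both iter_rows_in_chunks and the offers table layout are made up for the example. Whether fetchmany really avoids buffering the whole result on the client depends on your driver; with SQLAlchemy, for instance, the stream_results execution option requests a server-side cursor where the driver supports one.

```python
import sqlite3


def iter_rows_in_chunks(conn, sql, chunk_size=1000):
    """Yield lists of at most chunk_size rows instead of fetching everything."""
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        while True:
            rows = cursor.fetchmany(chunk_size)
            if not rows:
                break
            yield rows
    finally:
        cursor.close()


# Demo: in-memory database with a hypothetical "offers" table of 10 rows
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE offers (id INTEGER PRIMARY KEY, price REAL)")
conn.executemany("INSERT INTO offers (price) VALUES (?)",
                 [(float(i),) for i in range(10)])

chunk_sizes = [len(chunk)
               for chunk in iter_rows_in_chunks(conn, "SELECT * FROM offers",
                                                chunk_size=4)]
print(chunk_sizes)  # [4, 4, 2]
```

Each chunk can be processed and written out before the next one is fetched, so only one chunk needs to be held in memory at a time.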
A couple of useful resources if you'd rather go down the route of using less memory:
How to create a large pandas dataframe from an sql query without running out of memory?
https://pythonspeed.com/articles/pandas-sql-chunking/
And if you're not familiar with the yield flow control: What does the "yield" keyword do?
Upvotes: 2
Reputation: 1475
This error occurs when the allocated resources are less than what the task requires. DAG execution is limited by RAM, and how much memory is consumed depends on the nature of the DAG, so it is preferable to use machine types with more memory. Since you are using Cloud Composer 1, autoscaling of the resources is not possible, so you would need to increase your resources yourself.
Upvotes: 2
Reputation: 111
I resolved the issue by increasing the memory size.
https://github.com/apache/airflow/issues/10435
You should check the memory size of the pod that acts as the worker while the task is running.
Upvotes: 11