Timeout in airflow DockerOperator

I am new to Airflow. I am trying to run a container by an Airflow, but getting a timeout error:

[2021-07-21 07:02:06,176] {docker.py:231} INFO - Starting docker container from image python:3.9.2-slim
[2021-07-21 07:03:06,171] {taskinstance.py:1501} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 426, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 421, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/local/lib/python3.6/http/client.py", line 1379, in getresponse
    response.begin()
  File "/usr/local/lib/python3.6/http/client.py", line 311, in begin
    version, status, reason = self._read_status()
  File "/usr/local/lib/python3.6/http/client.py", line 272, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/local/lib/python3.6/socket.py", line 586, in readinto
    return self._sock.recv_into(b)
socket.timeout: timed out

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 727, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/util/retry.py", line 410, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/packages/six.py", line 735, in reraise
    raise value
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 677, in urlopen
    chunked=chunked,
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 428, in _make_request
    self._raise_timeout(err=e, url=url, timeout_value=read_timeout)
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 336, in _raise_timeout
    self, url, "Read timed out. (read timeout=%s)" % timeout_value
urllib3.exceptions.ReadTimeoutError: HTTPConnectionPool(host='host.docker.internal', port=2375): Read timed out. (read timeout=60)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 319, in execute
    return self._run_image()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 258, in _run_image
    tty=self.tty,
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/container.py", line 430, in create_container
    return self.create_container_from_config(config, name)
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/container.py", line 440, in create_container_from_config
    res = self._post_json(u, data=config, params=params)
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/client.py", line 296, in _post_json
    return self._post(url, data=json.dumps(data2), **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/utils/decorators.py", line 46, in inner
    return f(self, *args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/client.py", line 233, in _post
    return self.post(url, **self._set_request_timeout(kwargs))
  File "/home/airflow/.local/lib/python3.6/site-packages/requests/sessions.py", line 590, in post
    return self.request('POST', url, data=data, json=json, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/requests/sessions.py", line 542, in request
    resp = self.send(prep, **send_kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/requests/sessions.py", line 655, in send
    r = adapter.send(request, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/requests/adapters.py", line 529, in send
    raise ReadTimeout(e, request=request)
requests.exceptions.ReadTimeout: HTTPConnectionPool(host='host.docker.internal', port=2375): Read timed out. (read timeout=60)
[2021-07-21 07:03:06,179] {taskinstance.py:1551} INFO - Marking task as UP_FOR_RETRY. dag_id=etl_in_ch, task_id=etl_in_ch, execution_date=20210721T070203, start_date=20210721T070205, end_date=20210721T070306
[2021-07-21 07:03:06,215] {local_task_job.py:149} INFO - Task exited with return code 1

I have Mac system, and have configured docker socket as shown here: https://github.com/puckel/docker-airflow/issues/543#issuecomment-741842728

My code for Airflow is:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.operators.dummy import DummyOperator

default_args = {
'owner'                 : 'airflow',
'description'           : 'Extract data from different sources into CH and train model with it',
'depend_on_past'        : False,
'start_date'            : datetime(2021, 7, 19),
'email_on_failure'      : False,
'email_on_retry'        : False,
'retries'               : 1,
'retry_delay'           : timedelta(minutes=5)
}

with DAG('etl_in_ch', default_args=default_args, schedule_interval="00 23 * * *", catchup=False) as dag:
    start_dag = DummyOperator(
        task_id='start_dag'
        )

    end_dag = DummyOperator(
        task_id='end_dag'
        )

    t1 = DockerOperator(
        task_id='etl_in_ch',
        image='python:3.9.2-slim',
        container_name='etl_in_ch',
        api_version='auto',
        auto_remove=True,
        command="apt-get update && apt-get install -y cron && apt-get install -y libxml2 libxslt-dev wget bzip2 gcc \
                && pip install --no-cache-dir  --upgrade pip \
                && pip install --no-cache-dir poetry==1.1.5 \
                && poetry config virtualenvs.create false\
                && poetry install --no-interaction --no-ansi \
                && chmod +x /src/__main__.py \
                && python __main__.py",
        docker_url="tcp://host.docker.internal:2375",
        network_mode="bridge",
        environment={"PYTHONDONTWRITEBYTECODE": 1, "PYTHONUNBUFFERED": 1},
        working_dir="/usr/src/copy_data",
        mounts=['./CH_ETL/src:/usr/src/copy_data', './pyproject.toml:pyproject.toml'],
        xcom_all=True
    )


    start_dag >> t1

    t1 >> end_dag

I saw that I may need to increase a docker timeout, but I do not understand exactly where, and I actually have tried already - on my machine, inside airflow-worker, inside bobrik/socat docker. Did not help.



Read more here: https://stackoverflow.com/questions/68465368/timeout-in-airflow-dockeroperator

Content Attribution

This content was originally published by MStikh at Recent Questions - Stack Overflow, and is syndicated here via their RSS feed. You can read the original post over there.

%d bloggers like this: