Setting up a Multi-Node Airflow Cluster with HDP Ambari and Celery for Data Pipelines

Airflow is a strong choice for data pipelines, that is, for ETL orchestration and scheduling. It is widely adopted and popular for building future-proof data pipelines. It provides backfilling, versioning, and lineage through functional abstraction.

Functional programming is the future.

If you already run a Hadoop/Spark cluster, you can install Airflow on that same cluster and use it to orchestrate Spark, Hive, and Hadoop MapReduce jobs.


The installation uses airflow-ambari-mpack, a management pack (mpack) that adds Apache Airflow as a service to Apache Ambari. It was built by FOSS contributors.


The multi-node setup runs Airflow with the Celery executor and uses RabbitMQ as the message broker.

  1. Install the Apache MPack for Airflow

a. Clone the airflow-ambari-mpack repository (git clone)
b. Stop the Ambari server
c. Install the Apache mpack for Airflow on the Ambari server
d. Start the Ambari server
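You can also script steps b to d. The sketch below is a minimal Python wrapper. It assumes the ambari-server CLI is on the PATH of the Ambari server host and that you have built the mpack tarball from the clone at a hypothetical path (MPACK_PATH). By default it runs as a dry run and only prints the commands.

```python
import shlex
import subprocess

# Hypothetical location of the mpack tarball built from the cloned repository.
MPACK_PATH = "/tmp/airflow-service-mpack.tar.gz"


def mpack_install_commands(mpack_path):
    """Commands for steps b-d: stop Ambari, install the mpack, start Ambari."""
    return [
        ["ambari-server", "stop"],
        ["ambari-server", "install-mpack", "--mpack=" + mpack_path],
        ["ambari-server", "start"],
    ]


def run(commands, dry_run=True):
    """Print each command; execute it only when dry_run is False."""
    for cmd in commands:
        print(" ".join(shlex.quote(part) for part in cmd))
        if not dry_run:
            # check=True aborts on the first failing step
            subprocess.run(cmd, check=True)


if __name__ == "__main__":
    run(mpack_install_commands(MPACK_PATH))
```

To execute the commands for real, call run(..., dry_run=False) on the Ambari server host.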

  2. Add the Airflow Service in Ambari



In the Ambari web UI, go to Actions -> Add Service.

HDP Ambari Dashboard

Once step 1 is complete, Airflow appears as a service you can add in Ambari.

Airflow Ambari

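Click Next. On the master assignment page, choose the master node for Airflow (webserver and scheduler). On the next page, select Install Worker for the data nodes.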

Ambari: assigning Airflow to the Master / Name node

Here, the Airflow webserver and the Airflow scheduler are placed on the Name node of the Hadoop/Spark cluster.

Next, install an Airflow worker on the data nodes.

In this example, 3 workers are installed on the data nodes.

Airflow Ambari

Ambari UI: 3 Airflow workers

From here you can manage the installed components, and you can add more workers later with + Add.

Configuring Airflow in Ambari:

Click the Airflow service and open its Configs tab in Ambari.

Airflow Ambari

  1. Executor

executor = CeleryExecutor

Under Advanced airflow-core-site, set the executor to CeleryExecutor.

  2. SQL Alchemy Connection

sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@{HOSTNAME}/airflow

SQL Alchemy Connection

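Point the SQL Alchemy connection at PostgreSQL, as shown above. The default SQLite database only supports the SequentialExecutor, so you cannot use it with CeleryExecutor.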
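The following minimal Python sketch is a quick sanity check for this rule. The function name check_executor_backend is hypothetical and not part of Airflow. It reads the dialect from the SQLAlchemy URL and rejects SQLite for any executor other than SequentialExecutor.

```python
from urllib.parse import urlsplit


def check_executor_backend(executor, sql_alchemy_conn):
    """Return the DB dialect, or raise ValueError if it cannot serve the executor.

    Hypothetical helper: SQLite only works with SequentialExecutor, so
    LocalExecutor and CeleryExecutor need a server database such as PostgreSQL.
    """
    scheme = urlsplit(sql_alchemy_conn).scheme  # e.g. "postgresql+psycopg2"
    dialect = scheme.split("+", 1)[0]           # drop the driver part
    if dialect == "sqlite" and executor != "SequentialExecutor":
        raise ValueError(executor + " requires a non-SQLite metadata database")
    return dialect


if __name__ == "__main__":
    print(check_executor_backend(
        "CeleryExecutor",
        "postgresql+psycopg2://airflow:airflow@db-host/airflow",  # hypothetical host
    ))
```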

  3. Broker URL and Celery Result Backend

broker_url= pyamqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/
celery_result_backend = db+postgresql://airflow:airflow@{HOSTNAME}/airflow

Broker URL and Celery result backend for Airflow

dags_are_paused_at_creation = True
load_examples = False
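For reference, here are the same settings laid out as airflow.cfg sections. This sketch assumes the Airflow 1.10 layout ([core] and [celery]) and uses hypothetical host names. Later Airflow releases name the result key result_backend. In this setup Ambari writes airflow.cfg itself, so you should enter the values in the Ambari config screens rather than by editing the file.

```python
import configparser
import io


def build_airflow_cfg(db_host, rabbitmq_host):
    """Render the settings above as airflow.cfg text (Airflow 1.10 layout)."""
    cfg = configparser.ConfigParser(interpolation=None)
    cfg["core"] = {
        "executor": "CeleryExecutor",
        "sql_alchemy_conn": f"postgresql+psycopg2://airflow:airflow@{db_host}/airflow",
        "dags_are_paused_at_creation": "True",
        "load_examples": "False",
    }
    cfg["celery"] = {
        "broker_url": f"pyamqp://guest:guest@{rabbitmq_host}:5672/",
        "celery_result_backend": f"db+postgresql://airflow:airflow@{db_host}/airflow",
    }
    buf = io.StringIO()
    cfg.write(buf)
    return buf.getvalue()


if __name__ == "__main__":
    # Hypothetical host names; substitute your PostgreSQL and RabbitMQ hosts.
    print(build_airflow_cfg("db-host", "rabbitmq-host"))
```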


After you save the configuration, initialize the Airflow database from Ambari with Service Actions -> InitDB.

Airflow Initdb Ambari

This creates the airflow metadata database. You can then start Airflow.

Verifying the multi-node Airflow setup:

  1. RabbitMQ:

  2. RabbitMQ:

  3. RabbitMQ:

  4. Celery Flower

Celery Flower is a web-based tool for monitoring and administering Celery clusters. Its default port is 5555.

Flower shows all 3 workers as "online" in Celery.

Celery Flower

To start Celery Flower, the web UI for Celery, run: airflow flower

To keep Flower running in the background, start it with nohup:

nohup airflow flower >> /var/local/airflow/logs/flower.logs &
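To confirm that Flower came up, a small Python check can test whether the port accepts TCP connections. This sketch assumes Flower's default port 5555 on the local host. A successful check only proves that something is listening on that port, not that the listener is Flower.

```python
import socket


def flower_is_listening(host="localhost", port=5555, timeout=2.0):
    """Return True if something accepts TCP connections on host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, or timed out
        return False


if __name__ == "__main__":
    print("Flower reachable:", flower_is_listening())
```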

That completes the Airflow setup with Ambari on an HDP Hadoop/Spark cluster.


Multi-Node Airflow Cluster

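My previous post on Apache Airflow, "Setting up a Multi-Node Airflow Cluster with HDP Ambari and Celery for Data Pipelines", walked through the setup. This post covers the problems I ran into and how I solved them.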

Below are the issues I hit while setting up the Multi-Node Airflow Cluster.

Issue 1: Switching from LocalExecutor to CeleryExecutor.

The worker and scheduler logs showed the following Celery error:


AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
Apr 10 21:03:52 charlie-prod [2019-04-10 21:03:51,962] {} ERROR - Error syncing the celery executor, ignoring it:
Apr 10 21:03:52 charlie-prod [2019-04-10 21:03:51,962] {} ERROR - 'DisabledBackend' object has no attribute '_get_task_meta_for'

Airflow kept logging this error even though the configuration looked correct. The cause turned out to be the Celery version.


The cluster had Celery 3.3.5 installed, which does not work with Airflow 1.10. Upgrading Celery fixed the problem:

pip install --upgrade celery
3.3.5 => 4.3
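The following minimal Python sketch checks the installed Celery version before and after the upgrade. The 4.0 threshold is an assumption drawn from the upgrade above, not Airflow's exact pinned range. For that, check the celery extra in Airflow's setup.py.

```python
def parse_version(text):
    """Turn '4.3.0' into (4, 3, 0); stops at the first non-numeric part."""
    parts = []
    for piece in text.split("."):
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        if not digits:
            break
        parts.append(int(digits))
        if len(digits) < len(piece):
            # e.g. '0rc1': keep the 0, ignore the pre-release suffix
            break
    return tuple(parts)


def celery_is_new_enough(installed, minimum="4.0"):
    """Compare numerically, so '4.10' correctly sorts after '4.3'."""
    return parse_version(installed) >= parse_version(minimum)


if __name__ == "__main__":
    try:
        import celery
    except ImportError:
        print("celery is not installed")
    else:
        status = "OK" if celery_is_new_enough(celery.__version__) else "too old"
        print(celery.__version__, status)
```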

Issue 2: With CeleryExecutor, DAG tasks failed on the workers with the following error:

Apr 11 14:13:13 charlie-prod return load(BytesIO(s))
Apr 11 14:13:13 charlie-prod TypeError: Required argument 'object' (pos 1) not found
Apr 11 14:13:13 charlie-prod [2019-04-11 14:13:13,847: ERROR/ForkPoolWorker-6285] Pool process <celery.concurrency.asynpool.Worker object at 0x7f3a88b7b250> error: TypeError("Required argument 'object' (pos 1) not found",)
Apr 11 14:13:13 charlie-prod Traceback (most recent call last):
Apr 11 14:13:13 charlie-prod File "/usr/lib64/python2.7/site-packages/billiard/", line 289, in __call__
Apr 11 14:13:13 charlie-prod sys.exit(self.workloop(pid=pid))
Apr 11 14:13:13 charlie-prod File "/usr/lib64/python2.7/site-packages/billiard/", line 347, in workloop
Apr 11 14:13:13 charlie-prod req = wait_for_job()
Apr 11 14:13:13 charlie-prod File "/usr/lib64/python2.7/site-packages/billiard/", line 447, in receive
Apr 11 14:13:13 charlie-prod ready, req = _receive(1.0)
Apr 11 14:13:13 charlie-prod File "/usr/lib64/python2.7/site-packages/billiard/", line 419, in _recv
Apr 11 14:13:13 charlie-prod return True, loads(get_payload())
Apr 11 14:13:13 charlie-prod File "/usr/lib64/python2.7/site-packages/billiard/", line 101, in pickle_loads
Apr 11 14:13:13 charlie-prod return load(BytesIO(s))
Apr 11 14:13:13 charlie-prod TypeError: Required argument 'object' (pos 1) not found



The broker URL in the Airflow configuration was:



broker_url= amqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/

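After some research, I found that I needed pyamqp instead of amqp. The Celery documentation describes how Celery picks the AMQP client library: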


With amqp://, Celery uses librabbitmq if it is installed,

and falls back to py-amqp otherwise.
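The following Python sketch models this selection rule. It illustrates the documented behaviour and is not Kombu's actual code. The helper names resolve_amqp_transport and librabbitmq_available are hypothetical.

```python
import importlib.util
from urllib.parse import urlsplit


def librabbitmq_available():
    """True if the librabbitmq C client can be imported in this environment."""
    return importlib.util.find_spec("librabbitmq") is not None


def resolve_amqp_transport(broker_url, librabbitmq_installed):
    """Model which client library serves an AMQP broker URL.

    amqp://   -> librabbitmq if it is installed, otherwise py-amqp
    pyamqp:// -> always py-amqp
    """
    scheme = urlsplit(broker_url).scheme
    if scheme == "pyamqp":
        return "py-amqp"
    if scheme == "amqp":
        return "librabbitmq" if librabbitmq_installed else "py-amqp"
    raise ValueError("not an AMQP broker URL: " + broker_url)


if __name__ == "__main__":
    url = "amqp://guest:guest@rabbitmq-host:5672/"  # hypothetical host
    print(resolve_amqp_transport(url, librabbitmq_available()))
```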




So use pyamqp:// instead of amqp://. That scheme forces py-amqp whether or not librabbitmq is installed:


broker_url= pyamqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/

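In other words, replace amqp with pyamqp in the broker URL, and make sure py-amqp is installed (it is published on PyPI as amqp):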


pip install amqp
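Since Ambari manages airflow.cfg, the change belongs in the Ambari Airflow config. For a standalone installation, the minimal Python sketch below makes the same edit with configparser. switch_to_pyamqp is a hypothetical helper, and configparser drops comments when it rewrites the file.

```python
import configparser
import io


def switch_to_pyamqp(cfg_text):
    """Return airflow.cfg text with an amqp:// broker_url changed to pyamqp://."""
    cfg = configparser.ConfigParser(interpolation=None)
    cfg.read_string(cfg_text)
    url = cfg.get("celery", "broker_url")
    if url.startswith("amqp://"):
        # Keep credentials, host, port and vhost; only the scheme changes.
        cfg.set("celery", "broker_url", "pyamqp://" + url[len("amqp://"):])
    out = io.StringIO()
    cfg.write(out)
    return out.getvalue()


if __name__ == "__main__":
    sample = "[celery]\nbroker_url = amqp://guest:guest@rabbitmq-host:5672/\n"
    print(switch_to_pyamqp(sample))
```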

Issue 3: SQL Alchemy connection


The SQL Alchemy connection was originally set to:

sql_alchemy_conn = postgresql://airflow:airflow@{HOST_NAME}:5432/airflow

I changed it to:

sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@{HOSTNAME}/airflow




This explicitly selects psycopg2 as the PostgreSQL driver.


psycopg2 is the most popular PostgreSQL adapter for Python.
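To see which driver a connection string names, split its URL scheme on "+". The following is a minimal sketch, and the helper names are hypothetical.

```python
import importlib.util
from urllib.parse import urlsplit


def sqlalchemy_driver(conn):
    """Split a SQLAlchemy URL scheme into (dialect, driver); driver may be None."""
    scheme = urlsplit(conn).scheme           # e.g. "postgresql+psycopg2"
    dialect, _, driver = scheme.partition("+")
    return dialect, (driver or None)


def psycopg2_available():
    """True if psycopg2 can be imported on this node."""
    return importlib.util.find_spec("psycopg2") is not None


if __name__ == "__main__":
    conn = "postgresql+psycopg2://airflow:airflow@db-host/airflow"  # hypothetical host
    print(sqlalchemy_driver(conn), "psycopg2 installed:", psycopg2_available())
```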

Issue 4: On HDP 2.6.2 with Ambari, the worker installation failed.

While adding a new node, Ambari could not install the Celery worker, so DAGs could not run on that node.

The installation log showed:

by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
 Retrying (Retry(total=2, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
 Retrying (Retry(total=1, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
 Retrying (Retry(total=0, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
 Could not fetch URL There was a problem confirming the ssl certificate: HTTPSConnectionPool(host='', port=443): Max retries exceeded with url: /simple/apache-airflow/ (Caused by SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)) - skipping


Ambari installs the worker with pip, and pip failed to download the wheels from PyPI.

pip install --trusted-host --trusted-host --trusted-host --upgrade  --ignore-installed apache-airflow[celery]==1.10.0' returned 1. Collecting apache-airflow[celery]==1.10.0


resource_management.core.exceptions.ExecutionFailed: Execution of 'export SLUGIFY_USES_TEXT_UNIDECODE=yes && pip install --trusted-host --trusted-host --trusted-host --upgrade --ignore-installed apache-airflow[celery]==1.10.0' returned 1. Collecting apache-airflow[celery]==1.10.0
 Retrying (Retry(total=4, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
 Retrying (Retry(total=3, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
 Retrying (Retry(total=2, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
 Retrying (Retry(total=1, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
 Retrying (Retry(total=0, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
 Could not find a version that satisfies the requirement apache-airflow[celery]==1.10.0 (from versions: )
No matching distribution found for apache-airflow[celery]==1.10.0
You are using pip version 8.1.2, however version 19.0.3 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.

Note also that the node was running an old pip (8.1.2).

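As a hack, I located the lines in the Ambari Airflow service scripts that install celery through pip wheels during worker installation, since that step was the one failing.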


On the cluster, I temporarily commented out these lines by hand (and reverted them after the worker installed successfully), then added the worker from Ambari. It worked like a charm :) and this trick made my day.
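You can make the "comment out, install, revert" trick repeatable with a context manager that always restores the original file, even if the install fails. The following is a minimal Python sketch. The file path and pattern in the usage comment are placeholders, because the exact script and lines are not shown here.

```python
import contextlib
import re


@contextlib.contextmanager
def temporarily_commented(path, pattern, comment="# "):
    """Comment out lines matching `pattern` in `path`; restore the file on exit."""
    with open(path) as f:
        original = f.read()
    regex = re.compile(pattern)
    patched = [
        comment + line if regex.search(line) else line
        for line in original.splitlines(keepends=True)
    ]
    with open(path, "w") as f:
        f.write("".join(patched))
    try:
        yield
    finally:
        # Revert to the exact original content, even if the install failed.
        with open(path, "w") as f:
            f.write(original)


# Usage (placeholder path and pattern, not the real Ambari script):
# with temporarily_commented("/path/to/worker_install_script.py", r"pip install"):
#     input("Add the worker from the Ambari UI, then press Enter to revert...")
```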

After installing the worker on another node, you may need to restart the Airflow service from Ambari. You can find more details in my previous blog post, Setting up a Multi-Node Airflow Cluster with HDP Ambari and Celery for Data Pipelines.

