Le presentaremos las nuevas funciones de KubernetesExecutor 2.0. Alerta de spoiler !!! El proceso es más rápido, más flexible y más fácil de entender.
Junto con Airflow 2.0, nos complace presentar un KubernetesExecutor completamente rediseñado. Esta nueva arquitectura es más rápida, más flexible y más fácil de entender que KubernetesExecutor 1.10. Como primer paso, nos gustaría presentarle las nuevas funciones de KubernetesExecutor 2.0.
¿Qué es KubernetesExecutor?
En 2018, presentamos KubernetesExecutor basado en las ideas de autoescalado y flexibilidad. Airflow aún no tenía un concepto claro para el autoescalado de Celery Workers (aunque nuestro trabajo reciente con KEDA en este sentido ha sido muy exitoso), por lo que queríamos crear un sistema que pudiera satisfacer las necesidades del usuario. Como resultado de esta investigación, se creó un sistema que utiliza la API de Kubernetes para ejecutar una tarea de flujo de aire por pod. Un efecto secundario valioso de este sistema basado en API de Kubernetes es que abrió la capacidad para que los usuarios agreguen complementos y restricciones únicos para cada tarea.
Usando la API de Kubernetes y KubernetesExecutor, los usuarios de Airflow pueden determinar que ciertas tareas tienen acceso a ciertos secretos, o que una tarea solo se puede realizar en un nodo que existe en la Unión Europea (que puede ser útil para la gestión de datos). Los usuarios también pueden especificar cuántos recursos está ocupando una tarea, lo que puede variar mucho según lo que esté haciendo la tarea (por ejemplo, se requiere acceso a las GPU para ejecutar un script de TensorFlow). Con esta API, KubernetesExecutor permite a los ingenieros de datos tener un control mucho más preciso sobre cómo Airflow realiza sus tareas de lo que solo usarían las colas de Celery existentes.
, KubernetesExecutor . pod , , Celery ( , ). , CeleryExecutor , . , CeleryExecutor, KubernetesExecutor Airflow, Airflow 2.0 , CeleryKubernetesExecutor, !
KubernetesExecutor
podtemplate
Airflow 1.10.12 pod_template_file
. Kubernetes KubernetesExecutor. , Airflow API Kubernetes .
pod_template_files
Airflow. pod_template_file
, , , CeleryExecutor .
pod pod_template_files
, 2.0 , , pod Kubernetes, . pod , Celery. — KubernetesExecutor.
Execitor_config
Airflow 2.0 executor_config
, . , Python , API Kubernetes. executor_config
podOverride
. , .
, executeor_config
- Airflow 2.0, . , .
podmutationhook
1.10.12, pod_mutation_hook
Kubernetes V1Pod Airflow pod Kubernetes API , Airflow pod. pod, KubernetesExecutor, pod, KubernetesPodOperator.
KubernetesExecutor. , pod_template_file
pod, Kubernetes pod_override
pod_mutation_hook
pod. , .
, KubernetesExecutor.
, , , . Pod , . .
.
. pod, . V1pod, .
Airflow DevOps, .
, DAG, , executor_config
podOverride. , Kubernetes DAG, , KubernetesPodOperator . KubernetesPodOperator Docker , . , executeor_config
, Kubernetes API podOverride , , , , . . , .
, , , , Python pod, . executeor_config
podOverride , PythonOperator API TaskFlow. DAG :
from airflow.decorators import dag, task from datetime import datetime import os import json import requests from kubernetes.client import models as k8s new_config ={ "pod_override": k8s.V1Pod( metadata=k8s.V1ObjectMeta(labels={"purpose": "pod-override-example"}), spec=k8s.V1PodSpec( containers=[ k8s.V1Container( name="base", env=[ k8s.V1EnvVar(name="STATE", value="wa") ], ) ] ) ) } default_args = { 'start_date': datetime(2021, 1, 1) } @dag('k8s_executor_example', schedule_interval='@daily', default_args=default_args, catchup=False) def taskflow(): @task(executor_config=new_config) def get_testing_increase(): """ Gets totalTestResultsIncrease field from Covid API for given state and returns value """ url = 'https://covidtracking.com/api/v1/states/' res = requests.get(url+'{0}/current.json'.format(os.environ['STATE'])) return{'testing_increase': json.loads(res.text)['totalTestResultsIncrease']} get_testing_increase() dag = taskflow()
new_config
, pod Kubernetes API. DAG , API Covid . , podOverride. Airflow Kubernetes.
KubernetesExecutor
KubernetesExecutor, . , — .
YAML. DAG, DAG git DAG Kubernetes Volume.
, airflow.cfg YAML . YAML .
La mejor parte de estas tres nuevas funciones es que todas están disponibles en Airflow 1.10.13. Puede iniciar el proceso de migración de inmediato y disfrutar de los beneficios y la aceleración de este diseño más simple. Esperamos sus comentarios y no dude en contactarnos con cualquier pregunta, solicitud de funciones o documentación.