Nuevo KubernetesExecutor 2.0 en Airflow 2.0

Le presentaremos las nuevas funciones de KubernetesExecutor 2.0. Alerta de spoiler !!! El proceso es más rápido, más flexible y más fácil de entender.







Junto con Airflow 2.0, nos complace presentar un KubernetesExecutor completamente rediseñado. Esta nueva arquitectura es más rápida, más flexible y más fácil de entender que KubernetesExecutor 1.10. Como primer paso, nos gustaría presentarle las nuevas funciones de KubernetesExecutor 2.0.







¿Qué es KubernetesExecutor?



En 2018, presentamos KubernetesExecutor basado en las ideas de autoescalado y flexibilidad. Airflow aún no tenía un concepto claro para el autoescalado de Celery Workers (aunque nuestro trabajo reciente con KEDA en este sentido ha sido muy exitoso), por lo que queríamos crear un sistema que pudiera satisfacer las necesidades del usuario. Como resultado de esta investigación, se creó un sistema que utiliza la API de Kubernetes para ejecutar una tarea de flujo de aire por pod. Un efecto secundario valioso de este sistema basado en API de Kubernetes es que abrió la capacidad para que los usuarios agreguen complementos y restricciones únicos para cada tarea.







Usando la API de Kubernetes y KubernetesExecutor, los usuarios de Airflow pueden determinar que ciertas tareas tienen acceso a ciertos secretos, o que una tarea solo se puede realizar en un nodo que existe en la Unión Europea (que puede ser útil para la gestión de datos). Los usuarios también pueden especificar cuántos recursos está ocupando una tarea, lo que puede variar mucho según lo que esté haciendo la tarea (por ejemplo, se requiere acceso a las GPU para ejecutar un script de TensorFlow). Con esta API, KubernetesExecutor permite a los ingenieros de datos tener un control mucho más preciso sobre cómo Airflow realiza sus tareas de lo que solo usarían las colas de Celery existentes.







, KubernetesExecutor . pod , , Celery ( , ). , CeleryExecutor , . , CeleryExecutor, KubernetesExecutor Airflow, Airflow 2.0 , CeleryKubernetesExecutor, !







KubernetesExecutor



podtemplate



Airflow 1.10.12 pod_template_file



. Kubernetes KubernetesExecutor. , Airflow API Kubernetes .







pod_template_files



Airflow. pod_template_file



, , , CeleryExecutor .







pod pod_template_files



, 2.0 , , pod Kubernetes, . pod , Celery. — KubernetesExecutor.







Execitor_config



Airflow 2.0 executor_config



, . , Python , API Kubernetes. executor_config



podOverride



. , .







, executeor_config



- Airflow 2.0, . , .







podmutationhook



1.10.12, pod_mutation_hook



Kubernetes V1Pod Airflow pod Kubernetes API , Airflow pod. pod, KubernetesExecutor, pod, KubernetesPodOperator.









KubernetesExecutor. , pod_template_file



pod, Kubernetes pod_override



pod_mutation_hook



pod. , .







, KubernetesExecutor.













, , , . Pod , . .







.













. pod, . V1pod, .









Airflow DevOps, .







, DAG, , executor_config



podOverride. , Kubernetes DAG, , KubernetesPodOperator . KubernetesPodOperator Docker , . , executeor_config



, Kubernetes API podOverride , , , , . . , .







, , , , Python pod, . executeor_config



podOverride , PythonOperator API TaskFlow. DAG :







from airflow.decorators import dag, task
from datetime import datetime

import os
import json
import requests
from kubernetes.client import models as k8s

new_config ={ "pod_override": k8s.V1Pod(
                metadata=k8s.V1ObjectMeta(labels={"purpose": "pod-override-example"}),
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            env=[
                                k8s.V1EnvVar(name="STATE", value="wa")
                                ],
                            )
                        ]
                    )
                )
            }

default_args = {
    'start_date': datetime(2021, 1, 1)
}

@dag('k8s_executor_example', schedule_interval='@daily', default_args=default_args, catchup=False)
def taskflow():

    @task(executor_config=new_config)
    def get_testing_increase():
        """
        Gets totalTestResultsIncrease field from Covid API for given state and returns value
        """
        url = 'https://covidtracking.com/api/v1/states/'
        res = requests.get(url+'{0}/current.json'.format(os.environ['STATE']))
        return{'testing_increase': json.loads(res.text)['totalTestResultsIncrease']}

    get_testing_increase()

dag = taskflow()
      
      





new_config



, pod Kubernetes API. DAG , API Covid . , podOverride. Airflow Kubernetes.







KubernetesExecutor



KubernetesExecutor, . , — .







YAML. DAG, DAG git DAG Kubernetes Volume.







, airflow.cfg YAML . YAML .







La mejor parte de estas tres nuevas funciones es que todas están disponibles en Airflow 1.10.13. Puede iniciar el proceso de migración de inmediato y disfrutar de los beneficios y la aceleración de este diseño más simple. Esperamos sus comentarios y no dude en contactarnos con cualquier pregunta, solicitud de funciones o documentación.








All Articles