Descripción general de la nueva interfaz de usuario para transmisión estructurada en Apache Spark ™ 3.0

La traducción del artículo se preparó la víspera del inicio del curso de Data Engineer .










Streaming estructurado se introdujo por primera vez en Apache Spark 2.0. Esta plataforma se ha establecido como la mejor opción para crear aplicaciones de transmisión distribuida. La unificación de la API SQL / Dataset / DataFrame y las funciones integradas de Spark hace que sea mucho más fácil para los desarrolladores implementar sus complejas necesidades esenciales, como agregación de transmisión, unión de transmisión y compatibilidad con ventanas. Desde el lanzamiento de Structured Streaming, ha sido una solicitud popular de los desarrolladores mejorar el control de la transmisión, tal como lo hicimos en Spark Streaming (como DStream). En Apache Spark 3.0, lanzamos una nueva interfaz de usuario para transmisión estructurada.



La nueva IU Structured Streaming proporciona una manera fácil de monitorear todos los trabajos de transmisión con estadísticas y conocimientos prácticos, lo que facilita la resolución de problemas durante la depuración y mejora la visibilidad de la producción con métricas en tiempo real. La interfaz de usuario presenta dos conjuntos de estadísticas: 1) información agregada sobre el trabajo de consulta de transmisión y 2) información estadística detallada sobre las solicitudes de transmisión, incluida la velocidad de entrada, la velocidad de proceso, las filas de entrada, la duración del lote, la duración de la operación, etc.



Información agregada sobre trabajos de consulta de transmisión



Cuando un desarrollador envía una consulta SQL de transmisión, aparece en la pestaña Transmisión estructurada, que incluye tanto las consultas de transmisión activas como las completadas. La tabla de resultados proporcionará información básica sobre las solicitudes de transmisión, incluido el nombre de la solicitud, el estado, la ID, la ID de ejecución, la hora de envío, la duración de la solicitud, la ID del último paquete, así como información agregada como la tasa de recepción promedio y la tasa de procesamiento promedio. Hay tres tipos de estados de solicitud de transmisión: EN EJECUCIÓN, FINALIZADO y FALLO. Todas las solicitudes FINALIZADAS y FALLADAS se enumeran en la tabla de solicitudes de transmisión completadas. La columna Error muestra los detalles de la excepción de solicitud fallida.







Podemos ver estadísticas detalladas de la solicitud de transmisión haciendo clic en el enlace Run ID.



Información estadística detallada



La página Estadísticas muestra métricas que incluyen la tasa de ingestión / procesamiento, la latencia y la duración detallada de la operación, que son útiles para comprender el estado de sus solicitudes de transmisión, lo que facilita la depuración de anomalías en el procesamiento de solicitudes.









Contiene las siguientes métricas:



  • Tasa de entrada : tasa agregada (en todas las fuentes) de llegada de datos.
  • Tasa de proceso : la tasa agregada (en todas las fuentes) a la que Spark procesa los datos.
  • Duración del lote : la duración de cada lote.
  • Duración de la operación : el tiempo necesario para realizar varias operaciones en milisegundos.


Las transacciones monitoreadas se enumeran a continuación:



  • addBatch: tiempo dedicado a leer los datos de entrada del micro lote de las fuentes, procesarlos y escribir los datos de salida del lote para sincronizarlos. Esto suele llevar la mayor parte del tiempo de micro lotes.
  • getBatch: tiempo necesario para preparar una solicitud lógica para leer los datos de entrada del micropaquete actual de las fuentes.
  • getOffset: tiempo dedicado a preguntar a las fuentes si tienen nuevos insumos.
  • walCommit: Escribe compensaciones en los registros de metadatos.
  • queryPlanning: Crea un plan de ejecución.


Cabe señalar que no todas las operaciones enumeradas se mostrarán en la interfaz de usuario. Existen diferentes operaciones con diferentes tipos de fuentes de datos, por lo que algunas de las operaciones enumeradas se pueden realizar en una solicitud de transmisión.



Solución de problemas de rendimiento de transmisión mediante la interfaz de usuario



En esta sección, veremos algunos casos en los que la nueva transmisión estructurada de la interfaz de usuario indica que está sucediendo algo fuera de lo común. Una solicitud de demostración de alto nivel se ve así, y en cada caso asumiremos algunas condiciones previas:



import java.util.UUID

val bootstrapServers = ...
val topics = ...
val checkpointLocation = "/tmp/temporary-" + UUID.randomUUID.toString

val lines = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option("subscribe", topics)
    .load()
    .selectExpr("CAST(value AS STRING)")
    .as[String]

val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

val query = wordCounts.writeStream
    .outputMode("complete")
    .format("console")
    .option("checkpointLocation", checkpointLocation)
    .start()


Mayor latencia debido a una potencia de procesamiento insuficiente



En el primer caso, ejecutamos una solicitud para procesar los datos de Apache Kafka lo antes posible. Para cada lote, un trabajo de transmisión procesa todos los datos disponibles en Kafka. Si la potencia de procesamiento es insuficiente para manejar los datos del paquete, la latencia aumentará rápidamente. El juicio más intuitivo es que las filas de entrada y la duración del lote crecerán linealmente. El parámetro Input Rows especifica que el trabajo de transmisión puede procesar un máximo de 8000 escrituras por segundo. Pero la tasa de entrada actual es de unos 20.000 registros por segundo. Podemos proporcionar al trabajo de subprocesamiento más recursos para ejecutar, o podemos agregar suficientes particiones para manejar todos los consumidores necesarios para mantenerse al día con los productores.







Latencia estable pero alta



¿En qué se diferencia este caso del anterior? La latencia no aumenta, pero permanece estable, como se muestra en la siguiente captura de pantalla:







Descubrimos que la tasa de proceso puede permanecer estable a la misma tasa de entrada. Esto significa que la capacidad de procesamiento del trabajo es suficiente para procesar los datos de entrada. Sin embargo, el tiempo de procesamiento de cada lote, es decir, el retraso, sigue siendo de 20 segundos. La razón principal de la alta latencia es que hay demasiados datos en cada lote. Por lo general, podemos reducir la latencia aumentando el paralelismo de este trabajo. Después de agregar 10 particiones Kafka más y 10 núcleos para las tareas de Spark, encontramos que la latencia era de alrededor de 5 segundos, mucho mejor que 20 segundos.







Utilice la tabla de duración de la operación para solucionar problemas



El gráfico Duración de la operación muestra la cantidad de tiempo dedicado a realizar varias operaciones en milisegundos. Esto es útil para comprender el tiempo de cada lote y facilitar la resolución de problemas. Tomemos el trabajo de rendimiento " SPARK-30915 : Evite leer el archivo de registro de metadatos cuando busque el ID de lote más reciente" en la comunidad Apache Spark como ejemplo.

Antes de esta mejora, cada lote posterior después de la compresión llevaba más tiempo que otros lotes, cuando el registro de metadatos comprimidos se vuelve enorme.







Después de examinar el código, se encontró y se corrigió la lectura innecesaria del archivo de registro comprimido. El siguiente diagrama de duración de la operación confirma el efecto esperado:







Planes para el futuro



Como se muestra arriba, la nueva IU Structured Streaming ayudará a los desarrolladores a controlar mejor sus trabajos de transmisión al tener mucha más información útil sobre las solicitudes de transmisión. Como versión inicial, la nueva interfaz de usuario aún está en desarrollo y se mejorará en futuras versiones. Hay varias características que pueden implementarse en un futuro no muy lejano, incluidas, entre otras, las siguientes:



  • Obtenga más información sobre la ejecución de consultas de transmisión: datos tardíos, marcas de agua, métricas de estado de datos y más.
  • Compatibilidad con la interfaz de usuario de transmisión estructurada en Spark History Server.
  • Pistas más notables de comportamiento inusual: latencia, etc.


Prueba una nueva interfaz de usuario



Pruebe esta nueva interfaz de usuario de Spark Streaming en Apache Spark 3.0 en el nuevo Databricks Runtime 7.1. Si está utilizando cuadernos de Databricks, esto también le dará una manera fácil de observar el estado de cualquier solicitud de transmisión en el cuaderno y administrar sus solicitudes . Puede registrarse para obtener una cuenta gratuita de Databricks y comenzar en minutos de forma gratuita, sin ninguna información de crédito.






La calidad de los datos en DWH es la coherencia del Data Warehouse. seminario web gratuito.






Lectura recomendada:



Herramienta de creación de datos, o lo que el almacén de datos y el batido tienen

en común Buceo en Delta Lake: aplicación de esquemas y evolución

Apache Parquet de alta velocidad en Python con Apache Arrow



All Articles