Spark Schema Evolución en la práctica

Queridos lectores, ¡buen día!



En este artículo, el consultor líder de la línea de negocios Big Data Solutions en Neoflex, describe en detalle las opciones para construir escaparates de estructura variable usando Apache Spark.



Como parte de un proyecto de análisis de datos, a menudo surge la tarea de crear mercados basados ​​en datos poco estructurados.



Por lo general, estos son registros o respuestas de varios sistemas, guardados como JSON o XML. Los datos se cargan en Hadoop, luego debe crear un escaparate a partir de ellos. Podemos organizar el acceso al escaparate creado, por ejemplo, a través de Impala.



En este caso, el diseño del escaparate de destino se desconoce previamente. Además, el esquema no se puede elaborar de antemano, ya que depende de los datos, y estamos tratando con estos datos estructurados muy débilmente.



Por ejemplo, hoy se registra la siguiente respuesta:



{source: "app1", error_code: ""}


y mañana la siguiente respuesta viene del mismo sistema:



{source: "app1", error_code: "error", description: "Network error"}


Como resultado, se debe agregar otro campo al escaparate: descripción, y nadie sabe si vendrá o no.



La tarea de crear un mart con estos datos es bastante estándar y Spark tiene varias herramientas para ello. Tanto JSON como XML son compatibles para analizar datos sin procesar, y se proporciona compatibilidad con schemaEvolution para un esquema previamente desconocido.



A primera vista, la solución parece simple. Necesitamos tomar una carpeta con JSON y leerla en un marco de datos. Spark creará un esquema y convertirá los datos anidados en estructuras. Luego, todo debe guardarse en parquet, que también es compatible con Impala, registrando el escaparate en la metastore de Hive.



Todo parece sencillo.



Sin embargo, de los breves ejemplos de la documentación no queda claro qué hacer con una serie de problemas en la práctica.



La documentación describe un enfoque no para crear un escaparate, sino para leer JSON o XML en un marco de datos.



Es decir, simplemente se explica cómo leer y analizar JSON:



df = spark.read.json(path...)


Esto es suficiente para que los datos estén disponibles para Spark.



En la práctica, el escenario es mucho más complicado que simplemente leer archivos JSON de una carpeta y crear un marco de datos. La situación se ve así: ya existe una vitrina determinada, todos los días llegan nuevos datos, es necesario agregarlos a la vitrina, sin olvidar que el esquema puede ser diferente.



El esquema habitual para construir un escaparate es el siguiente:



Paso 1. Los datos se cargan en Hadoop, luego se recargan diariamente y se agregan a una nueva partición. Resulta que la carpeta con los datos iniciales particionados por días.



Paso 2.Durante el arranque de inicialización, Spark lee y analiza esta carpeta. El marco de datos resultante se guarda en un formato disponible para el análisis, por ejemplo, en parquet, que luego se puede importar a Impala. Esto crea un escaparate de destino con todos los datos que se han acumulado hasta este punto.



Paso 3. Se crea una descarga que actualizará el escaparate todos los días.

Surge la cuestión de la carga incremental, la necesidad de dividir la vitrina y la cuestión de respaldar el esquema general de la vitrina.



Pongamos un ejemplo. Digamos que se implementa el primer paso para construir el almacenamiento y se configura la exportación de archivos JSON a una carpeta.



No es un problema crear un marco de datos a partir de ellos y luego guardarlo como escaparate. Este es el primer paso que puede encontrar fácilmente en la documentación de Spark:



df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)


Todo parece estar bien,



Leímos y analizamos el JSON, luego guardamos el marco de datos como un parquet, registrándolo con Hive de cualquier manera conveniente:



df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')


Conseguimos un escaparate.



Pero, al día siguiente, se agregaron nuevos datos de la fuente. Tenemos una carpeta con JSON y un escaparate creado en base a esta carpeta. Después de cargar el siguiente bloque de datos de la fuente, el data mart se queda sin datos durante un día.



Una solución lógica sería dividir el escaparate por día, lo que permitirá agregar una nueva partición cada día siguiente. El mecanismo para esto también es bien conocido, Spark le permite escribir particiones por separado.



Primero, inicializamos la carga, guardamos los datos como se describió anteriormente, agregando solo particiones. Esta acción se denomina inicialización de escaparate y se realiza solo una vez:



df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)


Al día siguiente, solo cargamos una nueva partición:



df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")


Todo lo que queda es volver a registrarse con Hive para actualizar el esquema.

Sin embargo, aquí es donde surgen los problemas.



Primer problema. Tarde o temprano, el parquet resultante no se puede leer. Esto tiene que ver con la forma en que parquet y JSON se acercan a los campos vacíos.



Consideremos una situación típica. Por ejemplo, JSON llega ayer:



 1: {"a": {"b": 1}},


y hoy el mismo JSON se ve así:



 2: {"a": null}


Digamos que tenemos dos particiones diferentes con una fila cada una.

Cuando leemos todos los datos de origen, Spark podrá determinar el tipo y comprender que "a" es un campo de tipo "estructura", con un campo anidado "b" de tipo INT. Pero, si cada partición se guardó por separado, se obtiene un parquet con esquemas de partición incompatibles:



df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)


Esta situación es bien conocida, por lo que se ha agregado una opción especialmente para eliminar los campos vacíos al analizar los datos iniciales:



df = spark.read.json("...", dropFieldIfAllNull=True)


En este caso, el parquet estará formado por tabiques que se puedan leer juntos.

Aunque quienes lo hayan hecho en la práctica se reirán amargamente. ¿Por qué? Porque es probable que surjan dos situaciones más. O tres. O cuatro. El primero, que seguramente aparecerá, es que los tipos numéricos se verán diferentes en diferentes archivos JSON. Por ejemplo, {intField: 1} y {intField: 1.1}. Si dichos campos se encuentran en una parte, la combinación de esquemas leerá todo correctamente, lo que conducirá al tipo más preciso. Pero si es diferente, entonces uno tendrá intField: int, y el otro intField: double.



Existe la siguiente bandera para manejar esta situación:



df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)


Ahora tenemos una carpeta donde se ubican las particiones, que se puede leer en un solo marco de datos y un parquet válido para todo el escaparate. ¿Si? No.



Recuerde que registramos la tabla en Hive. Hive no distingue entre mayúsculas y minúsculas en los nombres de campo, mientras que parquet sí. Por lo tanto, las particiones con esquemas: field1: int y Field1: int son las mismas para Hive, pero no para Spark. Recuerde poner en minúscula los nombres de los campos.



Después de eso, todo parece ir bien.



Sin embargo, no todo es tan sencillo. Surge un segundo problema, también conocido. Dado que cada nueva partición se guarda por separado, los archivos del servicio Spark estarán en la carpeta de la partición, por ejemplo, el indicador de éxito de la operación _SUCCESS. Esto arrojará un error al intentar parquet. Para evitar esto, debe configurar la configuración desactivando Spark para que no agregue archivos de servicio a la carpeta:



hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")


Parece que ahora todos los días se agrega una nueva partición de parquet a la carpeta del escaparate de destino, donde se almacenan los datos analizados del día. Nos ocupamos de antemano de que no hubiera particiones con un conflicto de tipo de datos.



Pero, ante nosotros está el tercer problema. Ahora no se conoce el esquema general, además, en Hive, la tabla con el esquema incorrecto, ya que cada nueva partición, muy probablemente, introdujo distorsión en el esquema.



Necesita volver a registrar la mesa. Esto se puede hacer simplemente: leer el parquet del escaparate nuevamente, tomar el esquema y crear un DDL basado en él, con el cual volver a registrar la carpeta en Hive como una tabla externa, actualizando el esquema del escaparate de destino.



Nos enfrentamos a un cuarto problema. La primera vez que registramos la mesa, confiamos en Spark. Ahora lo hacemos nosotros mismos, y debes recordar que los campos de parquet pueden comenzar con caracteres que no son válidos para Hive. Por ejemplo, Spark arroja líneas que no pudo analizar en el campo "corrupt_record". Un campo de este tipo no se puede registrar con Hive sin escapar.



Sabiendo esto, obtenemos el esquema:



f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)


Código ("_corrupt_record", "` _corrupt_record` ") +" "+ f [1] .replace (": "," `:"). Reemplazar ("<", "<` "). Reemplazar (", " , ",` "). replace (" array <`", "array <") hace que DDL sea seguro, es decir, en lugar de:



create table tname (_field1 string, 1field string)


Con nombres de campo como "_field1, 1field", se crea un DDL seguro donde los nombres de los campos se escapan: crear tabla `tname` (cadena` _field1`, cadena `1field`).



Surge la pregunta: ¿cómo obtener correctamente el marco de datos con el esquema completo (en código pf)? ¿Cómo obtengo este pf? Este es el quinto problema. ¿Releer el esquema de todas las particiones de la carpeta con archivos de parquet del escaparate de destino? Este es el método más seguro, pero el más difícil.



El esquema ya está en Hive. Puede obtener un nuevo esquema combinando el esquema de toda la tabla y la nueva partición. Por lo tanto, debe tomar el esquema de tabla de Hive y combinarlo con el nuevo esquema de partición. Esto se puede hacer leyendo los metadatos de prueba de Hive, guardándolos en una carpeta temporal y leyendo ambas particiones con Spark a la vez.



Básicamente, tiene todo lo que necesita: el esquema de tabla original en Hive y una nueva partición. También tenemos los datos. Todo lo que queda es obtener un nuevo esquema que combine el esquema del escaparate y los nuevos campos de la partición creada:



from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")


A continuación, creamos el DDL para registrar la tabla como en el fragmento anterior.

Si toda la cadena funciona correctamente, es decir, hubo una carga de inicialización y en Hive hay una tabla creada correctamente, obtenemos un esquema de tabla actualizado.



Y el último problema es que no puede simplemente agregar una partición a la tabla de Hive, ya que se romperá. Necesitas forzar a Hive a arreglar la estructura de la partición:



from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)


La simple tarea de leer JSON y crear un escaparate basado en él se traduce en superar una serie de dificultades implícitas, para las que hay que buscar soluciones por separado. Aunque estas soluciones son sencillas, lleva mucho tiempo encontrarlas.



Para implementar la construcción de la vitrina, tuve que:



  • Agregue particiones al escaparate, deshaciéndose de los archivos de servicio
  • Trate los campos vacíos en los datos originales que Spark ha escrito
  • Lanzar tipos simples para encadenar
  • Convertir nombres de campo a minúsculas
  • Volcado de datos y registro de tabla separados en Hive (creación de DDL)
  • Recuerde escapar de los nombres de campo, que pueden no ser compatibles con Hive
  • Aprenda a actualizar el registro de una tabla en Hive


Resumiendo, observamos que la decisión de construir vitrinas esconde muchos escollos. Por lo tanto, si surgen dificultades en la implementación, es mejor contactar a un socio experimentado con experiencia exitosa.



Gracias por leer este artículo, esperamos que encuentre útil la información.



All Articles