¿Qué debemos hacer ... cargar JSON en la plataforma de datos?

¡Hola a todos!





En un artículo reciente, discutimos cómo avanzamos hacia la construcción de nuestra plataforma de datos. Hoy me gustaría profundizar en el "estómago" de nuestra plataforma y en el camino contarles cómo resolvimos uno de los problemas que surgieron en relación con la creciente variedad de fuentes de datos integradas.





Es decir, si volvemos a la imagen final del artículo anterior (la duplico especialmente para que sea más conveniente para los queridos lectores), hoy hablaremos con más profundidad sobre la implementación del "lado derecho" del esquema: el uno que se encuentra después de Apache NiFi.





Un diagrama de nuestro artículo anterior.
Un diagrama de nuestro artículo anterior.

Como recordatorio, nuestra empresa tiene más de 350 bases de datos relacionales. Naturalmente, no todos son "únicos" y muchos son esencialmente copias diferentes del mismo sistema instaladas en todas las tiendas de la red comercial, pero aún así existe un "zoológico de diversidad". Por lo tanto, no se puede prescindir de ningún marco que simplifique y acelere la integración de fuentes en la plataforma de datos.





El esquema general para entregar datos de fuentes a la capa Greenplum ODS utilizando el marco que desarrollamos se muestra a continuación:





Esquema general de entrega de datos a la capa Greenplum ODS
ODS- Greenplum
  1. - Kafka AVRO-, Apache NiFi, parquet S3.





  2. «» Spark’ :





    1. Compaction – ( «»), : distinct() coalesce(). S3. parsing' , « »;





    2. Parsing – , . , ( gzip) CSV- S3.





  3. – CSV- ODS- : external table S3 PXF S3 connector, pgsql ODS- Greenplum





  4. Airflow.





DAG’ Airflow . Parsing . , , :





  • ODS- - ;





  • Git YAML-:





    • ( : , , S3-, , email ..);





    • ODS ( , , ODS- ). , ;





, . , , JSON-. , MongoDB MongoDB Kafka source connector Kafka. framework’ . , S3 JSON - " ", parquet Apache NiFi.





Compaction. , «» , :





df = spark.read.format(in_format) \
               .options(**in_options) \
               .load(path) \
               .distinct()    
new_df = df.coalesce(div)
new_df.write.mode("overwrite") \ 
            .format(out_format) \
            .options(**out_options) \
            .save(path)
      
      



JSON-, - , JSON’ Spark mergeSchema, .. , . – , - . « ».





-, , , S3. :





JSON- DataFrame , JSON-.





. , :





file1:





{«productId»: 1, «productName»: «ProductName 1», «tags»: [«tag 1», «tag 2»], «dimensions»: {«length»: 10, «width»: 12, «height»: 12.5}}
{«productId»: 2, «price»: 10.01, «tags»: [«tag 1», «tag 2»], «dimensions»: {«length»: 10, «width»: 12, «height»: 12.5}}
      
      



. JSON-, 1 = 1 . , , JSON-, JSON-. JSON- S3 ( " Apache NiFi).





:





#  
df = spark.read \
          .format("csv") \
          .option("sep", "\a") \
          .load("file1.json")

#   DataFrame
df.printSchema()

root
 |-- _c0: string (nullable = true)

#  
df.show()

+--------------------+
|                 _c0|
+--------------------+
|{"productId": 1, ...|
|{"productId": 2, ...|
+--------------------+
      
      



JSON CSV, , . , Bell character. DataFrame , dicstinct() coalesce(), . :





#  parquet
in_format = "parquet"
in_options = {}

#  JSON
in_format = "csv"
in_options = {"sep": "\a"}
      
      



DataFrame S3 :





df.write.mode("overwrite") \   
        .format(out_format) \
				.options(**out_options) \  
				.save(path)  

#  JSON     
out_format = "text" 
out_options = {"compression": "gzip"}  

#  parquet   
out_format = input_format 
out_options = {"compression": "snappy"}
      
      



Parsing. , , : JSON -, parquet, . , JSON- Spark , , JSON- , mergeSchema. . , - «field_1», , , . Spark DataFrame , Parsing, , - - , .





. , :





file1 ( ):





{«productId»: 1, «productName»: «ProductName 1», «tags»: [«tag 1», «tag 2»], «dimensions»: {«length»: 10, «width»: 12, «height»: 12.5}}
{«productId»: 2, «price»: 10.01, «tags»: [«tag 1», «tag 2»], «dimensions»: {«length»: 10, «width»: 12, «height»: 12.5}}
      
      



file2:





{«productId»: 3, «productName»: «ProductName 3», «dimensions»: {«length»: 10, «width»: 12, «height»: 12.5, «package»: [10, 20.5, 30]}}
      
      



Spark’ DataFrame:





df = spark.read \
          .format("json") \
          .option("multiline", "false") \
          .load(path)
df.printSchema()
df.show()
      
      



( ):





root
 |-- dimensions: struct (nullable = true)
 |    |-- height: double (nullable = true)
 |    |-- length: long (nullable = true)
 |    |-- width: long (nullable = true)
 |-- price: double (nullable = true)
 |-- productId: long (nullable = true)
 |-- productName: string (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------------+-----+---------+-------------+--------------+
|    dimensions|price|productId|  productName|          tags|
+--------------+-----+---------+-------------+--------------+
|[12.5, 10, 12]| null|        1|ProductName 1|[tag 1, tag 2]|
|[12.5, 10, 12]|10.01|        2|         null|[tag 1, tag 2]|
+--------------+-----+---------+-------------+--------------+
      
      



( ):





root
 |-- dimensions: struct (nullable = true)
 |    |-- height: double (nullable = true)
 |    |-- length: long (nullable = true)
 |    |-- package: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- width: long (nullable = true)
 |-- productId: long (nullable = true)
 |-- productName: string (nullable = true)

+--------------------+---------+-------------+
|          dimensions|productId|  productName|
+--------------------+---------+-------------+
|[12.5, 10, [10.0,...|        3|ProductName 3|
+--------------------+---------+-------------+
      
      



, Spark . - , , DataFrame null ( price productName ).





, , ( ) ,





root
 |-- price: double (nullable = true)
 |-- productId: long (nullable = true)
 |-- productName: string (nullable = true)
      
      



«- file2», «price» , Spark , «price» DataFrame. parquet- , parquet- AVRO, , , parquet-.





, , , framework’, - JSON’ – JSON- S3.





, JSON- JSON- . JSON’ - , DataFrame , null:





df = spark.read \
          .format("json") \
          .option("multiline","false") \
          .schema(df_schema) \
          .load(path)
      
      



- YAML- . , Kafka, , Kafka Schema Registry, JSON ( , , Kafka Schema Registry ).





, :





  • Kafka Schema Registry





  • pyspark.sql.types.StructType – - :





# 1.   Kafka Schema Registry REST API   
# 2.     schema  :
df_schema = StructType.fromJson(schema)
      
      



  • JSON-





, … JSON-, Spark’. JSON file2 . JSON , :





df.schema.json()  
      
      



{
    "fields":
    [
        {
            "metadata": {},
            "name": "dimensions",
            "nullable": true,
            "type":
            {
                "fields":
                [
                    {"metadata":{},"name":"height","nullable":true,"type":"double"},
                    {"metadata":{},"name":"length","nullable":true,"type":"long"},
                    {"metadata":{},"name":"width","nullable":true,"type":"long"}
                ],
                "type": "struct"
            }
        },
        {
            "metadata": {},
            "name": "price",
            "nullable": true,
            "type": "double"
        },
        {
            "metadata": {},
            "name": "productId",
            "nullable": true,
            "type": "long"
        },
        {
            "metadata": {},
            "name": "productName",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "tags",
            "nullable": true,
            "type":
            {
                "containsNull": true,
                "elementType": "string",
                "type": "array"
            }
        }
    ],
    "type": "struct"
}

      
      



, JSON-.





« , JSON- , Spark’» - … , , , :





DataFrame JSON,





https://github.com/zalando-incubator/spark-json-schema, , Scala, pySpark …





, SchemaConverter. – . , «» - .





, , JSON. DataPlatform : NiFi Kafka, parquet, « » NiFi AVRO-schema, S3. - - -:





, :)
root
 |-- taskId: string (nullable = true)
 |-- extOrderId: string (nullable = true)
 |-- taskStatus: string (nullable = true)
 |-- taskControlStatus: string (nullable = true)
 |-- documentVersion: long (nullable = true)
 |-- buId: long (nullable = true)
 |-- storeId: long (nullable = true)
 |-- priority: string (nullable = true)
 |-- created: struct (nullable = true)
 |    |-- createdBy: string (nullable = true)
 |    |-- created: string (nullable = true)
 |-- lastUpdateInformation: struct (nullable = true)
 |    |-- updatedBy: string (nullable = true)
 |    |-- updated: string (nullable = true)
 |-- customerId: string (nullable = true)
 |-- employeeId: string (nullable = true)
 |-- pointOfGiveAway: struct (nullable = true)
 |    |-- selected: string (nullable = true)
 |    |-- available: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- dateOfGiveAway: string (nullable = true)
 |-- dateOfGiveAwayEnd: string (nullable = true)
 |-- pickingDeadline: string (nullable = true)
 |-- storageLocation: string (nullable = true)
 |-- currentStorageLocations: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- customerType: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- totalAmount: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- stockDecrease: boolean (nullable = true)
 |-- offline: boolean (nullable = true)
 |-- trackId: string (nullable = true)
 |-- transportationType: string (nullable = true)
 |-- stockRebook: boolean (nullable = true)
 |-- notificationStatus: string (nullable = true)
 |-- lines: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- lineId: string (nullable = true)
 |    |    |-- extOrderLineId: string (nullable = true)
 |    |    |-- productId: string (nullable = true)
 |    |    |-- lineStatus: string (nullable = true)
 |    |    |-- lineControlStatus: string (nullable = true)
 |    |    |-- orderedQuantity: double (nullable = true)
 |    |    |-- confirmedQuantity: double (nullable = true)
 |    |    |-- assignedQuantity: double (nullable = true)
 |    |    |-- pickedQuantity: double (nullable = true)
 |    |    |-- controlledQuantity: double (nullable = true)
 |    |    |-- allowedForGiveAwayQuantity: double (nullable = true)
 |    |    |-- givenAwayQuantity: double (nullable = true)
 |    |    |-- returnedQuantity: double (nullable = true)
 |    |    |-- sellingScheme: string (nullable = true)
 |    |    |-- stockSource: string (nullable = true)
 |    |    |-- productPrice: double (nullable = true)
 |    |    |-- lineAmount: double (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- markingFlag: string (nullable = true)
 |    |    |-- operations: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- operationId: string (nullable = true)
 |    |    |    |    |-- type: string (nullable = true)
 |    |    |    |    |-- reason: string (nullable = true)
 |    |    |    |    |-- quantity: double (nullable = true)
 |    |    |    |    |-- dmCodes: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |-- timeStamp: string (nullable = true)
 |    |    |    |    |-- updatedBy: string (nullable = true)
 |    |    |-- source: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- type: string (nullable = true)
 |    |    |    |    |-- items: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- assignedQuantity: double (nullable = true)
 |-- linkedObjects: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- objectType: string (nullable = true)
 |    |    |-- objectId: string (nullable = true)
 |    |    |-- objectStatus: string (nullable = true)
 |    |    |-- objectLines: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- objectLineId: string (nullable = true)
 |    |    |    |    |-- taskLineId: string (nullable = true)

      
      



, , -, Avro- JSON-. : , «» . , , ( ) , JSON-, Kafka Schema Registry, «, ».





SparkJsonSchemaConverter – , definitions, refs ( ) oneOf. , «» JSON- pyspark.sql.types.StructType





, , Open Source, , , , Open Source . . Open Source , , !





SparkJsonSchemaConverter’ Parsing «» S3: ( ) S3 -:





#  JSON
df = spark.read.format(in_format)\
            .option("multiline", "false")\
            .schema(json_schema) \
            .load(path)

#  parquet:
df = spark.read.format(in_format)\
            .load(path)
      
      



, DataFrame’ CSV-.





framework’ Data Platform JSON- . :





  • 4 JSON-!





  • « » framework’, , «» .








All Articles