Hello, everyone!
In a recent article, we described how we approached building our data platform. Today I would like to dig into the "stomach" of the platform and, along the way, explain how we solved one of the problems caused by the growing variety of integrated data sources.
If we go back to the final diagram of the previous article (I reproduce it here for readers' convenience), today we will look more closely at the implementation of the "right-hand side" of the scheme: the part that comes after Apache NiFi.
As a reminder, our company has more than 350 relational databases. Naturally, not all of them are "unique": many are essentially separate copies of the same system installed in every store of the retail chain. Even so, there is a real "zoo" of variety, so we cannot do without a framework that simplifies and speeds up the integration of sources into the data platform.
The general scheme for delivering data from sources to the Greenplum ODS layer with the framework we developed is shown below:
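- Data from the source systems arrives in Kafka in AVRO format, passes through Apache NiFi, and is stored as parquet in S3.
- The "raw" data is then processed with Spark in two stages:
- Compaction: merging the "raw" files and removing duplicates with distinct() and coalesce(). The result is written back to S3 and serves as the input for parsing;
- Parsing: the data is parsed, and the result is saved as gzip-compressed CSV files in S3.
- The CSV files are loaded into the ODS tables: an external table is created over S3 through the PXF S3 connector, and the data is then inserted with pgsql into the ODS tables in Greenplum.
- All of this is orchestrated by Airflow.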
DAG’ Airflow . Parsing . , , :
ODS- - ;
Git YAML-:
( : , , S3-, , email ..);
ODS ( , , ODS- ). , ;
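Over time, sources appeared that deliver data as JSON documents. For example, data from MongoDB is published to Kafka through the MongoDB Kafka source connector. The framework had to be adapted to handle such sources. As a result, JSON is stored in S3 "as is", without conversion to parquet in Apache NiFi.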
Compaction. In simplified form, compacting the "raw" data looks like this:
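# Read the raw files and drop exact duplicate rows
df = spark.read.format(in_format) \
    .options(**in_options) \
    .load(path) \
    .distinct()
# Reduce the number of partitions (and so of output files) to div
new_df = df.coalesce(div)
# Write the compacted data
new_df.write.mode("overwrite") \
    .format(out_format) \
    .options(**out_options) \
    .save(path)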
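The snippet above does not show how div, the argument of coalesce(), is chosen. One possible way, given here only as a sketch, is to derive it from the total size of the input and a target size per output file. The helper name partitions_for and the 128 MiB default are assumptions made for illustration, not part of the framework described in the article.

```python
import math

# Hypothetical default: aim for output files of roughly 128 MiB each.
DEFAULT_TARGET_FILE_BYTES = 128 * 1024 * 1024


def partitions_for(total_bytes: int,
                   target_file_bytes: int = DEFAULT_TARGET_FILE_BYTES) -> int:
    """Return a partition count suitable for passing to coalesce().

    Always returns at least 1, because coalesce() needs a positive number.
    """
    if total_bytes < 0:
        raise ValueError("total_bytes must be non-negative")
    if target_file_bytes <= 0:
        raise ValueError("target_file_bytes must be positive")
    return max(1, math.ceil(total_bytes / target_file_bytes))


# Example: about 1 GB of raw input split into files of at most 128 MiB.
div = partitions_for(1_000_000_000)
```

The total size itself would have to come from somewhere else, for example from listing the objects under the S3 prefix before the Spark job starts.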
JSON-, - , JSON’ Spark mergeSchema, .. , . – , - . « ».
-, , , S3. :
JSON- DataFrame , JSON-.
. , :
file1:
{"productId": 1, "productName": "ProductName 1", "tags": ["tag 1", "tag 2"], "dimensions": {"length": 10, "width": 12, "height": 12.5}}
{"productId": 2, "price": 10.01, "tags": ["tag 1", "tag 2"], "dimensions": {"length": 10, "width": 12, "height": 12.5}}
. JSON-, 1 = 1 . , , JSON-, JSON-. JSON- S3 ( " Apache NiFi).
:
# Read the file as single-column CSV
df = spark.read \
    .format("csv") \
    .option("sep", "\a") \
    .load("file1.json")
# DataFrame schema
df.printSchema()
root
|-- _c0: string (nullable = true)
# DataFrame contents
df.show()
+--------------------+
| _c0|
+--------------------+
|{"productId": 1, ...|
|{"productId": 2, ...|
+--------------------+
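We read the JSON as ordinary CSV, using a separator that cannot occur in the data, for example the Bell character. The resulting DataFrame has a single string column and goes through distinct() and coalesce() like any other, so the compaction code stays the same. Only the read options differ: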
# parquet
in_format = "parquet"
in_options = {}
# JSON
in_format = "csv"
in_options = {"sep": "\a"}
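The effect of the Bell separator can be checked without Spark. The sketch below uses Python's csv module as a stand-in for Spark's CSV reader (a substitution made only for illustration; the pipeline itself uses spark.read). It shows that each JSON line stays in one column when the separator is "\a", that the same line falls apart when the separator is a comma, and that duplicates can be removed on the raw strings without parsing any JSON. The sample lines are shortened versions of file1.

```python
import csv
import io
import json

BELL = "\a"  # the same separator as in in_options above

# Three JSON lines; the third is an exact duplicate of the first.
raw = (
    '{"productId": 1, "productName": "ProductName 1", "tags": ["tag 1", "tag 2"]}\n'
    '{"productId": 2, "price": 10.01, "tags": ["tag 1", "tag 2"]}\n'
    '{"productId": 1, "productName": "ProductName 1", "tags": ["tag 1", "tag 2"]}\n'
)


def read_as_columns(text, sep=BELL):
    """Split every line on sep, treating quote characters as ordinary text."""
    reader = csv.reader(io.StringIO(text), delimiter=sep, quoting=csv.QUOTE_NONE)
    return [row for row in reader if row]


rows = read_as_columns(raw)                  # one column per line
comma_rows = read_as_columns(raw, sep=",")   # the JSON is cut into pieces

# Order-preserving de-duplication on the raw strings.
unique_lines = list(dict.fromkeys(row[0] for row in rows))

# The surviving strings are still valid JSON documents.
first_document = json.loads(unique_lines[0])
```

Because the JSON is never parsed at this stage, the schema of the documents plays no role in compaction.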
The DataFrame is then written to S3 as follows:
df.write.mode("overwrite") \
    .format(out_format) \
    .options(**out_options) \
    .save(path)
# for JSON
out_format = "text"
out_options = {"compression": "gzip"}
# for parquet
out_format = in_format
out_options = {"compression": "snappy"}
Parsing. , , : JSON -, parquet, . , JSON- Spark , , JSON- , mergeSchema. . , - «field_1», , , . Spark DataFrame , Parsing, , - - , .
. , :
file1 ( ):
{"productId": 1, "productName": "ProductName 1", "tags": ["tag 1", "tag 2"], "dimensions": {"length": 10, "width": 12, "height": 12.5}}
{"productId": 2, "price": 10.01, "tags": ["tag 1", "tag 2"], "dimensions": {"length": 10, "width": 12, "height": 12.5}}
file2:
{"productId": 3, "productName": "ProductName 3", "dimensions": {"length": 10, "width": 12, "height": 12.5, "package": [10, 20.5, 30]}}
Reading the files with Spark into a DataFrame:
df = spark.read \
.format("json") \
.option("multiline", "false") \
.load(path)
df.printSchema()
df.show()
Result (file1):
root
|-- dimensions: struct (nullable = true)
| |-- height: double (nullable = true)
| |-- length: long (nullable = true)
| |-- width: long (nullable = true)
|-- price: double (nullable = true)
|-- productId: long (nullable = true)
|-- productName: string (nullable = true)
|-- tags: array (nullable = true)
| |-- element: string (containsNull = true)
+--------------+-----+---------+-------------+--------------+
| dimensions|price|productId| productName| tags|
+--------------+-----+---------+-------------+--------------+
|[12.5, 10, 12]| null| 1|ProductName 1|[tag 1, tag 2]|
|[12.5, 10, 12]|10.01| 2| null|[tag 1, tag 2]|
+--------------+-----+---------+-------------+--------------+
Result (file2):
root
|-- dimensions: struct (nullable = true)
| |-- height: double (nullable = true)
| |-- length: long (nullable = true)
| |-- package: array (nullable = true)
| | |-- element: double (containsNull = true)
| |-- width: long (nullable = true)
|-- productId: long (nullable = true)
|-- productName: string (nullable = true)
+--------------------+---------+-------------+
| dimensions|productId| productName|
+--------------------+---------+-------------+
|[12.5, 10, [10.0,...| 3|ProductName 3|
+--------------------+---------+-------------+
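As we can see, Spark built a different schema for each file. If a field is absent from some of the records, the DataFrame contains null in its place (see price and productName).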
, , ( ) ,
root
|-- price: double (nullable = true)
|-- productId: long (nullable = true)
|-- productName: string (nullable = true)
«- file2», «price» , Spark , «price» DataFrame. parquet- , parquet- AVRO, , , parquet-.
, , , framework’, - JSON’ – JSON- S3.
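So, to read JSON files reliably, the JSON schema has to be passed explicitly. If some field is missing from a JSON document, the DataFrame still contains it, filled with null: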
df = spark.read \
.format("json") \
.option("multiline","false") \
.schema(df_schema) \
.load(path)
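What reading with a fixed schema does to individual records can be imitated in plain Python. This is not Spark code, only a sketch of the behaviour: fields listed in the schema but absent from a record come back as null (None here), and fields not listed in the schema are dropped. The field list mirrors the three-column schema printed earlier; the function name project is hypothetical.

```python
import json

# Top-level fields of the explicit schema (price, productId, productName).
SCHEMA_FIELDS = ["price", "productId", "productName"]


def project(line, fields=SCHEMA_FIELDS):
    """Parse one JSON line and keep exactly the schema fields.

    Missing fields become None; fields outside the schema are ignored.
    """
    record = json.loads(line)
    return {name: record.get(name) for name in fields}


lines = [
    '{"productId": 1, "productName": "ProductName 1", "tags": ["tag 1", "tag 2"]}',
    '{"productId": 2, "price": 10.01, "tags": ["tag 1", "tag 2"]}',
    '{"productId": 3, "productName": "ProductName 3"}',
]

rows = [project(line) for line in lines]
```

Every row ends up with the same set of columns regardless of which file it came from, which is what the downstream CSV export relies on.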
- YAML- . , Kafka, , Kafka Schema Registry, JSON ( , , Kafka Schema Registry ).
, :
Kafka Schema Registry
pyspark.sql.types.StructType – - :
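from pyspark.sql.types import StructType
# 1. Fetch the schema from Kafka Schema Registry via its REST API
# 2. With the result stored in the variable schema:
df_schema = StructType.fromJson(schema)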
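Step 1 could look roughly like the sketch below. It relies on the Schema Registry REST endpoint GET /subjects/{subject}/versions/latest, whose response carries the schema as a JSON-encoded string in the "schema" field. The function names, the registry URL, and the subject name are hypothetical. Note also that StructType.fromJson expects Spark's own schema layout (the one produced by df.schema.json()), so treating the registered schema as directly usable is an assumption; an Avro or JSON Schema document would have to be converted first.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen


def fetch_text(url):
    """Default HTTP getter; replaceable in tests or behind authentication."""
    with urlopen(url) as response:
        return response.read().decode("utf-8")


def latest_schema(base_url, subject, fetch=fetch_text):
    """Return the latest registered schema of a subject as a Python object."""
    url = f"{base_url.rstrip('/')}/subjects/{quote(subject, safe='')}/versions/latest"
    payload = json.loads(fetch(url))
    # The registry returns the schema itself as a string, so decode it again.
    return json.loads(payload["schema"])


# Offline demonstration with a stubbed response instead of a real registry.
def fake_fetch(url):
    body = {"subject": "orders-value", "version": 1, "id": 1,
            "schema": json.dumps({"type": "struct", "fields": []})}
    return json.dumps(body)


schema = latest_schema("http://registry.example:8081/", "orders-value",
                       fetch=fake_fetch)
```

Passing the HTTP getter as a parameter keeps the function testable without network access.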
JSON-
, … JSON-, Spark’. JSON file2 . JSON , :
df.schema.json()
{
"fields":
[
{
"metadata": {},
"name": "dimensions",
"nullable": true,
"type":
{
"fields":
[
{"metadata":{},"name":"height","nullable":true,"type":"double"},
{"metadata":{},"name":"length","nullable":true,"type":"long"},
{"metadata":{},"name":"width","nullable":true,"type":"long"}
],
"type": "struct"
}
},
{
"metadata": {},
"name": "price",
"nullable": true,
"type": "double"
},
{
"metadata": {},
"name": "productId",
"nullable": true,
"type": "long"
},
{
"metadata": {},
"name": "productName",
"nullable": true,
"type": "string"
},
{
"metadata": {},
"name": "tags",
"nullable": true,
"type":
{
"containsNull": true,
"elementType": "string",
"type": "array"
}
}
],
"type": "struct"
}
, JSON-.
« , JSON- , Spark’» - … , , , :
DataFrame JSON,
https://github.com/zalando-incubator/spark-json-schema, , Scala, pySpark …
, SchemaConverter. – . , «» - .
, , JSON. DataPlatform : NiFi Kafka, parquet, « » NiFi AVRO-schema, S3. - - -:
, :)
root
|-- taskId: string (nullable = true)
|-- extOrderId: string (nullable = true)
|-- taskStatus: string (nullable = true)
|-- taskControlStatus: string (nullable = true)
|-- documentVersion: long (nullable = true)
|-- buId: long (nullable = true)
|-- storeId: long (nullable = true)
|-- priority: string (nullable = true)
|-- created: struct (nullable = true)
| |-- createdBy: string (nullable = true)
| |-- created: string (nullable = true)
|-- lastUpdateInformation: struct (nullable = true)
| |-- updatedBy: string (nullable = true)
| |-- updated: string (nullable = true)
|-- customerId: string (nullable = true)
|-- employeeId: string (nullable = true)
|-- pointOfGiveAway: struct (nullable = true)
| |-- selected: string (nullable = true)
| |-- available: array (nullable = true)
| | |-- element: string (containsNull = true)
|-- dateOfGiveAway: string (nullable = true)
|-- dateOfGiveAwayEnd: string (nullable = true)
|-- pickingDeadline: string (nullable = true)
|-- storageLocation: string (nullable = true)
|-- currentStorageLocations: array (nullable = true)
| |-- element: string (containsNull = true)
|-- customerType: string (nullable = true)
|-- comment: string (nullable = true)
|-- totalAmount: double (nullable = true)
|-- currency: string (nullable = true)
|-- stockDecrease: boolean (nullable = true)
|-- offline: boolean (nullable = true)
|-- trackId: string (nullable = true)
|-- transportationType: string (nullable = true)
|-- stockRebook: boolean (nullable = true)
|-- notificationStatus: string (nullable = true)
|-- lines: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- lineId: string (nullable = true)
| | |-- extOrderLineId: string (nullable = true)
| | |-- productId: string (nullable = true)
| | |-- lineStatus: string (nullable = true)
| | |-- lineControlStatus: string (nullable = true)
| | |-- orderedQuantity: double (nullable = true)
| | |-- confirmedQuantity: double (nullable = true)
| | |-- assignedQuantity: double (nullable = true)
| | |-- pickedQuantity: double (nullable = true)
| | |-- controlledQuantity: double (nullable = true)
| | |-- allowedForGiveAwayQuantity: double (nullable = true)
| | |-- givenAwayQuantity: double (nullable = true)
| | |-- returnedQuantity: double (nullable = true)
| | |-- sellingScheme: string (nullable = true)
| | |-- stockSource: string (nullable = true)
| | |-- productPrice: double (nullable = true)
| | |-- lineAmount: double (nullable = true)
| | |-- currency: string (nullable = true)
| | |-- markingFlag: string (nullable = true)
| | |-- operations: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- operationId: string (nullable = true)
| | | | |-- type: string (nullable = true)
| | | | |-- reason: string (nullable = true)
| | | | |-- quantity: double (nullable = true)
| | | | |-- dmCodes: array (nullable = true)
| | | | | |-- element: string (containsNull = true)
| | | | |-- timeStamp: string (nullable = true)
| | | | |-- updatedBy: string (nullable = true)
| | |-- source: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- type: string (nullable = true)
| | | | |-- items: array (nullable = true)
| | | | | |-- element: struct (containsNull = true)
| | | | | | |-- assignedQuantity: double (nullable = true)
|-- linkedObjects: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- objectType: string (nullable = true)
| | |-- objectId: string (nullable = true)
| | |-- objectStatus: string (nullable = true)
| | |-- objectLines: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- objectLineId: string (nullable = true)
| | | | |-- taskLineId: string (nullable = true)
, , -, Avro- JSON-. : , «» . , , ( ) , JSON-, Kafka Schema Registry, «, ».
That is how SparkJsonSchemaConverter came about: a library that supports definitions, refs, and oneOf. It converts a "proper" JSON schema into a pyspark.sql.types.StructType.
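To give a feel for what such a conversion involves, here is a minimal sketch in plain Python. It is not the code of SparkJsonSchemaConverter; the function names, the type mapping, and the rule for oneOf (take the first alternative that is not null) are assumptions made for illustration. It handles local $ref pointers into definitions, oneOf, objects, arrays, and primitives, and produces a dictionary in the same layout as the df.schema.json() output shown earlier, which is the layout StructType.fromJson accepts. Recursive references are not handled.

```python
# Assumed mapping from JSON Schema primitive types to Spark type names.
PRIMITIVES = {"string": "string", "integer": "long",
              "number": "double", "boolean": "boolean"}


def resolve(node, root):
    """Follow local references such as '#/definitions/name'."""
    while "$ref" in node:
        ref = node["$ref"]
        if not ref.startswith("#/"):
            raise ValueError(f"only local references are supported: {ref}")
        target = root
        for part in ref[2:].split("/"):
            target = target[part]
        node = target
    return node


def to_spark_type(node, root):
    """Convert one JSON Schema node into Spark's schema-JSON representation."""
    node = resolve(node, root)
    if "oneOf" in node:
        # Assumption: use the first alternative that is not the null type.
        options = [resolve(option, root) for option in node["oneOf"]]
        non_null = [o for o in options if o.get("type") != "null"]
        if not non_null:
            raise ValueError("oneOf contains only null alternatives")
        return to_spark_type(non_null[0], root)
    kind = node.get("type")
    if kind == "object":
        fields = [{"name": name, "type": to_spark_type(sub, root),
                   "nullable": True, "metadata": {}}
                  for name, sub in node.get("properties", {}).items()]
        return {"type": "struct", "fields": fields}
    if kind == "array":
        return {"type": "array", "containsNull": True,
                "elementType": to_spark_type(node["items"], root)}
    if kind in PRIMITIVES:
        return PRIMITIVES[kind]
    raise ValueError(f"unsupported JSON Schema node: {node!r}")


example = {
    "definitions": {
        "dimensions": {"type": "object", "properties": {
            "length": {"type": "integer"}, "height": {"type": "number"}}},
    },
    "type": "object",
    "properties": {
        "productId": {"type": "integer"},
        "price": {"oneOf": [{"type": "null"}, {"type": "number"}]},
        "tags": {"type": "array", "items": {"type": "string"}},
        "dimensions": {"$ref": "#/definitions/dimensions"},
    },
}

spark_schema = to_spark_type(example, example)
```

With PySpark available, the resulting dictionary could be passed to StructType.fromJson(spark_schema) and then to .schema(...) when reading the files.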
, , Open Source, , , , Open Source . . Open Source , , !
SparkJsonSchemaConverter’ Parsing «» S3: ( ) S3 -:
# JSON
df = spark.read.format(in_format)\
.option("multiline", "false")\
.schema(json_schema) \
.load(path)
# parquet:
df = spark.read.format(in_format)\
.load(path)
After that, the DataFrame is saved as CSV files.
framework’ Data Platform JSON- . :
4 JSON-!
« » framework’, , «» .