Desarrollo de modelos en PySpark ML en un conjunto de datos con diferentes tipos de datos para maniquíes oxidados

¿Ya sabe cómo trabajar con múltiples tipos de datos en PySpark ML? ¿No? Entonces necesitas visitarnos urgentemente.



imagen



¡Hola! Quiero cubrir en detalle uno interesante, pero, desafortunadamente, no es un tema en la documentación de Spark: ¿cómo entrenar un modelo en PySpark ML en un conjunto de datos con diferentes tipos de datos (cadenas y números)? El deseo de escribir este artículo fue causado por la necesidad de navegar por Internet durante varios días en busca del artículo necesario con el código, porque el tutorial oficial de Spark brinda un ejemplo de trabajo no solo con signos de un tipo de datos, sino en general con un signo, sino información sobre cómo trabajar con varias columnas los tipos de datos más diferentes, no hay. Sin embargo, habiendo estudiado en detalle las capacidades de PySpark para trabajar con datos, logré escribir código de trabajo y comprender cómo sucede todo, lo que quiero compartir con ustedes. ¡Así que adelante a toda velocidad, amigos!



Inicialmente, importemos todas las bibliotecas necesarias para el trabajo, y luego analizaremos el código en detalle para que cualquier "tetera oxidada" que se precie, como, por cierto, recientemente, lo entenderá todo:



#  
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pyspark.sql.functions as sf
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
#other types of regression models
#     
#from pyspark.ml.regression import LinearRegression
#from pyspark.ml.regression import RandomForestRegressor
#from pyspark.ml.regression import GeneralizedLinearRegression
#from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator


Ahora creemos un contexto Spark (local) y una sesión de Spark y verifiquemos si todo funciona mostrándolo en la pantalla. La creación de una sesión de Spark es el punto de partida para trabajar con conjuntos de datos en Spark:



#  
sc = SparkContext('local')
spark = SparkSession(sc)
spark






Hay una herramienta para trabajar con datos, ahora cargámosla. El artículo utiliza un conjunto de datos que se tomó del sitio de competencia de aprendizaje automático de Kaggle:

https://www.kaggle.com/unitednations/international-greenhouse-gas-emissions

que, después de la descarga, se almacena en path_csv en formato .csv y tiene las siguientes opciones:



  • encabezado: si la primera línea de nuestro archivo es un encabezado, ponemos "verdadero"
  • delimitador: ponemos un signo separando los datos de una línea por signos, muchas veces es "," o ";"
  • inferSchema: si es verdadero, PySpark detectará automáticamente el tipo de cada columna, de lo contrario tendrá que escribirlo usted mismo


#   .csv  path_csv
path_csv = 'greenhouse_gas_inventory_data_data.csv'
data = spark.read.format("csv")\
        .option("header", "true")\
        .option("delimiter", ",")\
        .option("inferSchema", "true")\
        .load(path_csv)


Para comprender mejor con qué tipo de datos estamos tratando, veamos algunas de sus líneas:



#   
data.show()




Veamos también cuántas filas tenemos en el conjunto de datos:

#  
data.select('year').count()






Y finalmente, infieramos los tipos de nuestros datos, que, como recordamos, le pedimos a PySpark que los determinara automáticamente usando la opción ("inferSchema", "true"):



#     
data.printSchema()






Ahora pasemos a nuestro plato principal, trabajando con varios signos de diferentes tipos de datos. Spark puede entrenar el modelo sobre los datos transformados, donde la columna predicha es un vector y las columnas con características también son un vector, lo que complica la tarea ... Pero no nos damos por vencidos, y para entrenar el modelo en PySpark usaremos Pipeline, al cual pasaremos un determinado plan de acción (variable etapas):



  1. paso label_stringIdx: transformamos la columna del conjunto de datos de valor que queremos predecir en una cadena de vector Spark y le cambiamos el nombre para etiquetarlo con el parámetro handleInvalid = 'keep', lo que significa que nuestra columna predicha admite null
  2. paso stringIndexer: convierte columnas de cadenas en cadenas categóricas de Spark
  3. encoder: ()
  4. assembler: Spark, , VectorAssembler(), ( ) (assemblerInputs) «features»
  5. gbt: PySpark ML GBTRegressor,


#value -      - 
stages = []
label_stringIdx = StringIndexer(inputCol = 'value', outputCol = 'label', handleInvalid = 'keep')
stages += [label_stringIdx]

#depend on categorical columns: country and types of emission
#   :    
categoricalColumns = ['country_or_area', 'category']
for categoricalCol in categoricalColumns:
    #        
    stringIndexer = StringIndexer(inputCol = categoricalCol,
                                  outputCol = categoricalCol + 'Index',
                                  handleInvalid = 'keep')
    encoder = OneHotEncoder(inputCol=stringIndexer.getOutputCol(),
                            outputCol=categoricalCol + "classVec")
    stages += [stringIndexer, encoder]

#   : 
numericCols = ['year']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
#    - - 
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]


Dividamos nuestro conjunto de datos en muestras de entrenamiento y prueba en la proporción favorita de 70% a 30%, respectivamente, y comencemos a entrenar el modelo usando un árbol de impulso de regresión de gradiente (GBTRegressor), que debería predecir el vector de "etiqueta" según las características previamente combinadas en un vector de "características" con límite iterable maxIter = 10:



#       (30% )
(trainingData, testData) = data.randomSplit([0.7, 0.3])

#  (   )
gbt = GBTRegressor(labelCol="label", featuresCol="features", maxIter=10)
stages += [gbt]

#   stages    
pipeline = Pipeline(stages=stages)


Y ahora solo necesitamos enviarle a la computadora un plan de acción y un conjunto de datos de entrenamiento:



#  
model = pipeline.fit(trainingData)

#     
predictions = model.transform(testData)


Guardemos nuestro modelo para que siempre podamos volver a usarlo sin tener que volver a entrenarnos:



# 
pipeline.write().overwrite().save('model/gbtregr_model')


Y si decide comenzar a usar el modelo entrenado para las predicciones nuevamente, simplemente escriba:



#     
load_model = pipeline.read().load('model/gbtregr_model')




Entonces, vimos cómo en una herramienta para trabajar con big data en el lenguaje Python, PySpark, se implementa el trabajo con varias columnas de características de diferentes tipos de datos.



Ahora es el momento de aplicar esto a sus modelos ...



All Articles