Pruebas en transmisión estructurada de Apache Spark

Introducción



Actualmente, no hay muchos ejemplos de pruebas para aplicaciones basadas en Spark Structured Streaming. Por lo tanto, este artículo proporciona ejemplos de prueba básicos con descripciones detalladas.







Todos los ejemplos usan: Apache Spark 3.0.1.







Preparación



Necesitas instalar:







  • Apache Spark 3.0.x
  • Python 3.7 y el entorno virtual para él
  • Conda 4.y
  • scikit-learn 0.22.z
  • Maven 3.v
  • Los ejemplos de Scala usan la versión 2.12.10.


  1. Descarga Apache Spark
  2. Desempaquetar: tar -xvzf ./spark-3.0.1-bin-hadoop2.7.tgz
  3. Cree un entorno, por ejemplo, con conda: conda create -n sp python = 3.7


Necesita configurar variables de entorno. Aquí tienes un ejemplo de ejecución local.







SPARK_HOME=/Users/$USER/Documents/spark/spark-3.0.1-bin-hadoop2.7
PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip;
      
      





Pruebas



Un ejemplo con scikit-learn



Al escribir pruebas, debe separar su código para poder aislar la lógica y el uso real de la API final. Buen ejemplo de aislamiento: DataFrame-pandas , DataFrame-spark .







El siguiente ejemplo se utilizará para escribir pruebas: LinearRegression .







Así que probemos el código usando la siguiente "plantilla" de Python:







class XService:

    def __init__(self):
        # 

    def train(self, ds):
        # 

    def predict(self, ds):
        #    

      
      





Para Scala, la plantilla se ve así.







Ejemplo completo:







from sklearn import linear_model

class LocalService:

    def __init__(self):
        self.model = linear_model.LinearRegression()

    def train(self, ds):
        X, y = ds
        self.model.fit(X, y)

    def predict(self, ds):
        r = self.model.predict(ds)
        print(r)

      
      





Prueba.







Importar:







import unittest
import numpy as np
      
      





Clase principal:







class RunTest(unittest.TestCase):
      
      





Ejecución de pruebas:







if __name__ == "__main__":
    unittest.main()
      
      





Preparación de datos:







X = np.array([
    [1, 1],  # 6
    [1, 2],  # 8
    [2, 2],  # 9
    [2, 3]  # 11
])
y = np.dot(X, np.array([1, 2])) + 3  # [ 6  8  9 11], y = 1 * x_0 + 2 * x_1 + 3
      
      





Creación y formación de modelos:







service = local_service.LocalService()
service.train((X, y))
      
      





Obteniendo resultados:







service.predict(np.array([[3, 5]]))
service.predict(np.array([[4, 6]]))
      
      





Responder:







[16.]
[19.]
      
      





Juntos:







import unittest
import numpy as np

from spark_streaming_pp import local_service

class RunTest(unittest.TestCase):
    def test_run(self):
        # Prepare data.
        X = np.array([
            [1, 1],  # 6
            [1, 2],  # 8
            [2, 2],  # 9
            [2, 3]  # 11
        ])
        y = np.dot(X, np.array([1, 2])) + 3  # [ 6  8  9 11], y = 1 * x_0 + 2 * x_1 + 3
        # Create model and train.
        service = local_service.LocalService()
        service.train((X, y))
        # Predict and results.
        service.predict(np.array([[3, 5]]))
        service.predict(np.array([[4, 6]]))
        # [16.]
        # [19.]

if __name__ == "__main__":
    unittest.main()
      
      





Ejemplo con Spark y Python



LinearRegression. , Structured Streaming DataFrame-, Spark Sql. .







:







self.service = LinearRegression(maxIter=10, regParam=0.01)
self.model = None
      
      





:







self.model = self.service.fit(ds)
      
      





:







transformed_ds = self.model.transform(ds)
q = transformed_ds.select("label", "prediction").writeStream.format("console").start()
return q
      
      





:







from pyspark.ml.regression import LinearRegression

class StructuredStreamingService:
    def __init__(self):
        self.service = LinearRegression(maxIter=10, regParam=0.01)
        self.model = None

    def train(self, ds):
        self.model = self.service.fit(ds)

    def predict(self, ds):
        transformed_ds = self.model.transform(ds)
        q = transformed_ds.select("label", "prediction").writeStream.format("console").start()
        return q

      
      





.







, .







train_ds = spark.createDataFrame([
    (6.0, Vectors.dense([1.0, 1.0])),
    (8.0, Vectors.dense([1.0, 2.0])),
    (9.0, Vectors.dense([2.0, 2.0])),
    (11.0, Vectors.dense([2.0, 3.0]))
],
    ["label", "features"]
)
      
      





.







, , Structured Streaming, .. DataFrame , DataFrame.

, Spark.







def test_stream_read_options_overwrite(self):
    bad_schema = StructType([StructField("test", IntegerType(), False)])
    schema = StructType([StructField("data", StringType(), False)])
    df = self.spark.readStream.format('csv').option('path', 'python/test_support/sql/fake') \
        .schema(bad_schema)\
        .load(path='python/test_support/sql/streaming', schema=schema, format='text')
    self.assertTrue(df.isStreaming)
    self.assertEqual(df.schema.simpleString(), "struct<data:string>")
      
      





.







:







spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
      
      





( ):







train_ds = spark.createDataFrame([
    (6.0, Vectors.dense([1.0, 1.0])),
    (8.0, Vectors.dense([1.0, 2.0])),
    (9.0, Vectors.dense([2.0, 2.0])),
    (11.0, Vectors.dense([2.0, 3.0]))
],
    ["label", "features"]
)
      
      





:







service = structure_streaming_service.StructuredStreamingService()
service.train(train_ds)
      
      





. : . 3 .







def extract_features(x):
    values = x.split(",")
    features_ = []
    for i in values[1:]:
        features_.append(float(i))
    features = Vectors.dense(features_)
    return features

extract_features_udf = udf(extract_features, VectorUDT())

def extract_label(x):
    values = x.split(",")
    label = float(values[0])
    return label

extract_label_udf = udf(extract_label, FloatType())

predict_ds = spark.readStream.format("text").option("path", "data/structured_streaming").load() \
    .withColumn("features", extract_features_udf(col("value"))) \
    .withColumn("label", extract_label_udf(col("value")))

service.predict(predict_ds).awaitTermination(3)
      
      





:







15.96699
18.96138
      
      





:







import unittest
import warnings

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType
from pyspark.ml.linalg import Vectors, VectorUDT

from spark_streaming_pp import structure_streaming_service

class RunTest(unittest.TestCase):
    def test_run(self):
        spark = SparkSession.builder.enableHiveSupport().getOrCreate()
        spark.sparkContext.setLogLevel("ERROR")
        # Prepare data.
        train_ds = spark.createDataFrame([
            (6.0, Vectors.dense([1.0, 1.0])),
            (8.0, Vectors.dense([1.0, 2.0])),
            (9.0, Vectors.dense([2.0, 2.0])),
            (11.0, Vectors.dense([2.0, 3.0]))
        ],
            ["label", "features"]
        )
        # Create model and train.
        service = structure_streaming_service.StructuredStreamingService()
        service.train(train_ds)

        # Predict and results.

        def extract_features(x):
            values = x.split(",")
            features_ = []
            for i in values[1:]:
                features_.append(float(i))
            features = Vectors.dense(features_)
            return features

        extract_features_udf = udf(extract_features, VectorUDT())

        def extract_label(x):
            values = x.split(",")
            label = float(values[0])
            return label

        extract_label_udf = udf(extract_label, FloatType())

        predict_ds = spark.readStream.format("text").option("path", "data/structured_streaming").load() \
            .withColumn("features", extract_features_udf(col("value"))) \
            .withColumn("label", extract_label_udf(col("value")))

        service.predict(predict_ds).awaitTermination(3)

        # +-----+------------------+
        # |label|        prediction|
        # +-----+------------------+
        # |  1.0|15.966990887541273|
        # |  2.0|18.961384020443553|
        # +-----+------------------+

    def setUp(self):
        warnings.filterwarnings("ignore", category=ResourceWarning)
        warnings.filterwarnings("ignore", category=DeprecationWarning)

if __name__ == "__main__":
    unittest.main()
      
      





, Scala .

:







implicit val sqlCtx = spark.sqlContext
import spark.implicits._
val source = MemoryStream[Record]
source.addData(Record(1.0, Vectors.dense(3.0, 5.0)))
source.addData(Record(2.0, Vectors.dense(4.0, 6.0)))
val predictDs = source.toDF()
service.predict(predictDs).awaitTermination(2000)
      
      





Scala (, , sql):







package aaa.abc.dd.spark_streaming_pr.cluster

import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.streaming.StreamingQuery

class StructuredStreamingService {
  var service: LinearRegression = _
  var model: LinearRegressionModel = _

  def train(ds: DataFrame): Unit = {
    service = new LinearRegression().setMaxIter(10).setRegParam(0.01)
    model = service.fit(ds)
  }

  def predict(ds: DataFrame): StreamingQuery = {

    val m = ds.sparkSession.sparkContext.broadcast(model)

    def transformFun(features: org.apache.spark.ml.linalg.Vector): Double = {
      m.value.predict(features)
    }

    val transform: org.apache.spark.ml.linalg.Vector => Double = transformFun

    val toUpperUdf = udf(transform)

    val predictionDs = ds.withColumn("prediction", toUpperUdf(ds("features")))

    predictionDs
      .writeStream
      .foreachBatch((r: DataFrame, i: Long) => {
        r.show()
        // scalastyle:off println
        println(s"$i")
        // scalastyle:on println
      })
      .start()
  }
}
      
      





:







package aaa.abc.dd.spark_streaming_pr.cluster

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.scalatest.{Matchers, Outcome, fixture}

class StructuredStreamingServiceSuite extends fixture.FunSuite with Matchers {
  test("run") { spark =>
    // Prepare data.
    val trainDs = spark.createDataFrame(Seq(
      (6.0, Vectors.dense(1.0, 1.0)),
      (8.0, Vectors.dense(1.0, 2.0)),
      (9.0, Vectors.dense(2.0, 2.0)),
      (11.0, Vectors.dense(2.0, 3.0))
    )).toDF("label", "features")
    // Create model and train.
    val service = new StructuredStreamingService()
    service.train(trainDs)
    // Predict and results.
    implicit val sqlCtx = spark.sqlContext
    import spark.implicits._
    val source = MemoryStream[Record]
    source.addData(Record(1.0, Vectors.dense(3.0, 5.0)))
    source.addData(Record(2.0, Vectors.dense(4.0, 6.0)))
    val predictDs = source.toDF()
    service.predict(predictDs).awaitTermination(2000)
    // +-----+---------+------------------+
    // |label| features|        prediction|
    // +-----+---------+------------------+
    // |  1.0|[3.0,5.0]|15.966990887541273|
    // |  2.0|[4.0,6.0]|18.961384020443553|
    // +-----+---------+------------------+
  }

  override protected def withFixture(test: OneArgTest): Outcome = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()

    try withFixture(test.toNoArgTest(spark))

    finally spark.stop()
  }

  override type FixtureParam = SparkSession

  case class Record(label: Double, features: org.apache.spark.ml.linalg.Vector)

}
      
      







, API. . kafka.







“DataFrame” .







Python .












All Articles