PySpark. Resolviendo el problema de encontrar sesiones

¡Buenas tardes queridos lectores! Hace unos días, releyendo el libro de Anthony Molinaro “SQL. Una colección de recetas ”, en uno de los capítulos encontré un tema que estaba dedicado a determinar el inicio y el final del rango de valores consecutivos. Después de leer el material brevemente, recordé de inmediato que ya me había encontrado con esta pregunta como una de las tareas de prueba, pero luego el tema se declaró como “La tarea de encontrar sesiones”. El truco de la entrevista técnica no fue una revisión del trabajo realizado, sino una de las preguntas del entrevistador sobre cómo obtener valores similares usando Spark. Al prepararme para la entrevista, no sabía que la empresa usa (o tal vez no ...) Apache Spark y, por lo tanto, no recopilé información sobre una nueva herramienta para mí en ese momento. Solo quedaba plantear la hipótesis de que la solución deseada podría ser como un guión,que se puede escribir usando la biblioteca Pandas. Aunque de manera muy remota, igual logré alcanzar el objetivo, pero no logré trabajar en esta organización.





Para ser justos, quiero señalar que a lo largo de los años he progresado poco en el aprendizaje de Apache Spark. Pero aún quiero compartir las mejores prácticas con los lectores, ya que muchos analistas no se han encontrado con esta herramienta y otros pueden tener una entrevista similar. Si eres un profesional de Spark, siempre puedes sugerir un código más óptimo en los comentarios de la publicación.





Este fue un preámbulo, pasemos directamente al análisis de este tema. Primero vayamos y escribamos un script SQL. Pero primero, creemos una base de datos y la llenemos de valores. Dado que este es un ejemplo de demostración, sugiero usar SQLite. Esta base de datos es inferior a los "colegas en el taller" más poderosos, pero sus capacidades para el desarrollo de scripts son suficientes para nosotros en su totalidad. Para automatizar las operaciones anteriores, escribí el siguiente código en Python.





#  
import sqlite3

#     
projects = [
    ('2020-01-01', '2020-01-02'),
    ('2020-01-02', '2020-01-03'),
    ('2020-01-03', '2020-01-04'),
    ('2020-01-04', '2020-01-05'),
    ('2020-01-06', '2020-01-07'),
    ('2020-01-16', '2020-01-17'),
    ('2020-01-17', '2020-01-18'),
    ('2020-01-18', '2020-01-19'),
    ('2020-01-19', '2020-01-20'),
    ('2020-01-21', '2020-01-22'),
    ('2020-01-26', '2020-01-27'),
    ('2020-01-27', '2020-01-28'),
    ('2020-01-28', '2020-01-29'),
    ('2020-01-29', '2020-01-30')
]

try:
    #  
    con = sqlite3.connect("projects.sqlite")
    #  
    cur = con.cursor()
    #  
    cur.execute("""CREATE TABLE IF NOT EXISTS projects (
                    proj_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    proj_start TEXT,
                    proj_end TEXT)""")
    #  
    cur.executemany("INSERT INTO projects VALUES(NULL, ?,?)", projects)
    #  
    con.commit()
    #  
    cur.close()
except sqlite3.Error as err:
    print("  ", err)
finally:
    #  
    con.close()
    print("  ")

      
      



. DBeaver. , SQL .





, , , . , - . , . ().





select 
      p3.proj_group, 
      min(p3.proj_start) as date_start,
      max(p3.proj_end) as date_end,
      julianday(max(p3.proj_end))-julianday( min(p3.proj_end))+1 as delta
from
    (select 
	     p2.*,
	     sum(p2.flag)over(order by p2.proj_id) as proj_group
	from 
		(select 
		      p.proj_id , 
		      p.proj_start, 
		      p.proj_end, 
		      case 
		      when lag(p.proj_end)over(order by p.proj_id) = p.proj_start then 0 else 1 
		      end as flag
		from projects as p) as p2) as p3
group by p3.proj_group
      
      



, . . , : . , . , , lag. 0, 1. , . . , .  . , ( julianday SQLite). . Spark.





, Apache Spark         ,  Hadoop. Java, Scala R, Spark PySpark. . Google Colab, . - , . , .





Linux OpenJDK, Spark. . findspark. , .





SQLite , . , .





Spark , . , . -, , , -, . , “ Spark. ”, , , , .





, , SQL. : , ( datediff).





, . , - , , , SQL Spark. , , . .





from pyspark.sql.functions import lag
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Equivalent of Pandas.dataframe.shift() method
w = Window().partitionBy().orderBy(col("proj_id"))
df_dataframe = df.withColumn('lag', F.lag("proj_end").over(w))
#...
# Equivalent of SQL- CASE WHEN...THEN...ELSE... END
df_dataframe = df_dataframe.withColumn('flag',F.when(df_dataframe["proj_start"] == df_dataframe["lag"],0).otherwise(1))
#...
# Cumsum by column flag
w = Window().partitionBy().orderBy(col("proj_id"))
df_dataframe = df_dataframe.withColumn("proj_group", F.sum("flag").over(w))
#...
# Equivalent of SQL - GROUP BY
from pyspark.sql.functions import  min, max
df_group = df_dataframe.groupBy("proj_group").agg(min("proj_start").alias("date_start"), \
                                                  max("proj_end").alias("date_end"))
df_group = df_group.withColumn("delta", F.datediff(df_group.date_end,df_group.date_start))
df_group.show()
      
      



.





  1. , . . , “” , .





  2. Incluso si nunca antes ha trabajado con Spark, esta no es una razón para rechazar la competencia por un puesto vacante. Los conceptos básicos de PySpark se pueden dominar en poco tiempo, siempre que los antecedentes ya tengan experiencia en programación utilizando la biblioteca Pandas.





  3. No hay escasez de libros sobre Spark.





Eso es todo. ¡Toda salud, buena suerte y éxito profesional!








All Articles