¡Quiero saber todo sobre el cliente! O cómo enriquecer los datos secos de DWH con rutas digitales y propiedades del cliente de Amplitude

El repositorio corporativo de Betting League se creó mucho antes de la introducción de Amplitude . Es utilizado principalmente por analistas e investigadores. Los productos y los especialistas en marketing recurrieron a los analistas para obtener análisis del almacén porque requiere habilidades de programación.







DWH Facts siempre ha carecido de una visión digital y de abarrotes en los productos que espiarían a los clientes y nos darían una idea de sus caminos. Con la llegada de Amplitude a la empresa, comenzamos a comprender el valor de los datos acumulados en el sistema y es muy bueno usarlo en Amplitude en sí, pero la simbiosis de los dos sistemas DWH y Amplitude no dio descanso. Por supuesto, implementamos la mecánica de transferencia de datos de Amplitude para el análisis interno en un almacén corporativo e hicimos instrucciones para configurar la transferencia de datos de Amplitude a DWH. También te invitamos al webinar Betting League y Adventum sobre el análisis y optimización de conversiones en el producto .







imagen







Cómo puede ayudar la agregación de datos de DWH



1. . DWH, .







2. . .







3. . , API . .







Amplitude DWH



Amplitude API . . . , , , . . , , UTC — , .







. Python, SQL . ! Amplitude , .







, — Amplitude . , CSV, ETL .







ETL — Extract, Transform, Load. , , DWH .







. , . , , .







Python 3.7 . , flow- (, , dag), , Windows. .bat ( ). , .







1.



# 
import os
import requests
import pandas as pd
import zipfile
import gzip
import time
from tqdm import tqdm
import pymssql
import datetime
import pyodbc
from urllib.parse import quote_plus
from sqlalchemy import create_engine, event
      
      





2.



, , . , .







#   
os.chdir("C:\Agents\AMPL\TEMP") #    
dts1 = time.time() #       
a = time.time() #  
now = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") #    
      
      





3. API Amplitude



, (Settings => Project = > General).







#    API 
api_key = ''
secret_key = '  '
      
      





4. ()



, , . SQL , . yyyymmddThh (. . ). API , .







#     DWH (SQL)
server = " "
user = ""
password = ""

#     
conn = pymssql.connect(server, user, password, " ")
cursor = conn.cursor()
cursor.execute("   .    select")
      
      





5.



API Amplitude. .







#        
for row in cursor: 
    dt = row[0]
conn.close()   
      
      





6.



, . , , , , .







#   ,     
filename = 'AMPL_PROD_'+ dt + '_' + now

#  ,     \\  WIN
#      ,      os.chdir
working_dir = os.getcwd() + '\\'

      
      





7. SQL



SQL. , .







#    DWH (SQL). ,           
server = ' '
database = ' '
schema = ' '
table_to_export = ' '

#    DWH (SQL)
params = 'DRIVER= {SQL Server};SERVER='+server+';DATABASE='+database+';User='+user+';Password='+password+';'
quoted = quote_plus(params)
new_con = 'mssql+pyodbc:///?odbc_connect={}'.format(quoted)
      
      





8. Amplitude



Amplitude , , json .







#     API ,     json
class GetFile():

    def __init__(self, now, dt, api_key, secret_key, filename, working_dir):

        self.now = now
        self.dt = dt
        self.api_key = api_key
        self.secret_key = secret_key
        self.filename = filename
        self.working_dir = working_dir

    def response(self):
        """
           
        """
        print('   !', end='\n')
        count = 0
        while True:
            count += 1
            print(f' {count}.....', end='')
            try:
                response = requests.get('https://amplitude.com/api/2/export?start='+self.dt+'&end='+self.dt,
                                        auth=(self.api_key, self.secret_key),
                                        timeout=10)
                print('', end='\n') 
                print('1.    ', end='\n')
                break
            except:
                print('', end='\n')
                time.sleep(1)

        return response

    def download(self):
        '''
           
        '''
        with open(working_dir + self.filename + '.zip', "wb") as code:
            file = self.response()
            print('2.    .....', end='')           
            code.write(file.content)
        print('OK', end='\n')

    def extraction(self):
        '''
             
        '''
        z = zipfile.ZipFile(self.working_dir + self.filename + '.zip', 'r')
        z.extractall(path=self.working_dir + self.filename)
        print('3.         ' + self.filename)

    def getfilename(self):
        '''
            
        '''
        return ': {} \n : {}'.format(self.filename, self.working_dir + self.filename + '.zip')

def unzip_and_create_df(working_dir, filename):
        '''
         JSON.gz   json     (   )
         ,    .
        '''
        directory = working_dir + filename + '\\274440'
        files = os.listdir(directory)
        df = pd.DataFrame()
        print('  :')
        time.sleep(1)
        for i in tqdm(files):
            with gzip.open(directory + '\\' + i) as f:
                add = pd.read_json(f, lines=True)
            df = pd.concat([df, add], axis=0)
        time.sleep(1)    
        print('4. JSON         dataframe')
        return df

#    
file = GetFile(now, dt, api_key, secret_key, filename, working_dir)

#   (      )
file.download()

#  gz-   
file.extraction()

#   DataFrame    json.gz
adf = unzip_and_create_df(working_dir, filename)

      
      





9. ()



, . . , SQL. .







#    
print('5.    ,  , , .....', end='')

#   DWH        
#       -   
sql_query_columns = ("""
                        '             '
                    """)

settdf = pd.read_sql_query(sql_query_columns, new_con)

#   lower()  (= )   SAVE_COLUMN_NAME  dwh
#   , lower()       
settdf['SAVE_COLUMN_NAME'] = settdf['SAVE_COLUMN_NAME'].apply(lambda x: x.lower())
adf.columns = [''.join(j.title() for j in i.split('_')).lower() for i in adf.columns]

#   
needed_columns = [i for i in settdf['SAVE_COLUMN_NAME'].to_list()]

#     
needed_columns.append('DOWNLOAD_FILE_NAME')

#    DF c  
adf['DOWNLOAD_FILE_NAME'] = filename

#   ( , ,  )
adf.reset_index(inplace=True)

#    ( )   ,   
adf = adf.astype('unicode_').where(pd.notnull(adf), None)

#  DataFrame    
df_to_sql = adf[needed_columns]

#     
print('OK', end='\n')
      
      





10.



. .







#    DWH
#   
dts2 = time.time()
print('6.    ...', end='')

#      DWH
connection = pyodbc.connect(params)
engine = create_engine(new_con)

#   ()    DWH (   -  )
@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    if executemany:
        cursor.fast_executemany = True

#  None  RAM
df_to_sql.to_sql(table_to_export, engine, schema=schema, if_exists='append', chunksize=100000, index=False)

#    
connection.close() 
print('OK', end='\n')

dtf = time.time()
diff1, diff2 = [str(int((dtf - i) // 60)) + '  ' + str(int((dtf - i) % 60)) + ' ' for i in (dts1, dts2)]
print(f' : {diff1},   : {diff2}')
print(' ,   ')

      
      





11.



! . .







#    
#     
conn2 = pymssql.connect(server, user, password, "  ")
cursor2 = conn2.cursor()
query = "      ,  ")

#  
cursor2.execute(query)

#    
conn2.commit()
conn2.close()

      
      





12.



, . . , ? , ETL , .







print('  ')

#       ETL       
conn3 = pymssql.connect(server, user, password, "  ")
cursor3 = conn3.cursor()
query = " ETL   .  EXEC dbo.SP"

cursor3.execute(query)

conn3.commit()
conn3.close()
      
      





13.



, .







#      
b = time.time()
diff = b-a
minutes = diff//60
print('  : {:.0f} ()'.format( minutes))

      
      





. — , .







, ETL, . , - , .







, Amplitude, . s2s , .







20 17:00 . . , .








All Articles