El repositorio corporativo de Betting League se creó mucho antes de la introducción de Amplitude . Es utilizado principalmente por analistas e investigadores. Los productos y los especialistas en marketing recurrieron a los analistas para obtener análisis del almacén porque requiere habilidades de programación.
DWH Facts siempre ha carecido de una visión digital y de abarrotes en los productos que espiarían a los clientes y nos darían una idea de sus caminos. Con la llegada de Amplitude a la empresa, comenzamos a comprender el valor de los datos acumulados en el sistema y es muy bueno usarlo en Amplitude en sí, pero la simbiosis de los dos sistemas DWH y Amplitude no dio descanso. Por supuesto, implementamos la mecánica de transferencia de datos de Amplitude para el análisis interno en un almacén corporativo e hicimos instrucciones para configurar la transferencia de datos de Amplitude a DWH. También te invitamos al webinar Betting League y Adventum sobre el análisis y optimización de conversiones en el producto .
Cómo puede ayudar la agregación de datos de DWH
1. . DWH, .
2. . .
3. . , API . .
Amplitude DWH
Amplitude API . . . , , , . . , , UTC — , .
. Python, SQL . ! Amplitude , .
, — Amplitude . , CSV, ETL .
ETL — Extract, Transform, Load. , , DWH .
. , . , , .
Python 3.7 . , flow- (, , dag), , Windows. .bat ( ). , .
1.
#
import os
import requests
import pandas as pd
import zipfile
import gzip
import time
from tqdm import tqdm
import pymssql
import datetime
import pyodbc
from urllib.parse import quote_plus
from sqlalchemy import create_engine, event
2.
, , . , .
#
os.chdir("C:\Agents\AMPL\TEMP") #
dts1 = time.time() #
a = time.time() #
now = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") #
3. API Amplitude
, (Settings => Project = > General).
# API
api_key = ''
secret_key = ' '
4. ()
, , . SQL , . yyyymmddThh (. . ). API , .
# DWH (SQL)
server = " "
user = ""
password = ""
#
conn = pymssql.connect(server, user, password, " ")
cursor = conn.cursor()
cursor.execute(" . select")
5.
API Amplitude. .
#
for row in cursor:
dt = row[0]
conn.close()
6.
, . , , , , .
# ,
filename = 'AMPL_PROD_'+ dt + '_' + now
# , \\ WIN
# , os.chdir
working_dir = os.getcwd() + '\\'
7. SQL
SQL. , .
# DWH (SQL). ,
server = ' '
database = ' '
schema = ' '
table_to_export = ' '
# DWH (SQL)
params = 'DRIVER= {SQL Server};SERVER='+server+';DATABASE='+database+';User='+user+';Password='+password+';'
quoted = quote_plus(params)
new_con = 'mssql+pyodbc:///?odbc_connect={}'.format(quoted)
8. Amplitude
Amplitude , , json .
# API , json
class GetFile():
def __init__(self, now, dt, api_key, secret_key, filename, working_dir):
self.now = now
self.dt = dt
self.api_key = api_key
self.secret_key = secret_key
self.filename = filename
self.working_dir = working_dir
def response(self):
"""
"""
print(' !', end='\n')
count = 0
while True:
count += 1
print(f' {count}.....', end='')
try:
response = requests.get('https://amplitude.com/api/2/export?start='+self.dt+'&end='+self.dt,
auth=(self.api_key, self.secret_key),
timeout=10)
print('', end='\n')
print('1. ', end='\n')
break
except:
print('', end='\n')
time.sleep(1)
return response
def download(self):
'''
'''
with open(working_dir + self.filename + '.zip', "wb") as code:
file = self.response()
print('2. .....', end='')
code.write(file.content)
print('OK', end='\n')
def extraction(self):
'''
'''
z = zipfile.ZipFile(self.working_dir + self.filename + '.zip', 'r')
z.extractall(path=self.working_dir + self.filename)
print('3. ' + self.filename)
def getfilename(self):
'''
'''
return ': {} \n : {}'.format(self.filename, self.working_dir + self.filename + '.zip')
def unzip_and_create_df(working_dir, filename):
'''
JSON.gz json ( )
, .
'''
directory = working_dir + filename + '\\274440'
files = os.listdir(directory)
df = pd.DataFrame()
print(' :')
time.sleep(1)
for i in tqdm(files):
with gzip.open(directory + '\\' + i) as f:
add = pd.read_json(f, lines=True)
df = pd.concat([df, add], axis=0)
time.sleep(1)
print('4. JSON dataframe')
return df
#
file = GetFile(now, dt, api_key, secret_key, filename, working_dir)
# ( )
file.download()
# gz-
file.extraction()
# DataFrame json.gz
adf = unzip_and_create_df(working_dir, filename)
9. ()
, . . , SQL. .
#
print('5. , , , .....', end='')
# DWH
# -
sql_query_columns = ("""
' '
""")
settdf = pd.read_sql_query(sql_query_columns, new_con)
# lower() (= ) SAVE_COLUMN_NAME dwh
# , lower()
settdf['SAVE_COLUMN_NAME'] = settdf['SAVE_COLUMN_NAME'].apply(lambda x: x.lower())
adf.columns = [''.join(j.title() for j in i.split('_')).lower() for i in adf.columns]
#
needed_columns = [i for i in settdf['SAVE_COLUMN_NAME'].to_list()]
#
needed_columns.append('DOWNLOAD_FILE_NAME')
# DF c
adf['DOWNLOAD_FILE_NAME'] = filename
# ( , , )
adf.reset_index(inplace=True)
# ( ) ,
adf = adf.astype('unicode_').where(pd.notnull(adf), None)
# DataFrame
df_to_sql = adf[needed_columns]
#
print('OK', end='\n')
10.
. .
# DWH
#
dts2 = time.time()
print('6. ...', end='')
# DWH
connection = pyodbc.connect(params)
engine = create_engine(new_con)
# () DWH ( - )
@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
if executemany:
cursor.fast_executemany = True
# None RAM
df_to_sql.to_sql(table_to_export, engine, schema=schema, if_exists='append', chunksize=100000, index=False)
#
connection.close()
print('OK', end='\n')
dtf = time.time()
diff1, diff2 = [str(int((dtf - i) // 60)) + ' ' + str(int((dtf - i) % 60)) + ' ' for i in (dts1, dts2)]
print(f' : {diff1}, : {diff2}')
print(' , ')
11.
! . .
#
#
conn2 = pymssql.connect(server, user, password, " ")
cursor2 = conn2.cursor()
query = " , ")
#
cursor2.execute(query)
#
conn2.commit()
conn2.close()
12.
, . . , ? , ETL , .
print(' ')
# ETL
conn3 = pymssql.connect(server, user, password, " ")
cursor3 = conn3.cursor()
query = " ETL . EXEC dbo.SP"
cursor3.execute(query)
conn3.commit()
conn3.close()
13.
, .
#
b = time.time()
diff = b-a
minutes = diff//60
print(' : {:.0f} ()'.format( minutes))
. — , .
, ETL, . , - , .