Data Warehouse Management Multitool: The Wheely Case + dbt

For more than two years, the data build tool (dbt) has been actively used at Wheely to manage its data warehouse. Over this time we have accumulated considerable experience: we are on a thorny path of trial and error toward excellence in Analytics Engineering.



The data models are built with business stakeholders in mind and follow two principles: Keep it simple (KISS) and Don't repeat yourself (DRY).

The DWH is organized into layers:

Data warehouse layer diagram

Sources: raw data loaded into the DWH by ELT tools, mapped 1:1 to the source systems. Nested (JSON) structures are flattened.

Staging: basic preparation of the data, such as renaming, type casting and case expressions.

Intermediate: models that combine data on its way to the marts; a single intermediate model may draw on 5-10 upstream models.

Data marts: the final models used by Data Scientists / Business Users / BI tools. They are split into:

  • dimensions

  • facts

  • looker: models built specifically for the BI tool

120  :

Running with dbt=0.19.0
Found 273 models, 493 tests, 6 snapshots, 4 analyses, 532 macros, 7 operations, 8 seed files, 81 sources, 0 exposures


  • 273 models

  • 493 tests: not null, unique, foreign key, accepted values

  • 6 snapshots for SCD (slowly changing dimensions)

  • 532 macros

  • 7 operations, such as vacuum + analyze

  • 81 sources

Models are tagged by business area: Marketing / Supply / Growth / B2B. Tags also mark the models that deal with late arriving data.

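Tags make it easy to build a subset of the project. For example, to build all Marketing models together with everything they depend on: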

dbt run -m +tag:marketing
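The `+` in front of `tag:marketing` extends the selection to every upstream parent of the tagged models. A minimal Python sketch of that graph traversal, assuming a hypothetical project with made-up model names and tags (dbt's real selector supports much more syntax than this):

```python
from collections import deque
from typing import Dict, List, Set

# Hypothetical project: each model maps to the models it references via ref()
PARENTS: Dict[str, List[str]] = {
    "stg_requests": [],
    "stg_campaigns": [],
    "journeys": ["stg_requests"],
    "marketing_attribution": ["journeys", "stg_campaigns"],
    "supply_hours": ["stg_requests"],
}

# Hypothetical tags, as they might be set in dbt_project.yml or config()
TAGS: Dict[str, Set[str]] = {
    "marketing_attribution": {"marketing"},
    "supply_hours": {"supply"},
}


def select_with_parents(tag: str) -> Set[str]:
    """Models tagged with `tag` plus all of their ancestors,
    i.e. the idea behind `dbt run -m +tag:<tag>`."""
    selected = {model for model, tags in TAGS.items() if tag in tags}
    queue = deque(selected)
    while queue:
        model = queue.popleft()
        for parent in PARENTS.get(model, []):
            if parent not in selected:
                selected.add(parent)
                queue.append(parent)
    return selected


print(sorted(select_with_parents("marketing")))
# ['journeys', 'marketing_attribution', 'stg_campaigns', 'stg_requests']
```

Walking parents breadth-first means a shared upstream model such as `stg_requests` is selected once, no matter how many tagged models depend on it.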

The project is organized into folders:



| |____webhook
| |____receipt_prod
| |____core
| |____wheely_prod
| |____flights_prod
| |____online_hours_prod
| |____external
| |____financial_service

| |____looker
| |____dim
| |____snapshots
| |____facts

| |____webhook
| |____receipt_prod
| |____wheely_prod
| |____communication_prod

| |____dq
| | |____marts
| | |____external


Wheely's data warehouse runs on Amazon Redshift.

Consider the journeys model, which depends on a long chain of upstream models:

Dependency chain of the journeys model

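To improve join performance, the model uses the same distribution and sort keys as the sources it joins, which lets Redshift perform a sort merge join: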

– sort merge join

The sort key covers city, country, completed timestamp and service group; an interleaved key reduces I/O for BI queries that filter on any of these columns.

– interleaved sortkey
               , "city"
               , "country"
               , "service_group"
               , "is_airport"
               , "is_wheely_journey"]

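Models that don't need to be stored as tables are materialized as views. For example, all staging models are configured as views: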

           +materialized: view
           +schema: staging
           +tags: ["staging"]

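Another option is ephemeral: such models are not created in the database at all, but are inlined as CTEs into the models that reference them.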

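Large tables are built incrementally: each run processes only the new data (the delta), selected with a condition in the where clause: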


with fines as


       , city_id
       , amount
       , details
       , metadata_timestamp
       , created_ts_utc
       , updated_ts_utc
       , created_dt_utc 

   from {{ ref('stg_fines') }}
   where true
   -- filter fines arrived since last processed time
   {% if is_incremental() -%}
       and metadata_timestamp > (select max(metadata_timestamp) from {{ this }})
   {%- endif %} 
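The effect of `is_incremental()` can be reproduced outside dbt. A minimal sketch using Python's built-in sqlite3, assuming simplified stand-ins for `stg_fines` and a hypothetical target table `fines`: the first run loads everything, later runs append only rows newer than the current `max(metadata_timestamp)`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table stg_fines (fine_id integer, amount real, metadata_timestamp integer);
    create table fines     (fine_id integer, amount real, metadata_timestamp integer);
""")


def build_fines(conn: sqlite3.Connection, is_incremental: bool) -> int:
    """Insert new rows into `fines` and return how many were added."""
    sql = "select fine_id, amount, metadata_timestamp from stg_fines where 1 = 1"
    if is_incremental:
        # the same filter the Jinja block renders on incremental runs
        sql += " and metadata_timestamp > (select max(metadata_timestamp) from fines)"
    before = conn.execute("select count(*) from fines").fetchone()[0]
    conn.execute("insert into fines " + sql)
    after = conn.execute("select count(*) from fines").fetchone()[0]
    return after - before


conn.executemany("insert into stg_fines values (?, ?, ?)",
                 [(1, 100.0, 10), (2, 50.0, 20)])
first_run = build_fines(conn, is_incremental=False)   # full build: 2 rows

conn.execute("insert into stg_fines values (3, 70.0, 30)")  # a new fine arrives
second_run = build_fines(conn, is_incremental=True)   # delta only: 1 row
print(first_run, second_run)
```

`max()` over an empty table returns NULL, and a comparison with NULL matches nothing, which is why the filter is only applied once the target table already exists.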



Working with an MPP database this way calls for the skills of both a Data Engineer and a Data Warehouse Analyst (!).

SQL + Jinja = Flexibility

Models are written in SQL, and Jinja templating makes them flexible.

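Each model goes through two steps, dbt compile & run: the compiled SELECT is wrapped in a CREATE statement with the physical options the warehouse needs (clustered by / distributed by / sorted by). For example: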

Model code:

with details as (

   select {{ dbt_utils.star(from=ref('fine_details_flatten'),
           except=["fine_amount", "metadata_timestamp", "generated_number"]) }}
   from {{ ref('fine_details_flatten') }}
   where fine_amount > 0

)

select * from details

Compiled code:
with details as (  


   from "wheely"."dbt_test_akozyr"."fine_details_flatten"
   where fine_amount > 0

select * from details

Run code:
create  table
   diststyle key distkey (fine_id)   
   compound sortkey(created_ts_utc)
 as (    

with details as (  



   from "wheely"."dbt_test_akozyr"."fine_details_flatten"
   where fine_amount > 0


select * from details
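What dbt does at the run step can be sketched as string assembly. A simplified Python version, assuming a hypothetical `wrap_as_table` helper and relation name; dbt-redshift's real table materialization also builds into a temporary relation and renames it, which is omitted here:

```python
from typing import List, Optional


def wrap_as_table(relation: str, compiled_sql: str,
                  dist: Optional[str] = None,
                  sort: Optional[List[str]] = None,
                  sort_type: str = "compound") -> str:
    """Wrap a compiled SELECT into Redshift CREATE TABLE AS DDL."""
    ddl = [f"create table {relation}"]
    if dist in ("all", "even"):
        ddl.append(f"    diststyle {dist}")
    elif dist:
        # any other value is treated as a distribution key column
        ddl.append(f"    diststyle key distkey ({dist})")
    if sort:
        ddl.append(f"    {sort_type} sortkey({', '.join(sort)})")
    ddl.append(f"as (\n{compiled_sql}\n);")
    return "\n".join(ddl)


compiled = "select * from details"  # stands in for the compiled model SQL
print(wrap_as_table("analytics.fines", compiled,
                    dist="fine_id", sort=["created_ts_utc"]))
```

The model author only writes the SELECT and a few config values; the DDL around it is generated.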


dbt generates all of this DDL, so there is no boilerplate code to write by hand.

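Note also {{ ref('fine_details_flatten') }}: dbt resolves it to the full relation name for the current environment ("wheely"."dbt_test_akozyr"."fine_details_flatten" in the compiled code) and records the dependency between the two models.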


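At Wheely, Jinja also separates the dev / test / prod environments: custom schemas are used only in prod, while in every other environment models are built in the developer's target schema. The macro: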

{% macro generate_schema_name_for_env(custom_schema_name, node) -%}
   {%- set default_schema = target.schema -%}

   {%- if target.name == 'prod' and custom_schema_name is not none -%}

       {{ custom_schema_name | trim }} 

   {%- else -%}

       {{ default_schema }} 

   {%- endif -%} 

{%- endmacro %}
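The branching in this macro is small enough to mirror in plain Python, which makes the behaviour per environment easy to check. A sketch with a hypothetical function signature (the Jinja macro reads the environment from dbt's `target` context instead of taking it as arguments); the schema names are illustrative:

```python
from typing import Optional


def generate_schema_name_for_env(custom_schema_name: Optional[str],
                                 target_name: str,
                                 target_schema: str) -> str:
    """Custom schemas only in prod; everything else goes to the target schema."""
    if target_name == "prod" and custom_schema_name is not None:
        return custom_schema_name.strip()  # same as the `| trim` filter
    return target_schema


# staging models in prod land in the "staging" schema ...
print(generate_schema_name_for_env("staging", "prod", "analytics"))
# ... while in dev every model is built in the developer's own schema
print(generate_schema_name_for_env("staging", "dev", "dbt_test_akozyr"))
```

In a dbt project, a macro like this is typically activated by overriding `generate_schema_name` so that it calls `generate_schema_name_for_env`.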


Repeated logic is not copy-pasted: Wheely follows Do not repeat yourself and moves it into macros. For example, currency conversion:

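-- currency conversion macro
-- expects the rate columns aed, eur, gbp, rub, usd to be available in the calling query;
-- currency_code_column is accepted but not used in the body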
{% macro convert_currency(convert_column, currency_code_column) -%}
     ( {{ convert_column }} * aed )::decimal(18,4) as {{ convert_column }}_aed
   , ( {{ convert_column }} * eur )::decimal(18,4) as {{ convert_column }}_eur
   , ( {{ convert_column }} * gbp )::decimal(18,4) as {{ convert_column }}_gbp
   , ( {{ convert_column }} * rub )::decimal(18,4) as {{ convert_column }}_rub
   , ( {{ convert_column }} * usd )::decimal(18,4) as {{ convert_column }}_usd 

{%- endmacro %}



       -- price_details
       , r.currency
       , {{ convert_currency('price', 'currency') }}
       , {{ convert_currency('transfer_min_price', 'currency') }}
       , {{ convert_currency('discount', 'currency') }}
       , {{ convert_currency('insurance', 'currency') }}
       , {{ convert_currency('tips', 'currency') }}
       , {{ convert_currency('parking', 'currency') }}
       , {{ convert_currency('toll_road', 'currency') }}
       , {{ convert_currency('pickup_charge', 'currency') }}
       , {{ convert_currency('cancel_fee', 'currency') }}
       , {{ convert_currency('net_bookings', 'currency') }}
       , {{ convert_currency('gross_revenue', 'currency') }}
       , {{ convert_currency('service_charge', 'currency') }}     


   from {{ ref('requests_joined') }} r   
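To see the DRY effect, here is a Python sketch that renders the same column list as `convert_currency` and runs it against an in-memory SQLite table. Assumptions: the exchange-rate columns (`aed`, `eur`, ...) are already joined onto each row, the rates below are made-up toy values, and `round(..., 4)` replaces Redshift's `::decimal(18,4)` cast, which SQLite does not support:

```python
import sqlite3

CURRENCIES = ["aed", "eur", "gbp", "rub", "usd"]


def convert_currency(column: str) -> str:
    """One converted column per target currency, named <column>_<currency>."""
    return "\n    , ".join(
        f"round({column} * {cur}, 4) as {column}_{cur}" for cur in CURRENCIES
    )


conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute("create table requests_joined (currency text, price real, tips real, "
             "aed real, eur real, gbp real, rub real, usd real)")
# toy rates for a GBP order: multiply by these to get each target currency
conn.execute("insert into requests_joined values "
             "('GBP', 100.0, 10.0, 4.8, 1.15, 1.0, 100.0, 1.3)")

sql = f"""
select currency
    , {convert_currency('price')}
    , {convert_currency('tips')}
from requests_joined
"""
row = conn.execute(sql).fetchone()
print(row["price_eur"], row["tips_usd"])
```

Adding a sixth currency means changing one list instead of every model that reports money.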

Macros also help with data quality checks, for example comparing two columns:

-- compare two columns
{% macro dq_compare_columns(src_column, trg_column, is_numeric=false) -%}
   {%- if is_numeric == true -%}
       {%- set src_column = 'round(' + src_column + ', 2)' -%}       
       {%- set trg_column = 'round(' + trg_column + ', 2)' -%}
   {%- endif -%} 

       CASE
           WHEN {{ src_column }} = {{ trg_column }} THEN 'match'
           WHEN {{ src_column }} IS NULL AND {{ trg_column }} IS NULL THEN 'both null'
           WHEN {{ src_column }} IS NULL THEN 'missing in source'
           WHEN {{ trg_column }} IS NULL THEN 'missing in target'
           WHEN {{ src_column }} <> {{ trg_column }} THEN 'mismatch'
           ELSE 'unknown'
       END
{%- endmacro %}
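The order of the WHEN branches matters: in SQL, `NULL = NULL` is not true, so NULL pairs fall through to the later branches. A Python mirror of the same classification, as a sketch for unit-testing the rules (the function name is hypothetical; Python's `round()` rounds exact halves to even, so results may differ from the database's rounding at values like .xx5):

```python
from typing import Optional


def dq_compare(src: Optional[float], trg: Optional[float],
               is_numeric: bool = False) -> str:
    if is_numeric:
        # compare numbers rounded to 2 decimals, as the macro does
        src = None if src is None else round(src, 2)
        trg = None if trg is None else round(trg, 2)
    # an SQL equality involving NULL is never true, so NULLs are checked explicitly
    if src is not None and trg is not None and src == trg:
        return "match"
    if src is None and trg is None:
        return "both null"
    if src is None:
        return "missing in source"
    if trg is None:
        return "missing in target"
    return "mismatch"


print(dq_compare(10.001, 10.004, is_numeric=True))
```

The macro's `ELSE 'unknown'` branch has no counterpart here, because every combination of values is already covered by the branches above it.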


-- cast epoch as human-readable timestamp
{% macro create_udf() -%}

{% set sql %}

       CREATE OR REPLACE FUNCTION {{ target.schema }}.f_bitwise_to_delimited(bitwise_column BIGINT, bits_in_column INT)
           RETURNS VARCHAR(512)
       AS $$
       # Convert column to binary, strip "0b" prefix, pad out with zeroes
       if bitwise_column is not None
           b = bin(bitwise_column)[2:].zfill(bits_in_column)[:bits_in_column+1]
           return b
       $$ LANGUAGE plpythonu

       CREATE OR REPLACE FUNCTION {{ target.schema }}.f_decode_access_flags(access_flags INT, deleted_at TIMESTAMP)
           RETURNS VARCHAR(128)
       AS $$
           SELECT nvl(
                         DECODE($2, null, null, 'deleted')
                       , DECODE(LEN(analytics.f_bitwise_to_delimited($1, 7))::INT, 7, null, 'unknown')
                       , DECODE(analytics.f_bitwise_to_delimited($1, 7)::INT, 0, 'active', null)
                       , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 1, 1), 1, 'end_of_life', null)
                       , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 7, 1), 1, 'pending', null)
                       , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 6, 1), 1, 'rejected', null)
                       , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 5, 1), 1, 'blocked', null)
                       , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 4, 1), 1, 'expired_docs', null)
                       , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 3, 1), 1, 'partner_blocked', null)
                       , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 2, 1), 1, 'new_partner', null
       $$ LANGUAGE SQL

 {% endset %}

 {% set table = run_query(sql) %}

{%- endmacro %}
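Bit-level logic like this is easy to get wrong, so it can help to reproduce it outside the database. A Python sketch of our reading of what the two UDFs compute, assuming non-negative flag values (the Python function names are hypothetical): the bits are rendered as a zero-padded 7-character string, values that need more than 7 bits become 'unknown', and the first set flag in the `nvl(...)` order wins:

```python
from typing import Optional

# (1-based position in the 7-character bit string, status) in nvl(...) order
FLAG_ORDER = [
    (1, "end_of_life"), (7, "pending"), (6, "rejected"), (5, "blocked"),
    (4, "expired_docs"), (3, "partner_blocked"), (2, "new_partner"),
]


def bitwise_to_delimited(value: Optional[int], bits: int) -> Optional[str]:
    """Same as f_bitwise_to_delimited: binary string, zero-padded to `bits`."""
    if value is None:
        return None
    return bin(value)[2:].zfill(bits)[: bits + 1]


def decode_access_flags(access_flags: Optional[int],
                        deleted_at: Optional[str]) -> Optional[str]:
    if deleted_at is not None:
        return "deleted"
    bits = bitwise_to_delimited(access_flags, 7)
    # a NULL length does not match 7 in DECODE, so NULL flags also give 'unknown'
    if bits is None or len(bits) != 7:
        return "unknown"
    if int(bits) == 0:
        return "active"
    for position, status in FLAG_ORDER:
        if bits[position - 1] == "1":
            return status
    return None


print(decode_access_flags(0, None), decode_access_flags(1, None),
      decode_access_flags(64, None), decode_access_flags(128, None))
```

The `[: bits + 1]` slice keeps one extra character on purpose: a value wider than 7 bits produces an 8-character string, which the length check then reports as 'unknown'.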

Macros also help when working with nested structures and with external tables stored in S3 as parquet.

A package is a reusable collection of macros and models; dbt hub lists many ready-made packages.

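dbt provides two hooks, pre-hook and post-hook, that run before and after a model is built. For example, to log model start and end events with the logging package: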

   +pre-hook: "{{ logging.log_model_start_event() }}"
   +post-hook: "{{ logging.log_model_end_event() }}"

Monitoring dbt model runs on a Redshift cluster
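With start and end events in one table, run durations for monitoring like this reduce to a simple aggregation. A self-contained sketch using Python's sqlite3; the table name `audit_log` and the event names `model_start` / `model_end` are hypothetical, not the logging package's actual schema:

```python
import sqlite3
from datetime import datetime, timedelta

conn = sqlite3.connect(":memory:")
conn.execute("create table audit_log (event_name text, model text, event_ts text)")


def log_event(event_name: str, model: str, ts: datetime) -> None:
    """What a pre-/post-hook would do: append one event row."""
    conn.execute("insert into audit_log values (?, ?, ?)",
                 (event_name, model, ts.isoformat(sep=" ")))


start = datetime(2021, 3, 1, 6, 0, 0)
log_event("model_start", "journeys", start)
log_event("model_end", "journeys", start + timedelta(seconds=95))
log_event("model_start", "fines", start)
log_event("model_end", "fines", start + timedelta(seconds=12))

# duration in seconds = (end - start) in days * 86400
durations = dict(conn.execute("""
    select model,
           round((julianday(max(case when event_name = 'model_end' then event_ts end))
                - julianday(max(case when event_name = 'model_start' then event_ts end)))
                 * 86400) as seconds
    from audit_log
    group by model
""").fetchall())
print(durations)
```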

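Packages also save a lot of work. For example, the dbt_date package generates a calendar dimension with a single macro call: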

{{ dbt_date.get_date_dimension('2012-01-01', '2025-12-31') }}

Calendar dimension generated by a macro
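Under the hood, a calendar dimension is just one row per day with derived attributes. A Python sketch of the idea, with a hypothetical `date_dimension` function and a small set of columns (the dbt_date macro generates its own, larger column set in SQL); here both ends of the range are inclusive:

```python
from datetime import date, timedelta
from typing import Dict, List


def date_dimension(start: date, end: date) -> List[Dict[str, object]]:
    """One row per day from start to end, both inclusive."""
    rows = []
    day = start
    while day <= end:
        iso_year, iso_week, iso_weekday = day.isocalendar()
        rows.append({
            "date_day": day,
            "day_of_week": iso_weekday,      # ISO: 1 = Monday ... 7 = Sunday
            "iso_year": iso_year,
            "iso_week_of_year": iso_week,
            "month_of_year": day.month,
            "quarter_of_year": (day.month - 1) // 3 + 1,
            "year_number": day.year,
        })
        day += timedelta(days=1)
    return rows


calendar = date_dimension(date(2012, 1, 1), date(2025, 12, 31))
print(len(calendar))
```

ISO weeks are worth keeping separately from the calendar year: 2021-01-01, for instance, belongs to week 53 of ISO year 2020.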

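The dbt_external_tables package helps build a Lakehouse by querying data stored in S3 directly. For example, currency exchange rates fetched from the Open Exchange Rates API are stored as JSON: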

External data stored in S3 accessed with Redshift Spectrum
sources:
  - name: external
    schema: spectrum
    tags: ["spectrum"]
    description: "External data stored in S3 accessed with Redshift Spectrum"
    tables:
      - name: currencies_oxr
        description: "Currency Exchange Rates fetched from OXR API"
        freshness:
          error_after: {count: 15, period: hour}
        loaded_at_field: timestamp 'epoch' + "timestamp" * interval '1 second'
        external:
          location: "s3://"
          row_format: "serde ''"
        columns:
          - name: timestamp
            data_type: bigint
          - name: base
            data_type: varchar(3)
          - name: rates
            data_type: struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>
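The `loaded_at_field` expression turns the epoch seconds in `timestamp` into a timestamp, and `error_after` fails the freshness check when the newest record is more than 15 hours old. A Python sketch of that check, assuming UTC epochs and a hypothetical `freshness_status` helper (dbt itself runs the equivalent query against the warehouse):

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
ERROR_AFTER = timedelta(hours=15)  # error_after: {count: 15, period: hour}


def loaded_at(epoch_seconds: int) -> datetime:
    """timestamp 'epoch' + "timestamp" * interval '1 second'"""
    return EPOCH + timedelta(seconds=epoch_seconds)


def freshness_status(epoch_values: Iterable[int], now: datetime) -> str:
    latest = max(loaded_at(v) for v in epoch_values)
    return "error" if now - latest > ERROR_AFTER else "pass"


rates_loaded = [1614470400, 1614556800]  # 2021-02-28 and 2021-03-01, 00:00 UTC
print(freshness_status(rates_loaded, datetime(2021, 3, 1, 10, tzinfo=timezone.utc)))
print(freshness_status(rates_loaded, datetime(2021, 3, 2, 0, tzinfo=timezone.utc)))
```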

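Operations handle routine maintenance such as VACUUM + ANALYZE, which Redshift inherited from PostgreSQL; running it from dbt means this work no longer needs a DBA: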

dbt run-operation redshift_maintenance --args '{include_schemas: ["staging", "flatten", "intermediate", "analytics", "meta", "snapshots", "ad_hoc"]}'
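Conceptually, the operation issues VACUUM and ANALYZE for the tables in the listed schemas. A Python sketch that only generates the statements, assuming a hypothetical `maintenance_statements` helper and a made-up table list; the actual macro may apply further filters and options:

```python
from typing import Dict, List


def maintenance_statements(tables_by_schema: Dict[str, List[str]],
                           include_schemas: List[str]) -> List[str]:
    """VACUUM + ANALYZE statements for every table in the included schemas.

    Redshift does not allow VACUUM inside a transaction block, so each
    statement has to be executed on its own (autocommit)."""
    statements = []
    for schema in include_schemas:
        for table in sorted(tables_by_schema.get(schema, [])):
            relation = f'"{schema}"."{table}"'
            statements.append(f"vacuum {relation};")
            statements.append(f"analyze {relation};")
    return statements


tables = {"staging": ["stg_fines", "stg_requests"], "spectrum": ["currencies_oxr"]}
for stmt in maintenance_statements(tables, ["staging", "analytics"]):
    print(stmt)
```

Schemas that are not listed, such as the external `spectrum` schema here, are left alone.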


Running in production: dbt Cloud at Wheely

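dbt Cloud is a hosted service from the team behind dbt: it runs jobs on a schedule, sends notifications and even includes an IDE (!).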


Jobs can run on a cron schedule or be triggered by a webhook; a job can also be kicked off from Airflow:

Failures of Production jobs are reported to Slack.

dbt does not require dbt Cloud: it can be run by any orchestrator, such as Airflow, Prefect, Dagster or cron, or from Github Actions.

At Wheely, this setup also simplifies onboarding.


, . - Technology Enthusiast –

, , , dbt !

