Exploring dbt with Snowflake


dbt · Snowflake | 19 Jan 2021 | Siva Nadesan

Overview

In this article, we will see how to use dbt with Snowflake. dbt (data build tool) handles the T in ELT (Extract, Load, Transform). It doesn't extract or load data, but it is extremely good at transforming data that's already loaded into your warehouse. The code used in this article can be found here. Download the folder to get started.


Here we are using dbt seed to load raw and reference tables just for demo purposes. In real use cases, you should never use dbt seed to load your raw layer, though it can be used to load simple reference tables.

Prerequisite

  • Download and install docker for your platform. Click here for instructions.
  • Create Snowflake account for the demo. Click here for instructions.

Install dbt

I started this exploration by installing dbt manually on a Linux machine. The install had a few issues along the way due to the Python version and other old modules on the machine, so I ended up using Docker for consistency. I am listing the steps for both Linux and Docker; however, I would recommend using Docker if possible.

In Linux

  • Install Python 3.8. As of this post, dbt does not work with Python 3.9
sudo apt-get install python3.8 python3.8-dev python3.8-distutils python3.8-venv
  • If you have multiple versions of Python, you can run the commands below to set the priority of each version and to select a version.
# Set Priority
sudo update-alternatives --install /usr/bin/python python /usr/bin/python3.8 1
sudo update-alternatives --install /usr/bin/python python /usr/bin/python2.7 2

# Select Python version
sudo update-alternatives --config python
  • Install pip
sudo apt-get install python3-pip
  • Validate python version
python --version
  • Install dbt
sudo pip install dbt
  • Validate dbt
dbt --version

In Docker

  • To use dbt in Docker, download the code from here

  • docker-compose.yml expects two volumes, as shown below: one for profiles.yml and one for the dbt project files. So to get started,

    ⏩ create a copy of .dbt/profiles.yml.template as .dbt/profiles.yml. We will make changes to this file at a later stage.
    ⏩ create a copy of .env.template as .env in both root directory and docker/dbt directory. No changes required for now.

      volumes:
        - ./.dbt/profiles.yml:/secure/profile/profiles.yml
        - ./dbt:/data/dbt
    
  • docker-compose.yml allows us to start and stop the containers using the docker-compose up and docker-compose down commands. Start the services by running

    docker-compose up --remove-orphans -d --build
    
  • Validate the status of docker containers

    docker-compose ps
    
  • Open a shell inside the container

    docker exec -it dbt /bin/bash
    
  • Validate dbt inside container

    dbt --version
    

Configure Snowflake for the dbt account

Log in to your Snowflake account and complete the following steps to set up the dbt user, role, and grants.

  • Create warehouse

    USE ROLE ACCOUNTADMIN;
    
    CREATE OR REPLACE WAREHOUSE DBT_WH
    WITH 
    WAREHOUSE_SIZE = 'XSMALL' 
    WAREHOUSE_TYPE = 'STANDARD' 
    AUTO_SUSPEND = 60 
    AUTO_RESUME = TRUE 
    MIN_CLUSTER_COUNT = 1 
    MAX_CLUSTER_COUNT = 2 
    SCALING_POLICY = 'ECONOMY' 
    COMMENT = 'Warehouse for DBT';
    
  • Create a database for each layer/model. These layers can be schemas or databases, depending on the standards you follow. Click here for more details about dbt structure best practices

    OR REPLACE will drop an existing database with the same name and create a new one, so be careful when running the CREATE DATABASE command with the REPLACE option

    CREATE OR REPLACE DATABASE DBT_DEMO_SOURCES;
    CREATE OR REPLACE DATABASE DBT_DEMO_STAGING;
    CREATE OR REPLACE DATABASE DBT_DEMO_MARTS;
    

    ✅ SOURCES → Database for raw incoming data from external sources
    ✅ STAGING → Database for heavy-lift data processing, aka the prep area
    ✅ MARTS → Database for the analytics layer, hosting dimensions and facts for a specific business area

  • Create the required schemas. Here we will create one schema per source in the SOURCES and STAGING databases, and one schema per business area in the marts.

    ✅ SAKILA (in SOURCES) → Schema for raw incoming data from the Sakila Sample Database
    ✅ SAKILA (in STAGING) → Schema for cleansed data from the Sakila Sample Database
    ✅ CORE (in MARTS) → Schema to host all core dimensions and facts

    CREATE SCHEMA "DBT_DEMO_SOURCES"."SAKILA";
    CREATE SCHEMA "DBT_DEMO_STAGING"."SAKILA";
    CREATE SCHEMA "DBT_DEMO_MARTS"."CORE";
    
  • Create Role

    CREATE ROLE "DBT_ROLE";
    
  • Create the user. Make sure to change the password to a complex one as per your needs

    CREATE USER DBT_USER PASSWORD = 'dbtuser' 
      MUST_CHANGE_PASSWORD = FALSE
      DEFAULT_WAREHOUSE = DBT_WH 
      DEFAULT_ROLE = DBT_ROLE;
    
  • Grant the role to SYSADMIN and to the dbt user

    GRANT ROLE "DBT_ROLE" TO ROLE "SYSADMIN";
    GRANT ROLE "DBT_ROLE" TO USER "DBT_USER";
    
  • Grant warehouse usage access to role

    GRANT USAGE ON WAREHOUSE DBT_WH TO ROLE DBT_ROLE;
      
    GRANT CREATE SCHEMA ON DATABASE "DBT_DEMO_SOURCES" TO DBT_ROLE;
    GRANT CREATE SCHEMA ON DATABASE "DBT_DEMO_STAGING" TO DBT_ROLE;
    GRANT CREATE SCHEMA ON DATABASE "DBT_DEMO_MARTS" TO DBT_ROLE;
    
    GRANT ALL ON SCHEMA "DBT_DEMO_SOURCES"."SAKILA" TO DBT_ROLE;
    GRANT ALL ON SCHEMA "DBT_DEMO_STAGING"."SAKILA" TO DBT_ROLE;
    GRANT ALL ON SCHEMA "DBT_DEMO_MARTS"."CORE" TO DBT_ROLE;
    
    GRANT USAGE ON ALL SCHEMAS IN DATABASE "DBT_DEMO_SOURCES" TO DBT_ROLE;
    GRANT USAGE ON FUTURE SCHEMAS IN DATABASE "DBT_DEMO_SOURCES" TO DBT_ROLE;
    GRANT USAGE ON ALL SCHEMAS IN DATABASE "DBT_DEMO_STAGING" TO DBT_ROLE;
    GRANT USAGE ON FUTURE SCHEMAS IN DATABASE "DBT_DEMO_STAGING" TO DBT_ROLE;
    GRANT USAGE ON ALL SCHEMAS IN DATABASE "DBT_DEMO_MARTS" TO DBT_ROLE;
    GRANT USAGE ON FUTURE SCHEMAS IN DATABASE "DBT_DEMO_MARTS" TO DBT_ROLE;
    
    GRANT SELECT ON ALL TABLES IN DATABASE "DBT_DEMO_SOURCES" TO DBT_ROLE;
    GRANT SELECT ON FUTURE TABLES IN DATABASE "DBT_DEMO_SOURCES" TO DBT_ROLE;
    GRANT SELECT ON ALL TABLES IN DATABASE "DBT_DEMO_STAGING" TO DBT_ROLE;
    GRANT SELECT ON FUTURE TABLES IN DATABASE "DBT_DEMO_STAGING" TO DBT_ROLE;
    GRANT SELECT ON ALL TABLES IN DATABASE "DBT_DEMO_MARTS" TO DBT_ROLE;
    GRANT SELECT ON FUTURE TABLES IN DATABASE "DBT_DEMO_MARTS" TO DBT_ROLE;
    
    GRANT SELECT ON ALL VIEWS IN DATABASE "DBT_DEMO_SOURCES" TO DBT_ROLE;
    GRANT SELECT ON FUTURE VIEWS IN DATABASE "DBT_DEMO_SOURCES" TO DBT_ROLE;
    GRANT SELECT ON ALL VIEWS IN DATABASE "DBT_DEMO_STAGING" TO DBT_ROLE;
    GRANT SELECT ON FUTURE VIEWS IN DATABASE "DBT_DEMO_STAGING" TO DBT_ROLE;
    GRANT SELECT ON ALL VIEWS IN DATABASE "DBT_DEMO_MARTS" TO DBT_ROLE;
    GRANT SELECT ON FUTURE VIEWS IN DATABASE "DBT_DEMO_MARTS" TO DBT_ROLE;
    
    GRANT MANAGE GRANTS ON ACCOUNT TO ROLE DBT_ROLE;
    
  • Review the grants

    SHOW GRANTS ON SCHEMA "DBT_DEMO_SOURCES"."SAKILA";
    SHOW GRANTS ON SCHEMA "DBT_DEMO_STAGING"."SAKILA";
    SHOW GRANTS ON SCHEMA "DBT_DEMO_MARTS"."CORE";
    SHOW GRANTS TO USER DBT_USER;
    

Configure the dbt-to-Snowflake connection

In this step we will connect dbt to Snowflake.

Configure profiles.yml

  • If you have installed dbt locally on a Linux machine, find the path of the dbt config file profiles.yml by running

    dbt debug --config-dir
    
  • If you are running dbt in docker, then profiles.yml is located at .dbt/profiles.yml

  • Edit the config and add the Snowflake details. See here for more details. Here is the config for this demo; be sure to update account with your account identifier

    sakila_db:
      target: dev
      outputs:
        dev:
          type: snowflake
          account: <your-account-id>
      
          # User/password auth
          user: DBT_USER
          password: dbtuser
      
          role: DBT_ROLE
          database: DBT_DEMO_SOURCES
          warehouse: DBT_WH
          schema: DBT
          threads: 1
          client_session_keep_alive: False
    

Configure dbt_project.yml

  • If you have installed dbt locally on a Linux machine, create a new dbt project by running:

    dbt init dbt
    

    Here, the dbt after init is the project name, so it can be anything meaningful.

    To follow along with this demo, delete all contents from the newly created dbt folder and copy the contents from the dbt folder of this repo.

  • If you are running dbt in docker, then dbt directory should already have all the contents from this demo.

    root@ce9789ac4c21:/data/dbt# cd /data/dbt
    root@ce9789ac4c21:/data/dbt# ls
    README.md  analysis  data  dbt_modules  dbt_project.yml  logs  macros  models  snapshots  target  tests
    
  • Edit dbt_project.yml to point to the profile we just created. The value of profile should exactly match the name in profiles.yml

    name: 'sakila_db'
    version: '0.1'
    profile: 'sakila_db'
    source-paths: ["models"]
    analysis-paths: ["analysis"]
    test-paths: ["tests"]
    data-paths: ["data"]
    macro-paths: ["macros"]
    snapshot-paths: ["snapshots"]
      
    target-path: "target"
    clean-targets:
        - "target"
        - "dbt_modules"
        - "logs"
      
    models:
      sakila_db:
          materialized: table
          staging:
            database: DBT_DEMO_STAGING
            schema: SAKILA
            materialized: table
          marts:
            database: DBT_DEMO_MARTS
            schema: CORE
            materialized: view
    

Validate dbt to Snowflake connection

Validate the dbt profile and connection by running the debug command from the dbt project directory. You need to shell into the container (docker exec -it dbt /bin/bash) when running dbt in Docker.

dbt debug

Load SOURCES

  • We will use the MySQL Sakila database schema as the source. See here for the source data model details.

  • The tables were exported as CSV files and placed in the dbt/data folder. We will use the dbt seed command to load the data into Snowflake.

  • Before we start the seed, let's update dbt_project.yml to route the data to the raw schema. By default, seeds load into the schema specified in profiles.yml

    seeds:
      sakila_db:
        database: DBT_DEMO_SOURCES
        schema: raw # all seeds in this project will use the raw schema by default
        sources:
          schema: sakila # seeds in the `data/sakila` subdirectory will use the sakila schema
        lookups:
          schema: lookups # seeds in the `data/lookups` subdirectory will use the lookups schema
    
    
  • Load raw tables

    dbt seed
    
  • The load went fine, though the data got loaded into the DBT_RAW schema rather than RAW.

  • Even after specifying a schema for seeds, dbt prefixes it with the default schema from profiles.yml. To change this behaviour, we will override the built-in dbt macro generate_schema_name with a custom macro of the same name. Create a macro file named generate_schema_name.sql and copy the code below. Read more about this here

    {% macro generate_schema_name(custom_schema_name, node) -%}
      {%- set default_schema = target.schema -%}
      {%- if custom_schema_name is none -%}
          {{ default_schema }}
      {%- else -%}
          {{ custom_schema_name | trim }}
      {%- endif -%}
    {%- endmacro %}
    
  • Seeding the data again will load it into the right schema.

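To make the override concrete, here is a minimal sketch in plain Python (an illustration only, not dbt code) of how the default macro builds the schema name versus the custom override above:

```python
# Illustration only: mimics dbt's schema name resolution in plain Python.

def default_generate_schema_name(target_schema, custom_schema):
    # dbt's built-in behaviour: prefix any custom schema with the target schema.
    if custom_schema is None:
        return target_schema
    return "{}_{}".format(target_schema, custom_schema.strip())

def custom_generate_schema_name(target_schema, custom_schema):
    # The override above: use the custom schema as-is when one is given.
    if custom_schema is None:
        return target_schema
    return custom_schema.strip()

# With target.schema = DBT (from profiles.yml) and a seed schema of RAW:
print(default_generate_schema_name("DBT", "RAW"))  # DBT_RAW (what we observed)
print(custom_generate_schema_name("DBT", "RAW"))   # RAW (what we want)
```

With no custom schema configured, both variants fall back to the default schema from profiles.yml.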

Test SOURCES

  • It's safer to test the data in sources before loading it into staging and marts. We will create a file named src_<source>.yml (say src_sakila.yml) with the source definition, tests, and documentation for the source area.

  • Here is an example of a test specification in src_sakila.yml

    version: 2
      
    sources:
      - name: sakila
        tables:
          - name: actor
            columns:
              - name: ACTOR_ID
                tests:
                  - dbt_expectations.expect_column_values_to_be_unique
    
  • Here are a couple of examples of running tests against sources

    # Install packages
    dbt deps
    # Test all sources
    dbt test --models source:*
    # Test specific source model
    dbt test --models source:lookups
    

Load STAGING

Our goal is to create the rental fact and related dimensions as shown below. Here we will load staging, aka the PREP area, to prepare data for the dimensions and facts. In the dbt world, transformation is done using models. A model is a data transformation expressed in a single SELECT statement. Read more about dbt models here

  • Models are .sql files in the models directory of the dbt project; subdirectories are supported within the models directory.

  • Since most RAW tables are loaded by extract-and-load tools outside of dbt, you need to define them as sources before using them. We have already defined the sources as part of testing in src_sakila.yml and src_lookups.yml

  • Here is an example model .sql file for a prep object. We could use either the ref or the source function since we seeded the data, but to stay close to a real use case, we will use the source function.

    with source as (
        -- select * from {{ ref('actor') }}
        select * from {{ source('raw', 'actor') }}
    ),
    
    renamed as (
        select
            *
        from source 
    )
      
    select * from renamed
    
  • Once our models are ready, we can execute them by running

    # Run all models 
    dbt run
    # Run specific models by model name
    dbt run --models stg_customer
    # Run specific models by directory name
    dbt run --models staging.*
    

    A best practice is to create subdirectories if we want to group certain runs together

  • Validate the data in staging objects

    SELECT * FROM "DBT_DEMO_STAGING"."SAKILA"."STG_CUSTOMER";
    SELECT * FROM "DBT_DEMO_STAGING"."SAKILA"."STG_FILM";
    SELECT * FROM "DBT_DEMO_STAGING"."SAKILA"."STG_STAFF";
    SELECT * FROM "DBT_DEMO_STAGING"."SAKILA"."STG_STORE";
    
  • This is all looking good. However, dbt run loads all records during each run, which would be a bad idea when processing huge volumes of data. To solve that problem, we will convert our models to incremental models by adding the below:

    • a configuration block at the top of the sql file with materialized and unique_key (optional). Setting materialized will also convert the view to a table.

      {{
          config(
              materialized='incremental',
              unique_key='customer_id'
          )
      }}
      
    • a WHERE clause to filter the rows for incremental runs

      {% if is_incremental() %}  
        where scus.last_update > (select max(last_update) from {{ this }})
      {% endif %}
      
  • Execute dbt run again, and now we can see the customer table getting loaded in incremental mode.

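Conceptually, the incremental setup above renders to different SQL on the first run versus later runs. Here is a rough Python sketch of that behaviour (an illustration with made-up relation names, not dbt internals):

```python
# Illustration only: how the is_incremental() block changes the compiled SQL.
BASE_SQL = "select * from {source}"

def compile_model(is_incremental_run, source="raw.customer", target="stg_customer"):
    # First run (or --full-refresh): select everything.
    sql = BASE_SQL.format(source=source)
    # Later runs: append the filter so only new/changed rows are processed.
    if is_incremental_run:
        sql += "\nwhere last_update > (select max(last_update) from {})".format(target)
    return sql

print(compile_model(False))  # full load, no filter
print(compile_model(True))   # incremental load with the timestamp filter
```

dbt then merges or appends the filtered rows into the existing table, keyed on unique_key when one is configured.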

Load MARTS

The next step is to load the marts, and here we will see a few options for doing so. Just for ease of understanding, we will create the customer dimension in several ways (with different names).

Views

  • Creating the model as a view is the simplest option. Here we specify the materialized property of config as view.

    {{
        config(
            materialized='view'
        )
    }}
    
    SELECT *
    FROM {{ ref('stg_customer') }} scus
    
  • Create the model by running

    dbt run -m dim_customer_view
    
  • Validate the definition in Snowflake

    select get_ddl('view', 'DBT_DEMO_MARTS.CORE.DIM_CUSTOMER_VIEW');
    
  • As you can see, this just created a simple view on top of our staging layer

    create or replace  view DIM_CUSTOMER_VIEW  as (
      
    SELECT *
    FROM DBT_DEMO_STAGING.SAKILA.stg_customer scus
      
      );
    
  • Validate the data in Snowflake

    -- We should see 599 records
    SELECT COUNT(*) as no_of_rec FROM "DBT_DEMO_MARTS"."CORE"."DIM_CUSTOMER_VIEW";
    SELECT * FROM "DBT_DEMO_MARTS"."CORE"."DIM_CUSTOMER_VIEW";
    

Incremental Table (SCD TYPE I)

  • An incremental model limits the amount of data processed, thus reducing the runtime of transformations.

  • Specify the materialized property of config as incremental; these models are built as tables

    {{
        config(
            materialized='incremental',
            incremental_strategy='delete+insert',
            unique_key='customer_id'
        )
    }}
      
    SELECT *
    FROM {{ ref('stg_customer') }} scus
      
    {% if is_incremental() %}
      HAVING scus.last_update > (select max(last_update) from {{ this }})
    {% endif %}
    
  • Create the model by running (-d runs in debug mode to show the full execution logs in the CLI)

    dbt -d run -m dim_customer_incremental
    
  • You should see the following steps in the dbt run log:
    🚧 Create a TEMPORARY table with the HAVING clause to filter records based on the incremental condition
    🚧 Delete from the TARGET table, since we have incremental_strategy set to delete+insert
    🚧 Insert into the TARGET table by selecting from the TEMPORARY table

  • Validate the definition in Snowflake; the output will be a CREATE TABLE statement

    select get_ddl('table', 'DBT_DEMO_MARTS.CORE.DIM_CUSTOMER_INCREMENTAL');
    
  • Validate the data in Snowflake

    -- We should see 599 records
    SELECT COUNT(*) as no_of_rec FROM "DBT_DEMO_MARTS"."CORE"."DIM_CUSTOMER_INCREMENTAL";
    SELECT * FROM "DBT_DEMO_MARTS"."CORE"."DIM_CUSTOMER_INCREMENTAL";
    
     -- Check the current max record in staging area and marts
    select 'stg_customer' as table_name, max(last_update) as last_update from DBT_DEMO_STAGING.SAKILA.stg_customer
    union 
    select 'dim_customer_view' as table_name, max(last_update) as last_update from DBT_DEMO_MARTS.CORE.dim_customer_view
    union 
    select 'dim_customer_incremental' as table_name, max(last_update) as last_update from DBT_DEMO_MARTS.CORE.dim_customer_incremental
    
    -- Check the record before update
    SELECT * FROM DBT_DEMO_STAGING.SAKILA.stg_customer where CUSTOMER_ID = 1;
    SELECT * FROM DBT_DEMO_MARTS.CORE.dim_customer_view where CUSTOMER_ID = 1;
    SELECT * FROM DBT_DEMO_MARTS.CORE.dim_customer_incremental where CUSTOMER_ID = 1;
    
  • Make a change in staging

    -- Update the staging table
    UPDATE DBT_DEMO_STAGING.SAKILA.stg_customer
    SET CUSTOMER_DISTRICT = 'Kansas', LAST_UPDATE = current_timestamp(2)
    WHERE CUSTOMER_ID = 1;
    
    -- Check the record after update, the view should have the updated information
    SELECT * FROM DBT_DEMO_STAGING.SAKILA.stg_customer where CUSTOMER_ID = 1;
    SELECT * FROM DBT_DEMO_MARTS.CORE.dim_customer_view where CUSTOMER_ID = 1;
    SELECT * FROM DBT_DEMO_MARTS.CORE.dim_customer_incremental where CUSTOMER_ID = 1;
    
  • Re-run the model

    dbt -d run -m dim_customer_incremental
    
  • Validate the data

    -- Check the record after update, the stage, view and incremental table should have the updated information now
    SELECT * FROM DBT_DEMO_STAGING.SAKILA.stg_customer where CUSTOMER_ID = 1;
    SELECT * FROM DBT_DEMO_MARTS.CORE.dim_customer_view where CUSTOMER_ID = 1;
    SELECT * FROM DBT_DEMO_MARTS.CORE.dim_customer_incremental where CUSTOMER_ID = 1;
    
  • So this is a Type 1 (overwrite) Slowly Changing Dimension implementation
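The delete+insert behaviour can be sketched in plain Python over lists of dicts (an illustration of the strategy only; dbt does this in SQL with a temporary table):

```python
# Illustration only: SCD Type 1 via delete+insert, keyed on customer_id.

def delete_insert(target_rows, increment_rows, unique_key="customer_id"):
    # Delete target rows whose key appears in the increment, then insert the increment.
    keys = {row[unique_key] for row in increment_rows}
    kept = [row for row in target_rows if row[unique_key] not in keys]
    return kept + list(increment_rows)

target = [
    {"customer_id": 1, "district": "Texas"},
    {"customer_id": 2, "district": "Ohio"},
]
increment = [{"customer_id": 1, "district": "Kansas"}]  # the row updated in staging

result = delete_insert(target, increment)
print(result)  # customer 1 now shows Kansas; the old Texas row is gone (overwritten)
```

The old version of the row is lost, which is exactly why this counts as Type 1.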

Snapshot Table (SCD TYPE II)

  • Snapshots are used to record changes to a mutable table over time. Snapshots are a Type 2 (add new row) Slowly Changing Dimension

  • Code for snapshots should be placed inside snapshots directory in the dbt project

  • Snapshots need target_database and target_schema, which can be specified in the config block of the sql file OR inside dbt_project.yml. In this example, dbt_project.yml was updated with the block below.

    snapshots:
      sakila_db:
        marts:
          target_database: DBT_DEMO_MARTS
          target_schema: CORE
    
  • Snapshot sql files (say snap_customer.sql) are SELECT statements inside a snapshot block. The config block for snapshots requires the following:
    ⏩ unique_key
    ⏩ strategy, which can be timestamp OR check; updated_at is required for timestamp and check_cols is required for check

    Click here to read more about this

    {% snapshot snap_customer %}
      
    {{
        config(
          unique_key='customer_id',
          strategy='timestamp',
          updated_at='last_update',
          invalidate_hard_deletes=True,
        )
    }}
      
    select * from {{ ref('dim_customer_view') }}
      
    {% endsnapshot %}
    
  • Create the snapshot by running

    dbt snapshot -s snap_customer
    
  • Validate the definition in Snowflake; the output will be a CREATE TRANSIENT TABLE statement. In addition to the data attributes, the table definition will also have the following dbt audit attributes

    Attribute Name | Description
    DBT_SCD_ID | A unique key generated for each snapshotted record
    DBT_UPDATED_AT | The updated_at timestamp of the source record when this snapshot row was inserted
    DBT_VALID_FROM | The timestamp when this snapshot row was first inserted
    DBT_VALID_TO | The timestamp when this row became invalidated
    select get_ddl('table', 'DBT_DEMO_MARTS.CORE.SNAP_CUSTOMER');
    
  • Validate the data in Snowflake

    -- We should see 599 records
    SELECT COUNT(*) as no_of_rec FROM "DBT_DEMO_MARTS"."CORE"."SNAP_CUSTOMER";
    SELECT * FROM "DBT_DEMO_MARTS"."CORE"."SNAP_CUSTOMER";
    
     -- Check the current max record in staging area and marts
    select 'stg_customer' as table_name, max(last_update) as last_update from DBT_DEMO_STAGING.SAKILA.stg_customer
    union 
    select 'dim_customer_view' as table_name, max(last_update) as last_update from DBT_DEMO_MARTS.CORE.dim_customer_view
    union
    select 'snap_customer' as table_name, max(last_update) as last_update from DBT_DEMO_MARTS.CORE.snap_customer
    
    -- Check the record before update
    SELECT * FROM DBT_DEMO_STAGING.SAKILA.stg_customer where CUSTOMER_ID = 1;
    SELECT * FROM DBT_DEMO_MARTS.CORE.dim_customer_view where CUSTOMER_ID = 1;
    SELECT * FROM DBT_DEMO_MARTS.CORE.snap_customer where CUSTOMER_ID = 1;
    
  • Make a change in staging

    -- Update the staging table
    UPDATE DBT_DEMO_STAGING.SAKILA.stg_customer
    SET CUSTOMER_DISTRICT = 'Kansas', LAST_UPDATE = current_timestamp(2)
    WHERE CUSTOMER_ID = 1;
    
    -- Check the record after update, the view should have the updated information
    SELECT * FROM DBT_DEMO_STAGING.SAKILA.stg_customer where CUSTOMER_ID = 1;
    SELECT * FROM DBT_DEMO_MARTS.CORE.dim_customer_view where CUSTOMER_ID = 1;
    SELECT * FROM DBT_DEMO_MARTS.CORE.snap_customer where CUSTOMER_ID = 1;
    
  • Re-run the snapshot

    dbt snapshot -s snap_customer
  • Validate the data; we should see two rows for the record which we updated

    -- Stage, view and snapshot table should have the updated information now
    SELECT * FROM DBT_DEMO_STAGING.SAKILA.stg_customer where CUSTOMER_ID = 1;
    SELECT * FROM DBT_DEMO_MARTS.CORE.dim_customer_view where CUSTOMER_ID = 1;
    SELECT * FROM DBT_DEMO_MARTS.CORE.snap_customer where CUSTOMER_ID = 1;
    
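The timestamp strategy above can be sketched in plain Python (an illustration of the Type 2 mechanics only, not dbt's actual merge SQL):

```python
# Illustration only: SCD Type 2 with the timestamp strategy. A changed source
# row closes the current snapshot row and appends a new current version.

def take_snapshot(snapshot_rows, source_rows, key="customer_id", updated_at="last_update"):
    out = [dict(r) for r in snapshot_rows]
    current = {r[key]: r for r in out if r["dbt_valid_to"] is None}
    for src in source_rows:
        cur = current.get(src[key])
        if cur is None:
            # brand-new key: insert it as the current version
            out.append(dict(src, dbt_valid_from=src[updated_at], dbt_valid_to=None))
        elif src[updated_at] > cur[updated_at]:
            # changed row: close the old version, append the new one
            cur["dbt_valid_to"] = src[updated_at]
            out.append(dict(src, dbt_valid_from=src[updated_at], dbt_valid_to=None))
    return out

rows = take_snapshot([], [{"customer_id": 1, "district": "Texas", "last_update": "2021-01-01"}])
rows = take_snapshot(rows, [{"customer_id": 1, "district": "Kansas", "last_update": "2021-01-19"}])
print(len(rows))  # 2: one closed-out row and one current row for customer 1
```

Unchanged rows are left alone, so history accumulates only for records that actually change.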

Custom Materializations (Support for DDL)

Materializations are responsible for taking a dbt model's SQL statement and turning it into a transformed dataset in a database. dbt does not support data definition language (DDL) out of the box, but it can be made possible with custom materializations. Let's see one such custom materialization, the Persistent Tables materialization. It supports DDL and non-destructive column adds (CREATE OR REPLACE would drop the existing table, so the Persistent Tables materialization works by creating a copy table and then copying the data back over after the changes)

  • Custom macros should be placed inside the macros directory; it's good to place them in a subfolder per function

  • Create sql file with DDL

    {{
        config(materialized='persistent_table'
            ,retain_previous_version_flg=false
            ,migrate_data_over_flg=true
        )
    }}
      
    CREATE OR REPLACE TABLE "{{ database }}"."{{ schema }}"."dim_customer_materialization" (
        CUSTOMER_ID NUMBER(38,0),
      CUSTOMER_FIRST_NAME VARCHAR(100),
      CUSTOMER_LAST_NAME VARCHAR(100),
      CUSTOMER_EMAIL VARCHAR(100)
    )
    
  • Create model by running

    dbt -d run -m dim_customer_materialization --full-refresh
    
  • Validate the definition in Snowflake; the definition should match the DDL statement in the model file

    select get_ddl('table', 'DBT_DEMO_MARTS.CORE."dim_customer_materialization"');
    

Custom Materializations (Support for backfills and DDL’s)

If we want to add a new column or drop a column from an existing table, it's not possible with the incremental materialization unless we do a full refresh. Not all tables can afford a full refresh, for several reasons:

⏩ Cost associated with a full-refresh-based backfill
⏩ Source tables no longer having all the historical data required to do a full-refresh-based backfill

To solve this problem we will use the vault_insert_by_period materialisation. I have customized it a bit further to add support for a target database parameter and a few other cosmetic logic changes. Click here for links to the original code and the customized code

TODO: Rename the materialisation, since it's a customized version

  • Since all of the source records used in this demo have the same last-updated date, we will run the SQL below to have some data for the backfill scenario

    UPDATE DBT_DEMO_STAGING.SAKILA.stg_customer
    SET CUSTOMER_DISTRICT = 'Kansas', LAST_UPDATE = current_timestamp(2)
    WHERE CUSTOMER_ID = 1;
    
  • Create a new model sql file, say dim_customer_insert_by_period.sql, with the code below. This materialisation requires:

    ⏩ period — granularity of the load (hour, day, month, year)
    ⏩ timestamp_field — column name of the timestamp field used for filtering data
    ⏩ start_date, stop_date — dates to control the load range

    {{
     config(
       materialized = "vault_insert_by_period",
       period = "year",
       timestamp_field = "last_update",
       start_date = "2006-01-01",
       stop_date = "2021-01-01", 
     )
    }}
     
    WITH stage
    AS (
     SELECT CUSTOMER_ID,
         CUSTOMER_FIRST_NAME,
         CUSTOMER_LAST_NAME,
         CUSTOMER_EMAIL,
         LAST_UPDATE
     FROM {{ ref('stg_customer') }}
     WHERE __PERIOD_FILTER__
     )
     
    SELECT *
    FROM stage
    
  • Run the model

    dbt -d run -m dim_customer_insert_by_period
    
  • Now, let's add a new column to the model by running the SQL below. The idea here is to manage the DDLs outside dbt in a DCM (Database Change Management) tool like sqitch

    ALTER TABLE "DBT_DEMO_MARTS"."CORE"."DIM_CUSTOMER_INSERT_BY_PERIOD"
    ADD COLUMN CUSTOMER_DISTRICT VARCHAR DEFAULT NULL;
    
  • Validate the data in Snowflake

  • Update the start_date and stop_date in dim_customer_insert_by_period.sql

        start_date = "2021-01-01",
        stop_date = "2021-04-01", 
    
  • Run the model

    dbt -d run -m dim_customer_insert_by_period
    
  • Validate the data in Snowflake. As you can see, this run loaded just the one record for the period mentioned in the start and stop dates.

    The next step is to integrate the start and end dates as parameters from Airflow and to add delete logic before each load, to have a better handle on loads for a specific period.
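The period-by-period idea can be sketched as follows (an illustration of the boundary computation only; the real materialisation does this in Jinja and SQL):

```python
# Illustration only: split [start_date, stop_date) into yearly periods, as a
# vault_insert_by_period run with period = "year" would, then load one period
# at a time instead of everything at once.
from datetime import date

def yearly_periods(start, stop):
    # Yield (period_start, period_end) pairs covering start..stop at year grain.
    year = start.year
    while date(year, 1, 1) < stop:
        lo = max(start, date(year, 1, 1))
        hi = min(stop, date(year + 1, 1, 1))
        yield lo, hi
        year += 1

periods = list(yearly_periods(date(2006, 1, 1), date(2008, 1, 1)))
print(periods)  # two one-year windows: 2006 and 2007
```

Each window then replaces the __PERIOD_FILTER__ placeholder in the model's WHERE clause, so every run only touches the rows for that period.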

Packages in dbt

Good software reuse enhances reliability, productivity, and quality while reducing time and cost. In dbt, reusability comes in the form of packages. You can find all dbt packages in dbt hub

  • Create packages.yml at the same level as dbt_project.yml and update it with the required package details. The one I am most interested in during this exploration is dbt_expectations, which allows dbt users to deploy Great Expectations-style data testing without needing an additional integration with Great Expectations.

    packages:
      - package: fishtown-analytics/dbt_utils
        version: 0.6.4
      - package: calogica/dbt_expectations
        version: 0.2.2
      - package: fishtown-analytics/codegen
        version: 0.3.1
      - package: yu-iskw/dbt_airflow_macros
        version: 0.2.2
      - package: Datavault-UK/dbtvault
        version: 0.7.3
    
  • To install packages, run dbt deps


Tests in dbt

dbt has two types of tests: schema tests and data tests

  • Schema tests are configured as YAML inside the schema.yml file. A schema test returns 0 when successful.

  • A data test is a SELECT statement inside the tests directory. A data test returns 0 records when successful.

  • Here is a sample configuration file, say stg_sakila.yml, for doing schema tests using the default dbt tests unique and not_null and the dbt_expectations test expect_column_values_to_be_unique

     models:
       - name: customer
         columns:
           - name: customer_id
             tests:
               - unique
               - not_null
               - dbt_expectations.expect_column_values_to_be_unique
    
  • To execute tests, run dbt test

Currently the tests run after the model is created and the changes are persisted; there is an open ticket to run tests after model creation but persist the changes only if the tests pass. See more details about this here
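The pass/fail semantics can be sketched in plain Python: a unique schema test is essentially a count of duplicated key values, and the test passes when that count is 0 (an illustration only, not dbt's generated SQL):

```python
# Illustration only: a `unique` schema test boils down to counting values that
# occur more than once; 0 means the test passes.
from collections import Counter

def unique_test(values):
    counts = Counter(values)
    return sum(1 for n in counts.values() if n > 1)

print(unique_test([1, 2, 3]))     # 0 -> pass
print(unique_test([1, 2, 2, 3]))  # 1 -> fail: the value 2 is duplicated
```

A data test works the other way around: it selects the offending rows themselves, and passes when the result set is empty.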

Documents in dbt

  • dbt provides a way to generate documentation and to publish it as a website.

  • You can add descriptions to models, columns, and sources in schema.yml. Here is an example

    models:
    - name: customer
      description: This table contains information about customers who rented the movie 
      columns:
        - name: customer_id
          description: This is a unique identifier for customer
    
  • dbt also supports docs blocks using the Jinja docs tag. To add a docs block:
    ⏩ Update dbt_project.yml to add the following

    docs-paths: ["docs"]
    asset-paths: ["assets"]
    

    ⏩ Create a .md file with the required information inside the docs directory and place the supporting images in the assets directory. An example would be an overview.md file with an __overview__ block, which overrides the default overview and creates a custom overview page

    {% docs __overview__ %}
      
    # Exploring Dbt With Snowflake
    In this article we will see how to use dbt with Snowflake.
      
    ![Overview](../assets/explore-dbt.png)
      
    {% enddocs %}
    

    ⏩ Another example would be to create film.md and update it with details about the film table

    {% docs film_overview %}
      
    This table contains information about films.
      
    {% enddocs %}
    

    ⏩ Update schema.yml to add the doc() function referencing the docs name, which is film_overview in this example

    - name: film
      description: '{{ doc("film_overview") }}' 
    
  • You can generate documents by running dbt docs generate.

  • Generated documents can be published by running dbt docs serve --port 8001. This will publish the documents locally.

It's advisable to run dbt run before generating documents, so the order of execution would be:

dbt deps
dbt seed
dbt run
dbt test
dbt docs generate
dbt docs serve

Read more about order of execution here

  • It's not mandatory to add an entry for each column in schema.yml, but the generated documentation is only as good as the details in your schema.yml. So it's recommended to add detailed information and to follow some standards for schema.yml.

  • Documents can also be hosted on s3 as a static site. Click here for the documents generated from this demo. Here are the high-level steps to host dbt docs in s3

    ⏩ Create s3 bucket
    ⏩ Update s3 bucket policy to allow read access

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "PublicReadGetObject",
                "Effect": "Allow",
                "Principal": "*",
                "Action": "s3:GetObject",
                "Resource": "arn:aws:s3:::dbt.entechlog.com/*"
            }
        ]
    }
    

    ⏩ Enable Static website hosting
    ⏩ Upload the contents from target/ directory of dbt project to the s3 bucket.


Hope this was helpful. Did I miss something? Let me know in the comments and I'll add it in!

Notes

*** You can list all containers by running docker container ls -a
*** You can bring down the containers by running docker-compose down
*** You can bring down the containers and related volumes by running docker-compose down --volumes
*** You can delete all exited containers by running docker rm $(docker ps -q -f status=exited)
*** You can delete all generated folders and clean the dbt files by running dbt clean


About The Author
Siva Nadesan

Siva Nadesan is a Principal Data Engineer. His passion includes data and blogging about technologies. He is also the creator and maintainer of www.entechlog.com
