Overview
In this article we will see how to use dbt with Snowflake. dbt (data build tool) does the T in ELT (Extract, Load, Transform) processes. It doesn’t extract or load data, but it is extremely good at transforming data that’s already loaded into your warehouse. Code used in this article can be found here. Download the folder to get started.
Here we are using dbt seed to load raw and reference tables just for demo purposes. In real use cases, you should never use dbt seed to load your raw layer, though it can be used to load simple reference tables.
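Since a seeded reference table behaves like any other table dbt builds, a downstream model can select from it with the ref() function. Here is a minimal sketch of that pattern (the country seed file and its columns are hypothetical, not part of this demo):

-- models/staging/stg_country.sql (hypothetical example)
-- 'country' refers to a seed csv (data/country.csv) loaded by dbt seed
select
    country_id,
    country_name
from {{ ref('country') }}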
Prerequisites
- Download and install Docker for your platform. Click here for instructions.
- Create a Snowflake account for the demo. Click here for instructions.
Install dbt
I started this exploration by installing dbt manually on a Linux machine. The install had a few issues along the way due to the Python version and other old modules on the machine, so I ended up using Docker for consistency. I am listing the steps for both Linux and Docker; however, I would recommend using Docker if possible.
In Linux
- Install Python 3.8. As of this post, dbt does not work with Python 3.9.
sudo apt-get install python3.8 python3.8-dev python3.8-distutils python3.8-venv
- If you have multiple versions of Python, you can run the commands below to set the priority of each version and to select a version.
# Set Priority
sudo update-alternatives --install /usr/bin/python python /usr/bin/python3.8 1
sudo update-alternatives --install /usr/bin/python python /usr/bin/python2.7 2
# Select Python version
sudo update-alternatives --config python
- Install pip
sudo apt-get install python3-pip
- Validate the Python version
python --version
- Install dbt
sudo pip install dbt
- Validate dbt
dbt --version
In Docker
- To use dbt in Docker, download the code from here.
- docker-compose.yml expects two volumes as shown below: one for profiles.yml and one for the dbt project files. To get started,
⏩ create a copy of .dbt/profiles.yml.template as .dbt/profiles.yml. We will make changes to this file at a later stage.
⏩ create a copy of .env.template as .env in both the root directory and the docker/dbt directory. No changes are required for now.
volumes:
  - ./.dbt/profiles.yml:/secure/profile/profiles.yml
  - ./dbt:/data/dbt
- docker-compose.yml allows us to start and stop the containers using the docker-compose up and docker-compose down commands. Start the services by running
docker-compose up --remove-orphans -d --build
- Validate the status of the Docker containers
docker-compose ps
- ssh into the container
docker exec -it dbt /bin/bash
- Validate dbt inside the container
dbt --version
Configure Snowflake for dbt account
Log in to the Snowflake account and do the following steps to set up the dbt user, role and grants.
- Create warehouse
USE ROLE ACCOUNTADMIN;

CREATE OR REPLACE WAREHOUSE DBT_WH WITH
  WAREHOUSE_SIZE = 'XSMALL'
  WAREHOUSE_TYPE = 'STANDARD'
  AUTO_SUSPEND = 60
  AUTO_RESUME = TRUE
  MIN_CLUSTER_COUNT = 1
  MAX_CLUSTER_COUNT = 2
  SCALING_POLICY = 'ECONOMY'
  COMMENT = 'Warehouse for DBT';
- Create a database for each layer/model. These layers can be schemas OR databases based on the standards which you follow. Click here for more details about dbt structure best practices.
REPLACE will delete an old database with the same name and create a new one, so be careful when running the create database command with the replace option.
CREATE OR REPLACE DATABASE DBT_DEMO_SOURCES;
CREATE OR REPLACE DATABASE DBT_DEMO_STAGING;
CREATE OR REPLACE DATABASE DBT_DEMO_MARTS;
✅ SOURCES ------> Database for raw incoming data from external sources
✅ STAGING ------> Database for heavy-lift data processing, aka the prep area
✅ MARTS --------> Database for the analytics layer to host dimensions and facts for a specific business area
- Create the required schemas. Here we will create one schema for each source in the SOURCES and STAGING databases and one schema for each business area in the marts.
✅ SAKILA (in SOURCES) --> Schema for raw incoming data from the Sakila Sample Database
✅ SAKILA (in STAGING) --> Schema for cleansed data from the Sakila Sample Database
✅ CORE (in MARTS) ------> Schema to host all core dimensions and facts
CREATE SCHEMA "DBT_DEMO_SOURCES"."SAKILA";
CREATE SCHEMA "DBT_DEMO_STAGING"."SAKILA";
CREATE SCHEMA "DBT_DEMO_MARTS"."CORE";
- Create role
CREATE ROLE "DBT_ROLE";
- Create user. Make sure to change the password to a complex one as per your needs.
CREATE USER DBT_USER
  PASSWORD = 'dbtuser'
  MUST_CHANGE_PASSWORD = FALSE
  DEFAULT_WAREHOUSE = DBT_WH
  DEFAULT_ROLE = DBT_ROLE;
- Grant the role to the user
GRANT ROLE "DBT_ROLE" TO ROLE "SYSADMIN";
GRANT ROLE "DBT_ROLE" TO USER "DBT_USER";
- Grant warehouse, database and schema access to the role
GRANT USAGE ON WAREHOUSE DBT_WH TO ROLE DBT_ROLE;

GRANT CREATE SCHEMA ON DATABASE "DBT_DEMO_SOURCES" TO DBT_ROLE;
GRANT CREATE SCHEMA ON DATABASE "DBT_DEMO_STAGING" TO DBT_ROLE;
GRANT CREATE SCHEMA ON DATABASE "DBT_DEMO_MARTS" TO DBT_ROLE;

GRANT ALL ON SCHEMA "DBT_DEMO_SOURCES"."SAKILA" TO DBT_ROLE;
GRANT ALL ON SCHEMA "DBT_DEMO_STAGING"."SAKILA" TO DBT_ROLE;
GRANT ALL ON SCHEMA "DBT_DEMO_MARTS"."CORE" TO DBT_ROLE;

GRANT USAGE ON ALL SCHEMAS IN DATABASE "DBT_DEMO_SOURCES" TO DBT_ROLE;
GRANT USAGE ON FUTURE SCHEMAS IN DATABASE "DBT_DEMO_SOURCES" TO DBT_ROLE;
GRANT USAGE ON ALL SCHEMAS IN DATABASE "DBT_DEMO_STAGING" TO DBT_ROLE;
GRANT USAGE ON FUTURE SCHEMAS IN DATABASE "DBT_DEMO_STAGING" TO DBT_ROLE;
GRANT USAGE ON ALL SCHEMAS IN DATABASE "DBT_DEMO_MARTS" TO DBT_ROLE;
GRANT USAGE ON FUTURE SCHEMAS IN DATABASE "DBT_DEMO_MARTS" TO DBT_ROLE;

GRANT SELECT ON ALL TABLES IN DATABASE "DBT_DEMO_SOURCES" TO DBT_ROLE;
GRANT SELECT ON FUTURE TABLES IN DATABASE "DBT_DEMO_SOURCES" TO DBT_ROLE;
GRANT SELECT ON ALL TABLES IN DATABASE "DBT_DEMO_STAGING" TO DBT_ROLE;
GRANT SELECT ON FUTURE TABLES IN DATABASE "DBT_DEMO_STAGING" TO DBT_ROLE;
GRANT SELECT ON ALL TABLES IN DATABASE "DBT_DEMO_MARTS" TO DBT_ROLE;
GRANT SELECT ON FUTURE TABLES IN DATABASE "DBT_DEMO_MARTS" TO DBT_ROLE;

GRANT SELECT ON ALL VIEWS IN DATABASE "DBT_DEMO_SOURCES" TO DBT_ROLE;
GRANT SELECT ON FUTURE VIEWS IN DATABASE "DBT_DEMO_SOURCES" TO DBT_ROLE;
GRANT SELECT ON ALL VIEWS IN DATABASE "DBT_DEMO_STAGING" TO DBT_ROLE;
GRANT SELECT ON FUTURE VIEWS IN DATABASE "DBT_DEMO_STAGING" TO DBT_ROLE;
GRANT SELECT ON ALL VIEWS IN DATABASE "DBT_DEMO_MARTS" TO DBT_ROLE;
GRANT SELECT ON FUTURE VIEWS IN DATABASE "DBT_DEMO_MARTS" TO DBT_ROLE;

GRANT MANAGE GRANTS ON ACCOUNT TO ROLE DBT_ROLE;
- Review the grants
SHOW GRANTS ON SCHEMA "DBT_DEMO_SOURCES"."SAKILA";
SHOW GRANTS ON SCHEMA "DBT_DEMO_STAGING"."SAKILA";
SHOW GRANTS ON SCHEMA "DBT_DEMO_MARTS"."CORE";
SHOW GRANTS TO USER DBT_USER;
Configure dbt to Snowflake connection
In this step we will try to connect dbt with Snowflake.
Configure profiles.yml
- If you have installed dbt locally on a Linux machine, find the path of the dbt config profiles.yml by running
dbt debug --config-dir
- If you are running dbt in Docker, then profiles.yml is located at .dbt/profiles.yml
- Edit the config and add the Snowflake details. See here for more details. Here is the config for this demo; be sure to update the account value for your account.
sakila_db:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: <your-account-id>

      # User/password auth
      user: DBT_USER
      password: dbtuser

      role: DBT_ROLE
      database: DBT_DEMO_SOURCES
      warehouse: DBT_WH
      schema: DBT
      threads: 1
      client_session_keep_alive: False
Configure dbt_project.yml
- If you have installed dbt locally on a Linux machine, create a new dbt project by running
dbt init dbt
Here the dbt after init is the name of the project, so it can be anything meaningful. To follow along with this demo, delete all contents from the newly created dbt folder and copy the contents from the dbt folder of this repo.
- If you are running dbt in Docker, then the dbt directory should already have all the contents from this demo.
root@ce9789ac4c21:/data/dbt# cd /data/dbt
root@ce9789ac4c21:/data/dbt# ls
README.md  analysis  data  dbt_modules  dbt_project.yml  logs  macros  models  snapshots  target  tests
- Edit the dbt_project.yml to connect to the profile which we just created. The value for profile should exactly match the name in profiles.yml.
name: 'sakila_db'
version: '0.1'
profile: 'sakila_db'

source-paths: ["models"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
data-paths: ["data"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target"
clean-targets:
  - "target"
  - "dbt_modules"
  - "logs"

models:
  sakila_db:
    materialized: table
    staging:
      database: DBT_DEMO_STAGING
      schema: SAKILA
      materialized: table
    marts:
      database: DBT_DEMO_MARTS
      schema: CORE
      materialized: view
Validate dbt to Snowflake connection
Validate the dbt profile and connection by running the debug command from the dbt project directory. You need to ssh into the container (docker exec -it dbt /bin/bash) when running dbt on Docker.
dbt debug
Load SOURCES
- We will use the MySQL Sakila db schema as the source. See here for the source data model details.
- The tables were exported as csv files and placed in the dbt/data folder. We will use the dbt seed command to load the data into Snowflake.
- Before we start the seed, let's update dbt_project.yml to route the data to the raw schema. By default it will load to the schema specified in profiles.yml.
seeds:
  sakila_db:
    database: DBT_DEMO_SOURCES
    schema: raw # all seeds in this project will use the raw schema by default

    sources:
      schema: sakila # seeds in the data/sakila/ subdirectory will use the sakila schema

    lookups:
      schema: lookups # seeds in the data/lookups/ subdirectory will use the lookups schema
- Load the raw tables
dbt seed
- The load went fine, though the data got loaded into the DBT_RAW schema rather than the RAW schema.
- Even after specifying a schema for seeds, dbt adds the default schema from profiles.yml as a prefix. To change this behaviour we will override the built-in dbt macro generate_schema_name with a custom macro of the same name. Create a macro named generate_schema_name.sql and copy the below code. Read more about this here.
{% macro generate_schema_name(custom_schema_name, node) -%}
    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}
        {{ default_schema }}
    {%- else -%}
        {{ custom_schema_name | trim }}
    {%- endif -%}
{%- endmacro %}
- Seeding the data again will load it into the right schema.
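To confirm where the seeds landed, a quick check in Snowflake along these lines can help (a sketch; the ACTOR table name and the SAKILA schema are assumptions based on the seed layout used in this demo):

-- List the seeded tables and spot-check one of them
SHOW TABLES IN SCHEMA "DBT_DEMO_SOURCES"."SAKILA";
SELECT COUNT(*) AS no_of_rec FROM "DBT_DEMO_SOURCES"."SAKILA"."ACTOR";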
Test SOURCES
- It's safer to test the data in the sources before loading data into staging and marts. We will create a file named src_<source>.yml (say src_sakila.yml) with the source definition, testing and documentation for the source area.
- Here is an example test specification in src_sakila.yml
version: 2

sources:
  - name: sakila
    tables:
      - name: actor
        columns:
          - name: ACTOR_ID
            tests:
              - dbt_expectations.expect_column_values_to_be_unique
- Here are a couple of examples for running tests on sources
# Install packages
dbt deps

# Test all sources
dbt test --models source:*

# Test a specific source model
dbt test --models source:lookups
Load STAGING
Our goal is to create the rental fact and related dimensions as shown below. Here we will load staging, aka the PREP area, to prepare data for the dims and facts. In the dbt world, transformation is done using models. A model is a data transformation, expressed in a single SELECT statement.
Read more about dbt models here
- Models are .sql files in the models directory of the dbt project; subdirectories are supported within the models directory.
- Since most of the RAW tables are loaded by Extract and Load tools outside dbt, you need to define them as sources before using them. We have already defined the sources as part of testing in src_sakila.yml and src_lookups.yml.
- Here is an example model .sql file for a prep object. We could use either the ref or the source function since we seeded the data, but to stay close to a real use case, we will use the source function.
with source as (

    -- select * from {{ ref('actor') }}
    select * from {{ source('raw', 'actor') }}

),

renamed as (

    select * from source

)

select * from renamed
- After we are ready with our models, we can execute them by running
# Run all models
dbt run

# Run specific models by model name
dbt run --models stg_customer

# Run specific models by directory name
dbt run --models staging.*
It is best practice to create subdirectories if we want to group certain runs together.
- Validate the data in the staging objects
SELECT * FROM "DBT_DEMO_STAGING"."SAKILA"."STG_CUSTOMER";
SELECT * FROM "DBT_DEMO_STAGING"."SAKILA"."STG_FILM";
SELECT * FROM "DBT_DEMO_STAGING"."SAKILA"."STG_STAFF";
SELECT * FROM "DBT_DEMO_STAGING"."SAKILA"."STG_STORE";
- This is all looking good, but dbt run loads all records during each run, which would be a bad idea when processing huge volumes of data. To solve that problem we will convert our models to incremental models by adding the following:
- A configuration block on top of the sql file for materialized and unique_key (optional). Adding materialized will also convert the view to a table.
{{
    config(
        materialized='incremental',
        unique_key='customer_id'
    )
}}
- A WHERE clause to filter the rows for the incremental run
{% if is_incremental() %}
  where scus.last_update > (select max(last_update) from {{ this }})
{% endif %}
- Execute dbt run again and now we can see the customer table getting loaded in incremental mode.
Load MARTS
The next step is to load the marts, and here we will see a few options for loading them. Just for ease of understanding, we will create the customer dimension in several ways (with different names).
Views
- Creating the model as a view is the simplest option. Here we have to specify the materialized property of config as view.
{{
    config(
        materialized='view'
    )
}}

SELECT *
FROM {{ ref('stg_customer') }} scus
- Create the model by running
dbt run -m dim_customer_view
- Validate the definition in Snowflake
select get_ddl('view', 'DBT_DEMO_MARTS.CORE.DIM_CUSTOMER_VIEW');
- As you can see, this just created a simple view on top of our staging layer
create or replace view DIM_CUSTOMER_VIEW as (
    SELECT * FROM DBT_DEMO_STAGING.SAKILA.stg_customer scus
);
- Validate the data in Snowflake
-- We should see 599 records
SELECT COUNT(*) as no_of_rec FROM "DBT_DEMO_MARTS"."CORE"."DIM_CUSTOMER_VIEW";
SELECT * FROM "DBT_DEMO_MARTS"."CORE"."DIM_CUSTOMER_VIEW";
Incremental Table (SCD TYPE I)
- An incremental model limits the amount of data processed, thus reducing the runtime of the transformations.
- Specify the materialized property of config as incremental; these models are built as tables.
{{
    config(
        materialized='incremental',
        incremental_strategy='delete+insert',
        unique_key='customer_id'
    )
}}

SELECT *
FROM {{ ref('stg_customer') }} scus

{% if is_incremental() %}
  HAVING scus.last_update > (select max(last_update) from {{ this }})
{% endif %}
- Create the model by running (-d runs in debug mode to show the full execution logs in the CLI)
dbt -d run -m dim_customer_incremental
- You should see the following steps in the dbt run log
🚧 Create a TEMPORARY table with the HAVING clause to filter records based on the incremental condition
🚧 Delete from the TARGET table, since we have incremental_strategy as delete+insert
🚧 Insert into the TARGET table by selecting from the TEMPORARY table
- Validate the definition in Snowflake; the output will be a create table statement
select get_ddl('table', 'DBT_DEMO_MARTS.CORE.DIM_CUSTOMER_INCREMENTAL');
- Validate the data in Snowflake
-- We should see 599 records
SELECT COUNT(*) as no_of_rec FROM "DBT_DEMO_MARTS"."CORE"."DIM_CUSTOMER_INCREMENTAL";
SELECT * FROM "DBT_DEMO_MARTS"."CORE"."DIM_CUSTOMER_INCREMENTAL";

-- Check the current max record in the staging area and marts
select 'stg_customer' as table_name, max(last_update) as last_update from DBT_DEMO_STAGING.SAKILA.stg_customer
union
select 'dim_customer_view' as table_name, max(last_update) as last_update from DBT_DEMO_MARTS.CORE.dim_customer_view
union
select 'dim_customer_incremental' as table_name, max(last_update) as last_update from DBT_DEMO_MARTS.CORE.dim_customer_incremental;

-- Check the record before the update
SELECT * FROM DBT_DEMO_STAGING.SAKILA.stg_customer where CUSTOMER_ID = 1;
SELECT * FROM DBT_DEMO_MARTS.CORE.dim_customer_view where CUSTOMER_ID = 1;
SELECT * FROM DBT_DEMO_MARTS.CORE.dim_customer_incremental where CUSTOMER_ID = 1;
- Make some changes in stage
-- Update the staging table
UPDATE DBT_DEMO_STAGING.SAKILA.stg_customer
SET CUSTOMER_DISTRICT = 'Kansas', LAST_UPDATE = current_timestamp(2)
WHERE CUSTOMER_ID = 1;

-- Check the record after the update, the view should have the updated information
SELECT * FROM DBT_DEMO_STAGING.SAKILA.stg_customer where CUSTOMER_ID = 1;
SELECT * FROM DBT_DEMO_MARTS.CORE.dim_customer_view where CUSTOMER_ID = 1;
SELECT * FROM DBT_DEMO_MARTS.CORE.dim_customer_incremental where CUSTOMER_ID = 1;
- Re-run the model
dbt -d run -m dim_customer_incremental
- Validate the data
-- Check the record after the update; the stage, view and incremental table should have the updated information now
SELECT * FROM DBT_DEMO_STAGING.SAKILA.stg_customer where CUSTOMER_ID = 1;
SELECT * FROM DBT_DEMO_MARTS.CORE.dim_customer_view where CUSTOMER_ID = 1;
SELECT * FROM DBT_DEMO_MARTS.CORE.dim_customer_incremental where CUSTOMER_ID = 1;
- So this is a type-1 (overwrite) Slowly Changing Dimension implementation.
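To see the type-1 behaviour in the data itself, a quick sanity check like the one below (a sketch) confirms the overwrite semantics: the incremental table should never hold more than one row per customer_id.

-- A type-1 dimension overwrites in place, so no customer_id should have more than one row
SELECT CUSTOMER_ID, COUNT(*) AS no_of_rec
FROM DBT_DEMO_MARTS.CORE.dim_customer_incremental
GROUP BY CUSTOMER_ID
HAVING COUNT(*) > 1; -- expected to return zero rows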
Snapshot Table (SCD TYPE II)
- Snapshots are used to record changes to a mutable table over time. Snapshots are type-2 (add new row) Slowly Changing Dimensions.
- Code for snapshots should be placed inside the snapshots directory in the dbt project.
- Snapshots need target_database and target_schema, which can be specified in the configuration block of the sql OR inside dbt_project.yml. In this example, dbt_project.yml was updated with the below block.
snapshots:
  sakila_db:
    marts:
      target_database: DBT_DEMO_MARTS
      target_schema: CORE
- Snapshot sql files (say snap_customer.sql) are select statements inside a snapshot block. The config block for snapshots requires the following:
⏩ unique_key
⏩ strategy can be timestamp OR check. updated_at is required for timestamp and check_cols is required for check.
{% snapshot snap_customer %}

{{
    config(
      unique_key='customer_id',
      strategy='timestamp',
      updated_at='last_update',
      invalidate_hard_deletes=True,
    )
}}

select * from {{ ref('dim_customer_view') }}

{% endsnapshot %}
- Create the snapshot by running
dbt snapshot -s snap_customer
- Validate the definition in Snowflake; the output will be a create TRANSIENT table statement. In addition to the data attributes, the table definition will also have the following dbt audit attributes:

Attribute Name | Description
DBT_SCD_ID     | A unique key generated for each snapshotted record
DBT_UPDATED_AT | The updated_at timestamp of the source record when this snapshot row was inserted
DBT_VALID_FROM | The timestamp when this snapshot row was first inserted
DBT_VALID_TO   | The timestamp when this row became invalidated

select get_ddl('table', 'DBT_DEMO_MARTS.CORE.SNAP_CUSTOMER');
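These audit columns make it easy to separate current rows from history; for example (a sketch based on the columns above), the current version of each record is the row whose DBT_VALID_TO is null.

-- Current rows only: DBT_VALID_TO is null for the latest version of each record
SELECT * FROM DBT_DEMO_MARTS.CORE.SNAP_CUSTOMER WHERE DBT_VALID_TO IS NULL;

-- Full history for one customer, ordered by validity window
SELECT CUSTOMER_ID, DBT_VALID_FROM, DBT_VALID_TO
FROM DBT_DEMO_MARTS.CORE.SNAP_CUSTOMER
WHERE CUSTOMER_ID = 1
ORDER BY DBT_VALID_FROM;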
- Validate the data in Snowflake
-- We should see 599 records
SELECT COUNT(*) as no_of_rec FROM "DBT_DEMO_MARTS"."CORE"."SNAP_CUSTOMER";
SELECT * FROM "DBT_DEMO_MARTS"."CORE"."SNAP_CUSTOMER";

-- Check the current max record in the staging area and marts
select 'stg_customer' as table_name, max(last_update) as last_update from DBT_DEMO_STAGING.SAKILA.stg_customer
union
select 'dim_customer_view' as table_name, max(last_update) as last_update from DBT_DEMO_MARTS.CORE.dim_customer_view
union
select 'snap_customer' as table_name, max(last_update) as last_update from DBT_DEMO_MARTS.CORE.snap_customer;

-- Check the record before the update
SELECT * FROM DBT_DEMO_STAGING.SAKILA.stg_customer where CUSTOMER_ID = 1;
SELECT * FROM DBT_DEMO_MARTS.CORE.dim_customer_view where CUSTOMER_ID = 1;
SELECT * FROM DBT_DEMO_MARTS.CORE.snap_customer where CUSTOMER_ID = 1;
- Make some changes in stage
-- Update the staging table
UPDATE DBT_DEMO_STAGING.SAKILA.stg_customer
SET CUSTOMER_DISTRICT = 'Kansas', LAST_UPDATE = current_timestamp(2)
WHERE CUSTOMER_ID = 1;

-- Check the record after the update, the view should have the updated information
SELECT * FROM DBT_DEMO_STAGING.SAKILA.stg_customer where CUSTOMER_ID = 1;
SELECT * FROM DBT_DEMO_MARTS.CORE.dim_customer_view where CUSTOMER_ID = 1;
SELECT * FROM DBT_DEMO_MARTS.CORE.snap_customer where CUSTOMER_ID = 1;
- Re-run the snapshot
dbt snapshot -s snap_customer
- Validate the data; we should see two records for the record which we updated
-- The stage, view and snapshot table should have the updated information now
SELECT * FROM DBT_DEMO_STAGING.SAKILA.stg_customer where CUSTOMER_ID = 1;
SELECT * FROM DBT_DEMO_MARTS.CORE.dim_customer_view where CUSTOMER_ID = 1;
SELECT * FROM DBT_DEMO_MARTS.CORE.snap_customer where CUSTOMER_ID = 1;
Custom Materializations (Support for DDL)
Materializations are responsible for taking a dbt model sql statement and turning it into a transformed dataset in a database. dbt does not support data definition language (DDL) out of the box, but this can be made possible with custom materializations. Let's look at one such custom materialization, the Persistent Tables materialization. This materialization supports DDL and non-destructive column adds (CREATE OR REPLACE would drop the existing table, so the Persistent Tables materialization works by creating a copy table and then copying the data back over after the changes).
- Custom macros should be placed inside the macros directory; it's good to place them inside a sub folder for each specific function.
- Create a sql file with the DDL
{{
    config(
        materialized='persistent_table',
        retain_previous_version_flg=false,
        migrate_data_over_flg=true
    )
}}

CREATE OR REPLACE TABLE "{{ database }}"."{{ schema }}"."dim_customer_materialization" (
    CUSTOMER_ID NUMBER(38,0),
    CUSTOMER_FIRST_NAME VARCHAR(100),
    CUSTOMER_LAST_NAME VARCHAR(100),
    CUSTOMER_EMAIL VARCHAR(100)
)
- Create the model by running
dbt -d run -m dim_customer_materialization --full-refresh
- Validate the definition in Snowflake; the definition should match the DDL statements in the model file
select get_ddl('table', 'DBT_DEMO_MARTS.CORE.DIM_CUSTOMER_MATERIALIZATION');
Custom Materializations (Support for backfills and DDLs)
If we want to add a new column OR drop a column from an existing table, it's not possible with the incremental materialization unless we do a full refresh. Not all tables can afford a full refresh, for several reasons
⏩ The cost associated with a full-refresh-based backfill
⏩ The source tables no longer have all the historical data required to do a full-refresh-based backfill
To solve this problem we will use the vault_insert_by_period materialisation. I have customized it a bit further to add support for a target database parameter and a few other cosmetic logic changes. Click here for the links to the original code and the customized code.
TO DO: Rename the materialisation since it's a customized version
- Since all of the source records used in this demo have the same last updated date, we will run the below sql so that there is some data for the backfill scenario.
UPDATE DBT_DEMO_STAGING.SAKILA.stg_customer
SET CUSTOMER_DISTRICT = 'Kansas', LAST_UPDATE = current_timestamp(2)
WHERE CUSTOMER_ID = 1;
- Create a new model sql file, say dim_customer_insert_by_period.sql, with the below code. This materialisation requires:

period          | granularity of the load (hour, day, month, year)
timestamp_field | column name for the timestamp field which should be used for filtering data
start_date      | date to control the start of the load range
stop_date       | date to control the end of the load range

{{
    config(
        materialized = "vault_insert_by_period",
        period = "year",
        timestamp_field = "last_update",
        start_date = "2006-01-01",
        stop_date = "2021-01-01",
    )
}}

WITH stage AS (
    SELECT CUSTOMER_ID,
           CUSTOMER_FIRST_NAME,
           CUSTOMER_LAST_NAME,
           CUSTOMER_EMAIL,
           LAST_UPDATE
    FROM {{ ref('stg_customer') }}
    WHERE __PERIOD_FILTER__
)

SELECT * FROM stage
- Run the model
dbt -d run -m dim_customer_insert_by_period
- Now, let's add a new column to the model by running the below sql. The idea here is to manage the DDLs outside dbt in a DCM (Database Change Management) tool like sqitch.
ALTER TABLE "DBT_DEMO_MARTS"."CORE"."DIM_CUSTOMER_INSERT_BY_PERIOD" ADD COLUMN CUSTOMER_DISTRICT VARCHAR DEFAULT NULL;
- Validate the data in Snowflake
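A minimal check along these lines (a sketch) confirms the new column now exists on the target table and is NULL for the existing rows:

-- The new CUSTOMER_DISTRICT column should now exist on the target table
SELECT CUSTOMER_ID, CUSTOMER_DISTRICT
FROM "DBT_DEMO_MARTS"."CORE"."DIM_CUSTOMER_INSERT_BY_PERIOD"
WHERE CUSTOMER_ID = 1;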
- Update the start_date and stop_date in dim_customer_insert_by_period.sql
start_date = "2021-01-01",
stop_date = "2021-04-01",
- Run the model
dbt -d run -m dim_customer_insert_by_period
- Validate the data in Snowflake. As you can see, this run loaded just the one record that falls within the period given by the start and stop dates.
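A check like the one below (a sketch; it relies on LAST_UPDATE being carried into the target, as in the model above) shows the record picked up by this load window:

-- One record should have a LAST_UPDATE inside the 2021 load window
SELECT *
FROM "DBT_DEMO_MARTS"."CORE"."DIM_CUSTOMER_INSERT_BY_PERIOD"
WHERE LAST_UPDATE >= '2021-01-01';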
The next step is to integrate the start and end dates as parameters from Airflow and to always add delete logic before the load, to have a better handle on loads for a specific period.
Packages in dbt
A good process of software reuse improves reliability, productivity and quality, and reduces time and cost. In dbt, reusability comes in the form of packages. You can find all dbt packages in dbt hub.
- Create packages.yml at the same level as dbt_project.yml and update it with the required package details. The one I am most interested in during this exploration is dbt_expectations, which allows dbt users to deploy Great Expectations-like data testing without needing an additional integration with Great Expectations.
packages:
  - package: fishtown-analytics/dbt_utils
    version: 0.6.4

  - package: calogica/dbt_expectations
    version: 0.2.2

  - package: fishtown-analytics/codegen
    version: 0.3.1

  - package: yu-iskw/dbt_airflow_macros
    version: 0.2.2

  - package: Datavault-UK/dbtvault
    version: 0.7.3
- To install the packages, run
dbt deps
Tests in dbt
dbt has two types of tests: schema tests and data tests.
- Schema tests are configured as YAML inside the schema.yml file. A schema test returns 0 when successful.
- A data test is a select statement inside the tests directory. A data test returns 0 records when successful (see the example after this list).
- Here is a sample configuration file, say stg_sakila.yml, for doing a schema test using the default dbt tests unique and not_null and the dbt_expectations test expect_column_values_to_be_unique
models:
  - name: customer
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null
          - dbt_expectations.expect_column_values_to_be_unique
- To execute the tests, run
dbt test
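As a companion to the schema test above, here is what a data test could look like (a sketch; the file name and the not-empty-email rule are hypothetical). It lives in the tests directory, and the test passes when the query returns 0 records:

-- tests/assert_customer_email_not_empty.sql (hypothetical example)
-- Returns the offending rows; no rows returned means the test passes
SELECT customer_id
FROM {{ ref('stg_customer') }}
WHERE customer_email IS NULL OR customer_email = ''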
Currently the tests run after the model is created and the changes are persisted; there is an open ticket to run tests after model creation but persist the changes only if the tests pass. See more details about this here.
Documents in dbt
- dbt provides a way to generate documentation and to publish it as a website.
- You can add descriptions to models, columns and sources in the schema.yml. Here is an example
models:
  - name: customer
    description: This table contains information about customers who rented the movie
    columns:
      - name: customer_id
        description: This is a unique identifier for customer
- dbt also supports docs blocks using the jinja docs tag. To add a docs block
⏩ Update dbt_project.yml to add the following
docs-paths: ["docs"]
asset-paths: ["assets"]
⏩ Create a .md file with the required information inside the docs directory and place the supporting images in the assets directory. An example would be an overview.md file with an __overview__ block, which overrides the default overview and creates a custom overview page
{% docs __overview__ %}
# Exploring Dbt With Snowflake
In this article we will see how to use dbt with Snowflake.
![Overview](../assets/explore-dbt.png)
{% enddocs %}
⏩ Another example would be a film.md file updated with details about the films table
{% docs film_overview %}
This table contains information about films.
{% enddocs %}
⏩ Update schema.yml to add the doc() function referencing the docs name, which is film_overview in this example
- name: film
  description: '{{ doc("film_overview") }}'
- You can generate documents by running dbt docs generate.
- Generated documents can be published by running dbt docs serve --port 8001. This will publish the documents locally.
It's advisable to run dbt run before generating documents, so the order of execution would be
dbt deps
dbt seed
dbt run
dbt test
dbt docs generate
dbt docs serve
- It's not mandatory to add an entry for each column in schema.yml, but the documentation will only be as good as the details in your schema.yml. So it's recommended to add detailed information and to follow some standards for schema.yml.
- Documents can also be hosted on S3 as a static site. Click here for the documents generated from this demo. Here are the high level steps to host dbt docs in S3
⏩ Create an S3 bucket
⏩ Update the S3 bucket policy to allow read access
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "PublicReadGetObject",
            "Effect": "Allow",
            "Principal": "*",
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::dbt.entechlog.com/*"
        }
    ]
}
⏩ Enable static website hosting
⏩ Upload the contents from the target/ directory of the dbt project to the S3 bucket.
Hope this was helpful. Did I miss something? Let me know in the comments and I'll add it in!
Notes
*** You can list all containers by running docker container ls -a
*** You can bring down the containers by running docker-compose down
*** You can bring down the containers and related volumes by running docker-compose down --volumes
*** You can delete all exited containers by running docker rm $(docker ps -q -f status=exited)
*** You can delete all generated folders and clean the dbt files by running dbt clean
References
- https://unicode-table.com/en/#basic-latin
- https://towardsdatascience.com/setting-up-a-ci-using-dbt-cloud-and-snowflake-f79eecbb98ae
- https://github.com/fishtown-analytics/jaffle_shop
- https://relational.fit.cvut.cz/dataset/Employee
- https://www.jooq.org/sakila
- https://github.com/joelsotelods/sakila-db-queries
- https://docplayer.net/31815308-A-data-warehousing-and-business-intelligence-tutorial-starring-sakila.html