Orchestrating dbt and PySpark


I use dbt for my data projects to implement the medallion data pipeline architecture, which processes data in three stages: bronze, silver, and gold.

Data pipeline medallion architecture

There is a dbt model for transforming data in each stage. The bronze and silver stages store data in Apache Iceberg, so their dbt models are executed in Apache Spark. The gold stage is usually implemented in Postgres, so it requires moving data out of Iceberg. Trino is excellent for transferring data between the silver and gold stages.
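For context, the Spark session that the bronze and silver dbt models run against needs an Iceberg catalog configured. Here is a minimal sketch; the catalog name, catalog type, and warehouse path are illustrative and not taken from the demo:

```python
from pyspark.sql import SparkSession

# Illustrative Iceberg catalog configuration; names and paths are placeholders.
spark = (
    SparkSession.builder
    .appName("medallion-pipeline")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.lakehouse", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.lakehouse.type", "hadoop")
    .config("spark.sql.catalog.lakehouse.warehouse", "s3a://warehouse/")
    .getOrCreate()
)

# Bronze and silver dbt models then materialize tables such as lakehouse.bronze.stock_prices.
```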

In many cases, I also need a non-SQL data transformation, such as a machine learning model (often PySpark code). The non-SQL transformation usually yields a silver stage table that is later consumed by a silver or gold dbt model.
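To make this concrete, here is a rough sketch of such a PySpark step: it fits an ARIMA model per stock symbol and writes the predictions into a silver Iceberg table that a downstream dbt model can select from. Table and column names are made up for the example and may differ from the demo repo:

```python
import pandas as pd
from pyspark.sql import SparkSession
from statsmodels.tsa.arima.model import ARIMA

spark = SparkSession.builder.getOrCreate()

# Illustrative table and column names; the real demo repo may differ.
prices = spark.table("lakehouse.bronze.stock_prices").toPandas()

forecasts = []
for symbol, group in prices.groupby("symbol"):
    series = group.sort_values("date")["close"]
    model = ARIMA(series, order=(1, 1, 1)).fit()
    predicted = model.forecast(steps=5)
    forecasts.append(pd.DataFrame({"symbol": symbol, "predicted_close": predicted.values}))

result = spark.createDataFrame(pd.concat(forecasts))

# Write a silver-stage Iceberg table that a silver or gold dbt model can reference.
result.writeTo("lakehouse.silver.stock_price_predictions").using("iceberg").createOrReplace()
```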

It would be excellent if dbt could invoke PySpark code from a model. Please let me know if you know how to invoke code from a dbt model, or at least from dbt pre- or post-hooks.

So the bronze, silver, and gold dbt projects must be executed one after another. Moreover, some of the dbt models in these projects depend on the result of the PySpark code execution.

I searched for a solution to this problem and quickly figured out that orchestration tools like Airflow, Prefect, or Dagster can help. Dagster is excellent because I like how it can be quickly extended with Python code (custom orchestration operations, resources, and jobs).
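For example, a dbt stage can be wrapped in a custom Dagster op with just a few lines of Python. This is a hedged sketch that simply shells out to dbt; the demo repo may instead use a dedicated dbt resource or the dagster-dbt integration:

```python
import subprocess
from dagster import op

@op
def run_dbt_bronze(context):
    # Illustrative: run the bronze dbt project via a subprocess call.
    result = subprocess.run(
        ["dbt", "run", "--project-dir", "bronze"],
        capture_output=True, text=True, check=True,
    )
    context.log.info(result.stdout)
```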

Orchestration example

This demo GitHub repository with the open-source data stack contains an example of a specific orchestration scenario. The demo transforms data in these steps:

  1. Download data for selected stock symbols from Yahoo Finance and store it as a CSV file in a MinIO bucket (see the sketch after this list).
  2. Load the CSV file to a bronze stage table using a dbt model that executes in Apache Spark.
  3. Transform the bronze table data in the silver stage. This transformation includes PySpark code that predicts stock prices with an ARIMA time-series machine learning model.
  4. Transfer and transform the silver stage data into an analytics model in the gold stage. Here, a set of dbt models executed in Trino maps the Iceberg bronze and silver stages and the Postgres gold stage into three schemas of the same database and moves data between them with SQL.
dbt stages and PySpark in an example data pipeline
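As a rough sketch of step 1 (the download-and-upload part), with symbols, bucket name, and credentials as placeholders rather than the demo's actual values:

```python
import io
import yfinance as yf
from minio import Minio

# Placeholder symbols, endpoint, bucket, and credentials; the demo repo uses its own values.
symbols = ["AAPL", "MSFT"]
client = Minio("minio:9000", access_key="minio", secret_key="minio123", secure=False)

for symbol in symbols:
    csv_bytes = yf.download(symbol, period="1y").to_csv().encode("utf-8")
    client.put_object(
        "stock-data", f"{symbol}.csv",
        io.BytesIO(csv_bytes), length=len(csv_bytes),
        content_type="text/csv",
    )
```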

You can find the code for all dbt stages in this folder.

Dagster executes all three stages and the PySpark stock price prediction code in the proper sequence.
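Conceptually, the ordering looks like the sketch below. It uses Nothing-typed inputs purely to enforce the sequence; the op bodies are placeholders, and the actual wiring lives in the demo repo:

```python
from dagster import In, Nothing, job, op

# Simplified placeholder ops; each would call dbt or spark-submit in practice.
@op
def run_bronze_dbt():
    ...

@op(ins={"start": In(Nothing)})
def run_pyspark_prediction():
    ...

@op(ins={"start": In(Nothing)})
def run_silver_dbt():
    ...

@op(ins={"start": In(Nothing)})
def run_gold_dbt():
    ...

@job
def stock_pipeline():
    # Bronze load -> PySpark ARIMA prediction -> silver dbt -> gold dbt
    run_gold_dbt(start=run_silver_dbt(start=run_pyspark_prediction(start=run_bronze_dbt())))
```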

Dagster orchestration of a simple stock market data pipeline

You can review the Dagster code here. I also recommend this quick Dagster tutorial that explains the basic Dagster concepts.

That’s it. Please let me know what you think. Also, please give me a shout if you know how to invoke code from dbt models or pre/post-hooks.

Thanks!
