On the 5th of April 2022, Databricks announced the general availability of Delta Live Tables. It allows you to define streaming or batch processing pipelines easily, including scheduling and data quality checks, all using a simple syntax in a notebook. In this article, we are going to see how we can deliver the best code quality when using Delta Live Tables.
The problem with notebooks
As you may know from experience or reading, notebooks do not encourage software engineering best practices like structuring the code into functions or classes, automated testing, etc. In fact, this analysis of more than 1M notebooks from GitHub showed that less than 2% of them are tested, and more than a third have out-of-order cells! While running a small data pipeline on Delta Live Tables, you may not encounter any issues, and getting started is very fast. But if you plan to do more complex transformations, it is unreasonable to keep everything in notebooks. Delta Live Tables lets you define several notebooks for a single pipeline, so you can at least split the code into distinct parts. But that leaves you with a collection of notebooks instead of a single one, which is not a huge improvement.
Reconciling Delta Live Tables and Software Engineering best practices
At Sicara, we used Delta Live Tables for one of our projects. Here is the tooling we relied on to ensure we delivered the best quality:
- git versioning
- black
- isort
- pylint
- mypy
- pytest
- pre-commit for running the five tools above at every commit
- circleci for our CI/CD pipeline
- make for defining custom targets for running the checks, packaging, deploying, ...
Not only is it possible to use these tools with Delta Live Tables, we believe it is necessary in order to deliver good quality at scale. To reach this goal, the core idea is to externalize as much of your pipeline's logic as possible into a git repository. You can then package it and install it on Delta Live Tables at run time. For everything that lives in your git repository, you can use all the software engineering tools you like, most notably a decent IDE and automated tests.
I am going to share with you how we did it, with an example pyspark workflow (in the same spirit, you can also discover how to launch experiments on Databricks). The example I will use is available on GitHub in this repository. Feel free to go and look at it if you like, and come back here for some explanations! This repository uses the NYC Taxi data available on Databricks to demonstrate the ideas described in this article. At the time of this writing, every feature listed above is implemented in it, except the CI/CD pipeline. You can use this repository as a template to start your own project!
How to set up Delta Live Tables, our way
1- Initialize a git repository
- Create a GitHub repository
- Install poetry
- Initialize a library with poetry new
I leave the details of the above points as an exercise for the reader 😉
2- Create a Delta Live Tables pipeline in your repository
To illustrate this, we are going to go through a toy pipeline. It will create a first table with NYC Taxi data and a second table with the same data enriched with a boolean column indicating whether the trip is less than 1 mile. We are going to see how to write the transform, how to test it locally, how to add data quality checks, and how to deploy the code on Databricks!
2.1- Create some pyspark transforms
Here is the example transform I added to the GitHub repository:
import pyspark.sql.functions as F

def enrich_nyc_taxi_data(nyc_taxi_df):
    return nyc_taxi_df.withColumn(
        "short_ride", F.when(F.col("trip_distance") < 1, True).otherwise(False)
    )
2.2- Test your transforms
Here is the test associated with the above transformation:
# pylint: disable=redefined-outer-name
import pytest
from pyspark.sql import SparkSession

from dlt_pipeline.transformations import enrich_nyc_taxi_data

@pytest.fixture()
def spark():
    spark_session = (
        SparkSession.builder.master("local[*]")
        .appName("dlt-pipeline-tests")
        .getOrCreate()
    )
    yield spark_session
    spark_session.stop()

def format_test_dataframe(dataframe):
    return dataframe.toJSON().collect()

def test_enrich_nyc_taxi_data(spark):
    nyc_taxi_data = [("trip_1", 0.9), ("trip_2", 1.0), ("trip_3", 1.1)]
    nyc_taxi_columns = ["trip_id", "trip_distance"]
    input_nyc_taxi_df = spark.createDataFrame(nyc_taxi_data).toDF(*nyc_taxi_columns)
    expected_enriched_nyc_taxi_data = [
        ("trip_1", 0.9, True),
        ("trip_2", 1.0, False),
        ("trip_3", 1.1, False),
    ]
    expected_enriched_nyc_taxi_columns = ["trip_id", "trip_distance", "short_ride"]
    expected_enriched_nyc_taxi_df = spark.createDataFrame(
        expected_enriched_nyc_taxi_data
    ).toDF(*expected_enriched_nyc_taxi_columns)
    enriched_nyc_taxi_df = enrich_nyc_taxi_data(input_nyc_taxi_df)
    assert format_test_dataframe(enriched_nyc_taxi_df) == format_test_dataframe(
        expected_enriched_nyc_taxi_df
    )
2.3- Integrate the transforms in a Delta Live Tables pipeline
The next step is to create the code defining the steps of the Delta Live Tables pipeline. As we cannot install the dlt python package locally, we want to isolate the code defining the flow of our transformations and the data quality checks, and extract any transformation logic out of it. Here is what it looks like, with the data quality checks imported from another file:
import dlt  # pylint: disable=import-error

from dlt_pipeline.data_quality_checks import (
    enriched_taxi_data_valid,
    enriched_taxi_data_warn,
    raw_taxi_data_valid,
    raw_taxi_data_warn,
)
from dlt_pipeline.transformations import enrich_nyc_taxi_data

def define_pipeline(spark):
    @dlt.table(comment="NYC Taxi data")
    @dlt.expect_all_or_drop(raw_taxi_data_valid)
    @dlt.expect_all(raw_taxi_data_warn)
    def raw_nyc_taxi_data():
        nyc_taxi_df = spark.read.format("json").load(
            "/databricks-datasets/nyctaxi/sample/json/"
        )
        return nyc_taxi_df

    @dlt.table(comment="Enriched NYC Taxi data")
    @dlt.expect_all_or_drop(enriched_taxi_data_valid)
    @dlt.expect_all(enriched_taxi_data_warn)
    def enriched_nyc_taxi_data():
        nyc_taxi_df = dlt.read("raw_nyc_taxi_data")
        return enrich_nyc_taxi_data(nyc_taxi_df)
You can see that we define all the steps of our pipeline inside the define_pipeline function. This function takes the spark session as an argument, but you can add other arguments if needed. This can be useful if you have one pipeline per environment (staging and production, for instance) that needs to act on different external resources, as in the sketch below.
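For illustration, here is a minimal sketch (not taken from the example repository) of how define_pipeline could accept an extra env argument to read from a different source per environment; the SOURCE_PATHS dictionary and the production path are purely hypothetical:

import dlt  # pylint: disable=import-error

# Hypothetical per-environment source locations, for illustration only.
SOURCE_PATHS = {
    "staging": "/databricks-datasets/nyctaxi/sample/json/",
    "production": "/mnt/production/nyc_taxi/json/",
}

def define_pipeline(spark, env="staging"):
    @dlt.table(comment=f"NYC Taxi data ({env})")
    def raw_nyc_taxi_data():
        # The transformation code is identical in every environment;
        # only the external resource it reads from changes.
        return spark.read.format("json").load(SOURCE_PATHS[env])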
Data quality checks are defined in a dedicated file. They are declared as python dicts, with values using SQL expressions like those you can put in a WHERE clause. You can then use these expectations to implement your own strategy for managing invalid records: you can drop them, keep them, or quarantine them, depending on the data quality strategy you devise (see the quarantine sketch after the expectation definitions below).
raw_taxi_data_valid = {
    "total_amount_exists": "total_amount is not null",
    "trip_distance_is_positive": "trip_distance > 0",
}

raw_taxi_data_warn = {
    "passenger_count_less_than_five": "passenger_count < 5",
}

enriched_taxi_data_valid = {
    **raw_taxi_data_valid,
    "short_ride_exists": "short_ride is not null",
}

enriched_taxi_data_warn = raw_taxi_data_warn
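As an illustration of the quarantine strategy mentioned above, you could keep the records that violate the validity rules in a dedicated table by negating them. This is a minimal sketch that ignores SQL NULL subtleties; the helper and the table name are hypothetical and not part of the example repository:

import dlt  # pylint: disable=import-error

from dlt_pipeline.data_quality_checks import raw_taxi_data_valid

def combine_rules(rules):
    # Build a single SQL expression that is true only when every rule passes.
    return " AND ".join(f"({rule})" for rule in rules.values())

@dlt.table(comment="Records that failed the validity expectations")
def quarantined_nyc_taxi_data():
    # Keep the rows that violate at least one validity rule.
    failed_condition = f"NOT ({combine_rules(raw_taxi_data_valid)})"
    return dlt.read("raw_nyc_taxi_data").where(failed_condition)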
Again, you can check the GitHub repository to get familiar with its structure.
2.4- Deploy your package on Databricks
You can then use poetry build to create a wheel of your package and upload it to Databricks. This is done with a make target in the repository. After that, we are ready to define and run the pipeline on Databricks!
3- Create the Delta Live Tables notebook and pipeline
3.1- Create the notebook once and for all
Create a notebook looking like the screenshot below.
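In case the screenshot is hard to read, here is roughly what the notebook contains. This is a sketch that assumes the wheel was uploaded under /dbfs/FileStore/ and that define_pipeline lives in a dlt_pipeline.pipeline module; adjust the path and module name to match your repository. The first cell installs the wheel, the second declares the tables:

%pip install /dbfs/FileStore/dlt_pipeline-0.1.0-py3-none-any.whl

# In a second cell: import the pipeline definition from the package
# and declare the tables on the spark session provided by Databricks.
from dlt_pipeline.pipeline import define_pipeline  # module name is an assumption

define_pipeline(spark)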
3.2- Create the pipeline
You can then define a Delta Live Tables pipeline using that notebook. To run the example given in the GitHub repository, you can keep the default settings.
3.3- Test it!
After defining the pipeline, you can run it and see a successful run like in the screenshot below.
What is interesting, and a strong point for Delta Live Tables, is that you can see the data quality checks associated with each of the tables defined in the pipeline. You can see at a glance how much data does not meet your expectations, and how the failures are distributed across the rules you defined.
Conclusion
We have seen with this example how to produce good quality code with Delta Live Tables. I encourage you to test this new feature from Databricks, and I wish you great success at scaling your most complex data pipelines!
While writing this article, let me also share my thoughts about building complex data pipelines with Delta Live Tables. I think it is great for getting started really fast, and for the very nice integration of data quality checks in the pipeline. However, I feel that Databricks should improve Delta Live Tables and remove some of its most restrictive limitations if they want to see massive adoption:
- Everything is a black box in Delta Live Tables. You cannot see the source code of the dlt python package, nor can you see what's going on under the hood when your pipeline is stuck for 10 minutes in the SETTING_UP_TABLES step, and then 3 minutes rendering the dataflow graph...
- The ways you can change the default configuration are limited: you cannot use custom JAR files or init scripts, and changes to the Spark configuration do not seem to be taken into account.
If you are happy with Delta Live Tables' out-of-the-box performance and don't need to dive into deeper details, the overall experience is great! Otherwise, at the time of this writing, I advise you to choose another option. If you are unsure, it will not take too much time to at least try it! I hope Databricks will gradually open the black box and improve the documentation of Delta Live Tables. If they do, I believe it has a great future!
Do you have a project that involves setting up a data pipeline, and data quality challenges? Feel free to contact us!
Cover image from Wikimedia user Agrawroh, unaltered. Creative Commons