How to Use Great Expectations in Databricks
Great Expectations works well with many types of Databricks workflows. This guide will help you run Great Expectations in Databricks.
Prerequisites: This how-to guide assumes you have:
- Completed the Getting Started Tutorial
- Completed Databricks setup, including a running Databricks cluster with an attached notebook
- Set up DBFS, if you are using the file-based version of this guide
There are several ways to set up Databricks; this guide centers on an AWS deployment using Databricks Data Science & Engineering Notebooks and Jobs. If you use Databricks on GCP or Azure and some steps in this guide don't work for you, please reach out to us.
We will cover a simple configuration to get you up and running quickly, and link to our other guides for more customized configurations. For example:
- If you want to validate files stored in DBFS, select one of the "File" tabs below. You can also watch our video walkthrough of these steps.
- If you are using a different file store (e.g. S3, GCS, ABS), take a look at our how-to guides in the "Cloud" section of "Connecting to Your Data" for example configurations.
- If you already have a Spark dataframe loaded, select one of the "Dataframe" tabs below.
This guide parallels the notebook workflows from the Great Expectations CLI, so you can optionally prototype your setup with a local sample batch before moving to Databricks. You can also reuse examples and code from the notebooks that the CLI generates; many of the examples that follow parallel those notebooks closely.
1. Install Great Expectations
Install Great Expectations as a notebook-scoped library by running the following command in your notebook:
%pip install great-expectations
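Next, we take care of some imports that will be used later. The imports depend on your configuration option: the File options use BatchRequest, while the Dataframe options use RuntimeBatchRequest and also import datetime and pandas: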
- File-yaml
- File-python
- Dataframe-yaml
- Dataframe-python
from ruamel import yaml
from great_expectations.core.batch import BatchRequest
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    FilesystemStoreBackendDefaults,
)
import datetime
import pandas as pd
from ruamel import yaml
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    FilesystemStoreBackendDefaults,
)
2. Set up Great Expectations
In this guide, we will be using the Databricks File Store (DBFS) for your Metadata Stores and Data Docs store. This is a simple way to get up and running within the Databricks environment without configuring external resources. For other options for storing data see our "Metadata Stores" and "Data Docs" sections in the "How to Guides" for "Setting up Great Expectations."
Run the following code to set up a Data Context in code using the appropriate defaults:
- File-yaml
- File-python
- Dataframe-yaml
- Dataframe-python
root_directory = "/dbfs/great_expectations/"
data_context_config = DataContextConfig(
    store_backend_defaults=FilesystemStoreBackendDefaults(
        root_directory=root_directory
    ),
)
context = BaseDataContext(project_config=data_context_config)
3. Prepare your data
- File-yaml
- File-python
- Dataframe-yaml
- Dataframe-python
We will use our familiar NYC taxi yellow cab data, which is available as sample data in Databricks. Let's copy some example csv data to our DBFS folder for easier access using dbutils:
# Copy 3 months of data
for month in range(1, 4):
    dbutils.fs.cp(
        f"/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2019-0{month}.csv.gz",
        f"/example_data/nyctaxi/tripdata/yellow/yellow_tripdata_2019-0{month}.csv.gz"
    )
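Because dbutils is only available inside a Databricks notebook, it can help to preview the paths the copy loop will touch before copying anything. The following is a minimal sketch in plain Python; the helper name tripdata_paths is hypothetical, and the source and destination prefixes are taken from the loop above.

```python
def tripdata_paths(months=range(1, 4)):
    """Return (source, destination) DBFS path pairs for the given 2019 months."""
    src_dir = "/databricks-datasets/nyctaxi/tripdata/yellow"
    dst_dir = "/example_data/nyctaxi/tripdata/yellow"
    pairs = []
    for month in months:
        # :02d zero-pads the month, so this also works for months 10 to 12
        file_name = f"yellow_tripdata_2019-{month:02d}.csv.gz"
        pairs.append((f"{src_dir}/{file_name}", f"{dst_dir}/{file_name}"))
    return pairs


for src, dst in tripdata_paths():
    print(src, "->", dst)
```

In a notebook, each pair could then be passed to dbutils.fs.cp in place of the hard-coded f-strings.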
We will use our familiar NYC taxi yellow cab data, which is available as sample data in Databricks. Run the following code in your notebook to load a month of data as a dataframe:
df = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load("/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2019-01.csv.gz")
4. Connect to your data
- File-yaml
- File-python
- Dataframe-yaml
- Dataframe-python
Here we add a Datasource and Data Connector by running the following code. This example uses an InferredAssetDBFSDataConnector so that we can access and validate each of our files as a Data Asset, but you may instead use any of the other Data Connectors, Partitioners, Splitters, Samplers, or Queries available to you (see our documentation on "Connecting to your data" for more information).
What does this configuration contain?
- class_name: Here we reference InferredAssetDBFSDataConnector, one of the two DBFS Data Connectors (ConfiguredAssetDBFSDataConnector is also available). Both handle the translation from /dbfs/ to dbfs:/ style paths for you. For more information on the difference between Configured and Inferred, see How to choose which DataConnector to use.
- base_directory: Where your files are located. Here we reference the DBFS file path we copied our data to earlier.
- glob_directive: Selects the files within base_directory that match a glob pattern.
- default_regex: The pattern is a regex with capture groups, and group_names assigns a name to each group in order. We can use these names later to filter, so that we can apply our Checkpoint to a specific Batch (with this configuration, each file is a Batch).
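To make the glob_directive and default_regex settings concrete, here is a small sketch using only the Python standard library (fnmatch and re). It illustrates the matching logic and is not the Data Connector's actual implementation; the file list is made up for the example, while the glob, pattern, and group_names are the ones used in the Datasource configuration in this step.

```python
import fnmatch
import re

# Hypothetical directory listing; only the .csv.gz files should be picked up.
file_names = [
    "yellow_tripdata_2019-01.csv.gz",
    "yellow_tripdata_2019-02.csv.gz",
    "README.txt",
]

glob_directive = "*.csv.gz"
pattern = re.compile(r"(.*)_(\d{4})-(\d{2})\.csv\.gz")
group_names = ["data_asset_name", "year", "month"]

batches = []
for file_name in fnmatch.filter(file_names, glob_directive):
    match = pattern.fullmatch(file_name)
    if match:
        # Pair each regex group with its name, in order.
        batches.append(dict(zip(group_names, match.groups())))

for batch in batches:
    print(batch)
```

Each resulting dictionary corresponds to one Batch, and all of them share the data_asset_name yellow_tripdata that the BatchRequest refers to.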
Datasource configuration:
my_spark_datasource_config = r"""
name: insert_your_datasource_name_here
class_name: Datasource
module_name: great_expectations.datasource
execution_engine:
  module_name: great_expectations.execution_engine
  class_name: SparkDFExecutionEngine
data_connectors:
  insert_your_data_connector_name_here:
    module_name: great_expectations.datasource.data_connector
    class_name: InferredAssetDBFSDataConnector
    base_directory: /dbfs/example_data/nyctaxi/tripdata/yellow/
    glob_directive: "*.csv.gz"
    default_regex:
      group_names:
        - data_asset_name
        - year
        - month
      pattern: (.*)_(\d{4})-(\d{2})\.csv\.gz
"""
Check the Datasource:
context.test_yaml_config(my_spark_datasource_config)
Add the Datasource:
context.add_datasource(**yaml.load(my_spark_datasource_config))
Then we create a BatchRequest using the Data Asset we configured earlier to use as a sample of data when creating Expectations:
batch_request = BatchRequest(
datasource_name="insert_your_datasource_name_here",
data_connector_name="insert_your_data_connector_name_here",
data_asset_name="yellow_tripdata",
batch_spec_passthrough={
"reader_method": "csv",
"reader_options": {
"header": True,
},
},
)
Here we add a Datasource and Data Connector by running the following code. This example uses an InferredAssetDBFSDataConnector so that we can access and validate each of our files as a Data Asset, but you may instead use any of the other Data Connectors, Partitioners, Splitters, Samplers, or Queries available to you (see our documentation on "Connecting to your data" for more information).
What does this configuration contain?
- class_name: Here we reference InferredAssetDBFSDataConnector, one of the two DBFS Data Connectors (ConfiguredAssetDBFSDataConnector is also available). Both handle the translation from /dbfs/ to dbfs:/ style paths for you. For more information on the difference between Configured and Inferred, see How to choose which DataConnector to use.
- base_directory: Where your files are located. Here we reference the DBFS file path we copied our data to earlier.
- glob_directive: Selects the files within base_directory that match a glob pattern.
- default_regex: The pattern is a regex with capture groups, and group_names assigns a name to each group in order. We can use these names later to filter, so that we can apply our Checkpoint to a specific Batch (with this configuration, each file is a Batch).
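The two path styles mentioned in the class_name entry refer to the same location: /dbfs/... is how DBFS appears to local file APIs, while dbfs:/... is the URI form that Spark reads. The following sketch shows the kind of translation involved; the function name to_dbfs_uri is hypothetical and this is not the connector's own code.

```python
def to_dbfs_uri(local_path: str) -> str:
    """Convert a /dbfs/... file path into the equivalent dbfs:/... URI."""
    prefix = "/dbfs/"
    if not local_path.startswith(prefix):
        raise ValueError(f"expected a path under {prefix!r}, got {local_path!r}")
    return "dbfs:/" + local_path[len(prefix):]


print(to_dbfs_uri("/dbfs/example_data/nyctaxi/tripdata/yellow/"))
```

Since the DBFS Data Connectors do this for you, base_directory is written in the /dbfs/ style in the configuration.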
Datasource configuration:
my_spark_datasource_config = {
    "name": "insert_your_datasource_name_here",
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "module_name": "great_expectations.execution_engine",
        "class_name": "SparkDFExecutionEngine",
    },
    "data_connectors": {
        "insert_your_data_connector_name_here": {
            "class_name": "InferredAssetDBFSDataConnector",
            "base_directory": "/dbfs/example_data/nyctaxi/tripdata/yellow/",
            "glob_directive": "*.csv.gz",
            "default_regex": {
                "group_names": [
                    "data_asset_name",
                    "year",
                    "month",
                ],
                "pattern": r"(.*)_(\d{4})-(\d{2})\.csv\.gz",
            },
        },
    },
}
Check the Datasource:
context.test_yaml_config(yaml.dump(my_spark_datasource_config))
Add the Datasource:
context.add_datasource(**my_spark_datasource_config)
Then we create a BatchRequest using the Data Asset we configured earlier to use as a sample of data when creating Expectations:
batch_request = BatchRequest(
datasource_name="insert_your_datasource_name_here",
data_connector_name="insert_your_data_connector_name_here",
data_asset_name="yellow_tripdata",
batch_spec_passthrough={
"reader_method": "csv",
"reader_options": {
"header": True,
},
},
)
Here we add a Datasource and Data Connector by running the following code. This example uses a RuntimeDataConnector so that we can access and validate our loaded dataframe, but you may instead use any of the other Data Connectors, Partitioners, Splitters, Samplers, or Queries available to you (see our documentation on "Connecting to your data" for more information).
Datasource configuration:
my_spark_datasource_config = """
name: insert_your_datasource_name_here
class_name: Datasource
execution_engine:
  class_name: SparkDFExecutionEngine
data_connectors:
  insert_your_data_connector_name_here:
    module_name: great_expectations.datasource.data_connector
    class_name: RuntimeDataConnector
    batch_identifiers:
      - some_key_maybe_pipeline_stage
      - some_other_key_maybe_run_id
"""
Check the Datasource:
context.test_yaml_config(my_spark_datasource_config)
Add the Datasource:
context.add_datasource(**yaml.load(my_spark_datasource_config))
Then we create a RuntimeBatchRequest that passes in our dataframe to use as a sample of data when creating Expectations:
batch_request = RuntimeBatchRequest(
    datasource_name="insert_your_datasource_name_here",
    data_connector_name="insert_your_data_connector_name_here",
    data_asset_name="<YOUR_MEANINGFUL_NAME>",  # This can be anything that identifies this data_asset for you
    batch_identifiers={
        "some_key_maybe_pipeline_stage": "prod",
        "some_other_key_maybe_run_id": f"my_run_name_{datetime.date.today().strftime('%Y%m%d')}",
    },
    runtime_parameters={"batch_data": df},  # Your dataframe goes here
)
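The keys passed in batch_identifiers are expected to line up with the batch_identifiers listed in the Data Connector configuration. As a small sketch in plain Python (the helper name check_batch_identifiers is hypothetical, and the check only mirrors that requirement rather than calling Great Expectations), you could build and sanity-check the dictionary before creating the request:

```python
import datetime

# Keys declared under batch_identifiers in the Data Connector configuration.
configured_keys = ["some_key_maybe_pipeline_stage", "some_other_key_maybe_run_id"]


def check_batch_identifiers(identifiers, configured_keys):
    """Raise if identifiers uses a key that the configuration does not declare."""
    unknown = sorted(set(identifiers) - set(configured_keys))
    if unknown:
        raise KeyError(f"not declared in the Data Connector config: {unknown}")
    return identifiers


run_date = datetime.date(2019, 1, 31)  # fixed date so the output is reproducible
batch_identifiers = check_batch_identifiers(
    {
        "some_key_maybe_pipeline_stage": "prod",
        "some_other_key_maybe_run_id": f"my_run_name_{run_date.strftime('%Y%m%d')}",
    },
    configured_keys,
)
print(batch_identifiers["some_other_key_maybe_run_id"])  # my_run_name_20190131
```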
Here we add a Datasource and Data Connector by running the following code. This example uses a RuntimeDataConnector so that we can access and validate our loaded dataframe, but you may instead use any of the other Data Connectors, Partitioners, Splitters, Samplers, or Queries available to you (see our documentation on "Connecting to your data" for more information).
Datasource configuration:
my_spark_datasource_config = {
    "name": "insert_your_datasource_name_here",
    "class_name": "Datasource",
    "execution_engine": {"class_name": "SparkDFExecutionEngine"},
    "data_connectors": {
        "insert_your_data_connector_name_here": {
            "module_name": "great_expectations.datasource.data_connector",
            "class_name": "RuntimeDataConnector",
            "batch_identifiers": [
                "some_key_maybe_pipeline_stage",
                "some_other_key_maybe_run_id",
            ],
        }
    },
}
Check the Datasource:
context.test_yaml_config(yaml.dump(my_spark_datasource_config))
Add the Datasource:
context.add_datasource(**my_spark_datasource_config)
Then we create a RuntimeBatchRequest that passes in our dataframe to use as a sample of data when creating Expectations:
batch_request = RuntimeBatchRequest(
    datasource_name="insert_your_datasource_name_here",
    data_connector_name="insert_your_data_connector_name_here",
    data_asset_name="<YOUR_MEANINGFUL_NAME>",  # This can be anything that identifies this data_asset for you
    batch_identifiers={
        "some_key_maybe_pipeline_stage": "prod",
        "some_other_key_maybe_run_id": f"my_run_name_{datetime.date.today().strftime('%Y%m%d')}",
    },
    runtime_parameters={"batch_data": df},  # Your dataframe goes here
)
🚀🚀 Congratulations! 🚀🚀 You successfully connected Great Expectations with your data.
Now let's keep going to create an Expectation Suite and validate our data.
5. Create Expectations
Here we will use a Validator to interact with our batch of data and generate an Expectation Suite.
Each time we evaluate an Expectation (e.g. via validator.expect_*), the Expectation configuration is stored in the Validator. When you have run all of the Expectations you want for this dataset, you can call validator.save_expectation_suite() to save all of your Expectation configurations into an Expectation Suite for later use in a Checkpoint.
This is the same method of interactive Expectation Suite editing used in the CLI interactive mode notebook accessed via great_expectations suite new --interactive. For more information, see our documentation on How to create and edit Expectations with instant feedback from a sample Batch of data. You can also create Expectation Suites using a profiler to automatically create Expectations based on your data, or manually using domain knowledge and without inspecting data directly.
- File-yaml
- File-python
- Dataframe-yaml
- Dataframe-python
First we create the suite and get a Validator:
expectation_suite_name = "insert_your_expectation_suite_name_here"
context.create_expectation_suite(
    expectation_suite_name=expectation_suite_name, overwrite_existing=True
)
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name=expectation_suite_name,
)
print(validator.head())
Then we use the Validator to add a few Expectations:
validator.expect_column_values_to_not_be_null(column="passenger_count")
validator.expect_column_values_to_be_between(
    column="congestion_surcharge", min_value=0, max_value=1000
)
Finally we save our Expectation Suite (all of the unique Expectation Configurations from each run of validator.expect_*) to our Expectation Store:
validator.save_expectation_suite(discard_failed_expectations=False)
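For intuition about what the two Expectations above assert, the following plain-Python sketch mimics them on a list of values. It is only an approximation under stated assumptions (inclusive bounds, and missing values skipped by the range check), not how Great Expectations evaluates them on Spark; the function names and sample values are hypothetical.

```python
def values_not_null(values):
    """True when no value is missing (None)."""
    return all(v is not None for v in values)


def values_between(values, min_value, max_value):
    """True when every non-missing value lies in [min_value, max_value]."""
    present = [v for v in values if v is not None]
    return all(min_value <= v <= max_value for v in present)


# Made-up sample columns, loosely modeled on the taxi data.
passenger_count = [1, 2, 1, 4]
congestion_surcharge = [0.0, 2.5, None, 2.5]

print(values_not_null(passenger_count))
print(values_between(congestion_surcharge, min_value=0, max_value=1000))
```

The real Expectations also return detailed results, such as counts of unexpected values, which is what the Validator displays when you run them interactively.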
6. Validate your data
- File-yaml
- File-python
- Dataframe-yaml
- Dataframe-python
Here we will create and store a Checkpoint for our batch, which we can use to Validate and run post-validation actions. Check out our docs on "Validating your data" for more info on how to customize your Checkpoints.
First we create the Checkpoint configuration, mirroring our batch_request configuration above and using the Expectation Suite we created:
my_checkpoint_name = "insert_your_checkpoint_name_here"
checkpoint_config = f"""
name: {my_checkpoint_name}
config_version: 1.0
class_name: SimpleCheckpoint
run_name_template: "%Y%m%d-%H%M%S-my-run-name-template"
validations:
  - batch_request:
      datasource_name: insert_your_datasource_name_here
      data_connector_name: insert_your_data_connector_name_here
      data_asset_name: yellow_tripdata
      data_connector_query:
        index: -1
      batch_spec_passthrough:
        reader_method: csv
        reader_options:
          header: True
    expectation_suite_name: {expectation_suite_name}
"""
Then we test our syntax using test_yaml_config:
my_checkpoint = context.test_yaml_config(checkpoint_config)
If all is well, we add the Checkpoint:
context.add_checkpoint(**yaml.load(checkpoint_config))
Finally we run the Checkpoint:
checkpoint_result = context.run_checkpoint(
checkpoint_name=my_checkpoint_name,
)
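The data_connector_query with index: -1 in the Checkpoint configuration above selects a single Batch by position. A rough way to picture this, assuming the Batches are ordered by file name (an assumption for this sketch; the actual ordering depends on your Data Connector configuration), is ordinary Python list indexing:

```python
# File names matching the three months copied earlier in this guide.
batch_files = sorted(
    f"yellow_tripdata_2019-0{month}.csv.gz" for month in range(1, 4)
)

# index: -1 counts from the end of the list, i.e. the last Batch.
selected = batch_files[-1]
print(selected)
```

Under that ordering, the Checkpoint would validate the March file and pick up a newer file automatically once one is added.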
Here we will create and store a Checkpoint for our batch, which we can use to Validate and run post-validation actions. Check out our docs on "Validating your data" for more info on how to customize your Checkpoints.
First we create the Checkpoint configuration, mirroring our batch_request configuration above and using the Expectation Suite we created:
my_checkpoint_name = "insert_your_checkpoint_name_here"
checkpoint_config = {
    "name": my_checkpoint_name,
    "config_version": 1.0,
    "class_name": "SimpleCheckpoint",
    "run_name_template": "%Y%m%d-%H%M%S-my-run-name-template",
    "validations": [
        {
            "batch_request": {
                "datasource_name": "insert_your_datasource_name_here",
                "data_connector_name": "insert_your_data_connector_name_here",
                "data_asset_name": "yellow_tripdata",
                "data_connector_query": {
                    "index": -1,
                },
                "batch_spec_passthrough": {
                    "reader_method": "csv",
                    "reader_options": {
                        "header": True,
                    },
                },
            },
            "expectation_suite_name": expectation_suite_name,
        }
    ],
}
Then we test our syntax using test_yaml_config:
my_checkpoint = context.test_yaml_config(yaml.dump(checkpoint_config))
If all is well, we add the Checkpoint:
context.add_checkpoint(**checkpoint_config)
Finally we run the Checkpoint:
checkpoint_result = context.run_checkpoint(
checkpoint_name=my_checkpoint_name,
)
Here we will create and store a Checkpoint with no defined validations, then pass in our dataframe at runtime.
First we create the Checkpoint configuration:
my_checkpoint_name = "insert_your_checkpoint_name_here"
my_checkpoint_config = f"""
name: {my_checkpoint_name}
config_version: 1.0
class_name: SimpleCheckpoint
run_name_template: "%Y%m%d-%H%M%S-my-run-name-template"
"""
Then we test our syntax using test_yaml_config:
my_checkpoint = context.test_yaml_config(my_checkpoint_config)
Note that we get a message that the Checkpoint contains no validations. This is OK because we will pass them in at runtime, as we can see below when we call context.run_checkpoint().
If all is well, we add the Checkpoint:
context.add_checkpoint(**yaml.load(my_checkpoint_config))
Finally we run it with a validation defined using the Batch Request containing a reference to our dataframe and our Expectation Suite name:
checkpoint_result = context.run_checkpoint(
checkpoint_name=my_checkpoint_name,
validations=[
{
"batch_request": batch_request,
"expectation_suite_name": expectation_suite_name,
}
],
)
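The run_name_template in the Checkpoint configuration above uses strftime-style placeholders. As a sketch of how such a template could expand (assuming standard Python strftime semantics and using a fixed, made-up timestamp), consider:

```python
import datetime

run_name_template = "%Y%m%d-%H%M%S-my-run-name-template"

# Hypothetical run time; a real run would use the time the Checkpoint executes.
run_time = datetime.datetime(2019, 1, 15, 9, 30, 0)
run_name = run_time.strftime(run_name_template)
print(run_name)  # 20190115-093000-my-run-name-template
```

Text without a % directive, such as the -my-run-name-template suffix, passes through unchanged, so you can use it to tag runs by pipeline or environment.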
Here we will create and store a Checkpoint with no defined validations, then pass in our dataframe at runtime.
First we create the Checkpoint configuration:
my_checkpoint_name = "insert_your_checkpoint_name_here"
checkpoint_config = {
"name": my_checkpoint_name,
"config_version": 1.0,
"class_name": "SimpleCheckpoint",
"run_name_template": "%Y%m%d-%H%M%S-my-run-name-template",
}
Then we test our syntax using test_yaml_config:
my_checkpoint = context.test_yaml_config(yaml.dump(checkpoint_config))
Note that we get a message that the Checkpoint contains no validations. This is OK because we will pass them in at runtime, as we can see below when we call context.run_checkpoint().
If all is well, we add the Checkpoint:
context.add_checkpoint(**checkpoint_config)
Finally we run it with a validation defined using the Batch Request containing a reference to our dataframe and our Expectation Suite name:
checkpoint_result = context.run_checkpoint(
checkpoint_name=my_checkpoint_name,
validations=[
{
"batch_request": batch_request,
"expectation_suite_name": expectation_suite_name,
}
],
)
7. Build and view Data Docs
Since we used a SimpleCheckpoint, our Checkpoint already contained an UpdateDataDocsAction, which rendered our Data Docs from the validation we just ran. That means our Data Docs store will contain a new rendered validation result.
Also, to see the full Checkpoint configuration, you can run: print(my_checkpoint.get_substituted_config().to_yaml_str())
Since we used DBFS for our Data Docs store, we need to download our Data Docs locally to view them. If you use a different store, you can host your Data Docs in a place where they can be accessed directly by your team. To learn more, see our documentation on hosting Data Docs in other locations, e.g. filesystem, S3, GCS, ABS.
Run the following Databricks CLI command to download your Data Docs (replacing the paths as appropriate), then open the local copy of index.html to view your updated Data Docs:
databricks fs cp -r dbfs:/great_expectations/uncommitted/data_docs/local_site/ great_expectations/uncommitted/data_docs/local_site/
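The dbfs:/ path in the command above follows from the root_directory chosen when the Data Context was set up. The following sketch derives it in plain Python so that the command can be adapted if you chose a different root; the uncommitted/data_docs/local_site/ suffix is taken from the command above, and the variable names are hypothetical.

```python
import posixpath

root_directory = "/dbfs/great_expectations/"

# Strip the /dbfs mount prefix to get the path as the Databricks CLI sees it.
dbfs_root = "dbfs:/" + root_directory[len("/dbfs/"):]
data_docs_site = posixpath.join(dbfs_root, "uncommitted/data_docs/local_site/")
print(data_docs_site)
```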
8. Congratulations!
You've successfully validated your data with Great Expectations using Databricks and viewed the resulting human-readable Data Docs. Check out our other guides for more customization options and happy validating!