Connect to SQL database source data
Use the information provided here to connect to source data stored in SQL databases. Great Expectations (GX) uses SQLAlchemy to connect to SQL source data, and most of the SQL dialects supported by SQLAlchemy are also supported by GX. For more information about the SQL dialects supported by SQLAlchemy, see Dialects.
- SQL
- PostgreSQL
- SQLite
- Snowflake
- Databricks SQL
- BigQuery SQL
SQL
Connect GX to a SQL database to access source data.
Prerequisites
- An installation of GX set up to work with SQL
- Source data stored in a SQL database
Import GX and instantiate a Data Context
Run the following Python code to import GX and instantiate a Data Context:
import great_expectations as gx
context = gx.get_context()
Determine your connection string
GX supports numerous SQL source data systems. However, most SQL dialects have their own specifications for defining a connection string. See the dialect documentation to determine the connection string for your SQL database.
The following are some of the connection strings that are available for different SQL dialects:
- AWS Athena:
awsathena+rest://@athena.<REGION>.amazonaws.com/<DATABASE>?s3_staging_dir=<S3_PATH>
- BigQuery:
bigquery://<GCP_PROJECT>/<BIGQUERY_DATASET>?credentials_path=/path/to/your/credentials.json
- MSSQL:
mssql+pyodbc://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<DATABASE>?driver=<DRIVER>&charset=utf&autocommit=true
- MySQL:
mysql+pymysql://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<DATABASE>
- PostgreSQL:
postgresql+psycopg2://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<DATABASE>
- Redshift:
postgresql+psycopg2://<USER_NAME>:<PASSWORD>@<HOST>:<PORT>/<DATABASE>?sslmode=<SSLMODE>
- Snowflake:
snowflake://<USER_NAME>:<PASSWORD>@<ACCOUNT_NAME>/<DATABASE_NAME>/<SCHEMA_NAME>?warehouse=<WAREHOUSE_NAME>&role=<ROLE_NAME>&application=great_expectations_oss
- SQLite:
sqlite:///<PATH_TO_DB_FILE>
- Trino:
trino://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<CATALOG>/<SCHEMA>
Store the connection string for your SQL dialect in the connection_string variable. The following code is an example of the PostgreSQL connection string format with plain text credentials:
connection_string = "postgresql+psycopg2://username:my_password@localhost/test"
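Passwords that contain characters such as @ or / need to be percent-encoded before they are placed in a connection string, because those characters otherwise act as URL delimiters. The following is a minimal sketch that uses only the Python standard library. The helper name build_postgres_connection_string and the sample credentials are hypothetical and are not part of GX.

```python
from urllib.parse import quote_plus


def build_postgres_connection_string(
    username: str, password: str, host: str, port: int, database: str
) -> str:
    """Assemble a PostgreSQL connection string (hypothetical helper, not a GX API)."""
    # Percent-encode the credentials so characters such as "@" or "/" are not
    # mistaken for URL delimiters.
    user = quote_plus(username)
    secret = quote_plus(password)
    return f"postgresql+psycopg2://{user}:{secret}@{host}:{port}/{database}"


# Sample values for illustration only.
connection_string = build_postgres_connection_string(
    "username", "p@ss/word", "localhost", 5432, "test"
)
print(connection_string)
```

The resulting string can be used wherever a connection_string value is expected.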
You can use environment variables or a key in config_variables.yml to store connection string passwords. After you define your password, you reference it in your connection string similar to this example:
connection_string = (
"postgresql+psycopg2://<username>:${MY_PASSWORD}@<host>:<port>/<database>"
)
In the previous example, MY_PASSWORD is the name of the environment variable, or the key to the value in config_variables.yml, that corresponds to your password.
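GX resolves the ${MY_PASSWORD} reference itself when it loads the configuration. The following sketch only illustrates the idea of the substitution with string.Template from the Python standard library. It is not how GX implements it, and the host, port, and password values are hypothetical.

```python
import os
from string import Template

# Hypothetical value; in practice the variable is set outside of your code.
os.environ["MY_PASSWORD"] = "my_password"

template = Template(
    "postgresql+psycopg2://username:${MY_PASSWORD}@localhost:5432/test"
)
# substitute() raises KeyError if MY_PASSWORD is not defined, which surfaces
# a missing credential early.
resolved = template.substitute(os.environ)
print(resolved)
```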
If you include a password as plain text in your connection string when you define your Data Source, GX automatically removes it, adds it to config_variables.yml, and substitutes it in the Data Source saved configuration with a variable.
Create a SQL Data Source
Run the following Python code to create a SQL Data Source:
datasource = context.sources.add_sql(
name="my_datasource", connection_string=connection_string
)
PostgreSQL
Connect GX to a PostgreSQL database to access source data.
Prerequisites
- An installation of GX set up to work with PostgreSQL
- Source data stored in a PostgreSQL database
Import GX and instantiate a Data Context
Run the following Python code to import GX and instantiate a Data Context:
import great_expectations as gx
context = gx.get_context()
Determine your connection string
The following code examples use a PostgreSQL connection string. A PostgreSQL connection string connects GX to the PostgreSQL database.
The following code is an example of a PostgreSQL connection string format:
my_connection_string = (
"postgresql+psycopg2://<username>:<password>@<host>:<port>/<database>"
)
We recommend that database credentials be stored in the config_variables.yml file, which is located in the uncommitted/ folder by default, and is not part of source control. The following lines add database credentials under the key db_creds.
db_creds:
  drivername: postgres
  host: '<your_host_name>'
  port: '<your_port>'
  username: '<your_username>'
  password: '<your_password>'
  database: '<your_database_name>'
For additional options on configuring the config_variables.yml file or additional environment variables, see the guide on how to configure credentials.
Create a PostgreSQL Data Source
Run the following Python code to set the name and connection_string variables:
datasource_name = "my_datasource"
my_connection_string = (
    "postgresql+psycopg2://<username>:<password>@<host>:<port>/<database>"
)
Run the following Python code to create a PostgreSQL Data Source:
datasource = context.sources.add_postgres(
name=datasource_name, connection_string=my_connection_string
)
Connect to a specific set of data with a Data Asset
To connect the Data Source to a specific set of data in the database, you define a Data Asset in the Data Source. A Data Source can contain multiple Data Assets. Each Data Asset acts as the interface between GX and the specific set of data it is configured for.
With SQL databases, you can use Table or Query Data Assets. The Table Data Asset connects GX to the data contained in a single table in the source database. The Query Data Asset connects GX to the data returned by a SQL query.
Although there isn't a maximum number of Data Assets you can define for a Data Source, you must create a single Data Asset to allow GX to retrieve data from your Data Source.
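The difference between the two asset types can be illustrated without GX. The following sketch uses the sqlite3 module from the Python standard library with a hypothetical taxi_data table: a Table Data Asset corresponds to all rows of one table, while a Query Data Asset corresponds to whatever rows a query returns.

```python
import sqlite3

# In-memory database with a hypothetical table, for illustration only.
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE taxi_data (passenger_count INTEGER, fare REAL)")
connection.executemany(
    "INSERT INTO taxi_data VALUES (?, ?)",
    [(1, 7.5), (2, 12.0), (4, 30.25)],
)

# A Table Data Asset corresponds to every row of one table.
table_rows = connection.execute("SELECT * FROM taxi_data").fetchall()

# A Query Data Asset corresponds to the rows returned by a query.
query_rows = connection.execute(
    "SELECT * FROM taxi_data WHERE passenger_count > 1"
).fetchall()

print(len(table_rows), len(query_rows))
connection.close()
```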
Connect a Data Asset to the data in a table (Optional)
Run the following Python code to identify the table to connect to with a Table Data Asset:
asset_name = "my_table_asset"
asset_table_name = "postgres_taxi_data"
Run the following Python code to create the Data Asset:
table_asset = datasource.add_table_asset(name=asset_name, table_name=asset_table_name)
Connect a Data Asset to the data returned by a query (Optional)
Run the following Python code to define a Query Data Asset:
asset_name = "my_query_asset"
asset_query = "SELECT * from postgres_taxi_data"
Run the following Python code to create the Data Asset:
query_asset = datasource.add_query_asset(name=asset_name, query=asset_query)
Connect to additional tables or queries (Optional)
Repeat the previous steps to add additional Data Assets.
SQLite
Connect GX to a SQLite database to access source data.
Prerequisites
- An installation of GX set up to work with SQLite
- Source data stored in a SQLite database
Import GX and instantiate a Data Context
Run the following Python code to import GX and instantiate a Data Context:
import great_expectations as gx
context = gx.get_context()
Determine your connection string
The following code examples use a SQLite connection string. A SQLite connection string connects GX to the SQLite database.
The following code is an example of a SQLite connection string format:
my_connection_string = "sqlite:///<path_to_db_file>"
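In a SQLite connection string, the path follows the sqlite:/// prefix directly, so an absolute POSIX path produces four slashes in total. The following sketch shows both cases. The helper name and the file paths are hypothetical.

```python
from pathlib import PurePosixPath


def sqlite_connection_string(db_path: PurePosixPath) -> str:
    """Build a SQLite connection string (hypothetical helper, not a GX API)."""
    # "sqlite:///" is followed by the path, so an absolute POSIX path that
    # starts with "/" produces four slashes in total.
    return f"sqlite:///{db_path}"


relative = sqlite_connection_string(PurePosixPath("data/yellow_tripdata.db"))
absolute = sqlite_connection_string(PurePosixPath("/tmp/data/yellow_tripdata.db"))
print(relative)
print(absolute)
```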
Create a SQLite Data Source
Run the following Python code to set the name and connection_string variables:
datasource_name = "my_datasource"
Run the following Python code to create a SQLite Data Source:
datasource = context.sources.add_sqlite(
name=datasource_name, connection_string=my_connection_string
)
Using add_sql(...) instead of add_sqlite(...)
The SQL Data Source created with add_sql can connect to data in a SQLite database. However, add_sqlite(...) is the preferred method.
SQLite stores datetime values as strings. Because of this, a general SQL Data Source sees datetime columns as string columns. A SQLite Data Source has additional handling in place for these fields, and also has additional error reporting for SQLite-specific issues.
If you are working with SQLite source data, use add_sqlite(...) to create your Data Source.
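The string storage behavior can be observed directly with the sqlite3 module from the Python standard library. In the following sketch, the trips table and its value are hypothetical. The column is declared as DATETIME, yet the value is stored and returned as text.

```python
import sqlite3

connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE trips (pickup_datetime DATETIME)")
connection.execute("INSERT INTO trips VALUES ('2019-01-15 03:36:12')")

# Read the value back and ask SQLite which storage class it used.
value = connection.execute("SELECT pickup_datetime FROM trips").fetchone()[0]
column_type = connection.execute(
    "SELECT typeof(pickup_datetime) FROM trips"
).fetchone()[0]

print(type(value).__name__, column_type)
connection.close()
```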
Connect to the data in a table (Optional)
Run the following Python code to set the asset_name and asset_table_name variables:
asset_name = "my_asset"
asset_table_name = "my_table_name"
Run the following Python code to create the Data Asset:
table_asset = datasource.add_table_asset(name=asset_name, table_name=asset_table_name)
Connect to the data in a query (Optional)
Run the following Python code to define a Query Data Asset:
asset_name = "my_query_asset"
query = "SELECT * from yellow_tripdata_sample_2019_01"
Run the following Python code to create the Data Asset:
query_asset = datasource.add_query_asset(name=asset_name, query=query)
Add additional tables or queries (Optional)
Repeat the previous steps to add additional Data Assets.
Snowflake
Connect GX to a Snowflake database to access source data.
Prerequisites
- An installation of GX set up to work with SQL
- Source data stored in a Snowflake database
Import GX and instantiate a Data Context
Run the following Python code to import GX and instantiate a Data Context:
import great_expectations as gx
context = gx.get_context()
Determine your connection string
The following code examples use a Snowflake connection string. A Snowflake connection string connects GX to the Snowflake database.
The following code is an example of a Snowflake connection string format:
my_connection_string = "snowflake://<USER_NAME>:<PASSWORD>@<ACCOUNT_NAME_OR_LOCATOR>/<DATABASE_NAME>/<SCHEMA_NAME>?warehouse=<WAREHOUSE_NAME>&role=<ROLE_NAME>"
Snowflake accepts both account names and account locators as valid account identifiers when constructing a connection string.
Account names uniquely identify an account within your organization and are the preferred method of account identification.
Account locators act in the same manner but are auto-generated by Snowflake based on the cloud platform and region used.
For more information on both methods, see Snowflake's official documentation on account identifiers.
Create a Snowflake Data Source
Run the following Python code to set the name and connection_string variables:
datasource_name = "my_snowflake_datasource"
Run the following Python code to create a Snowflake Data Source:
datasource = context.sources.add_snowflake(
name=datasource_name,
connection_string=my_connection_string, # Or alternatively, individual connection args
)
connection_string
Although a connection string is the standard way to connect to a database, the Snowflake Data Source also accepts individual connection arguments as an alternative.
The following arguments are supported:
- account
- user
- password
- database
- schema
- warehouse
- role
- numpy
Passing these values as keyword arguments to add_snowflake is functionally equivalent to passing in a connection_string.
For more information, check out Snowflake's official documentation on the Snowflake SQLAlchemy toolkit.
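To show how the individual values map onto the connection string format used earlier in this section, the following sketch assembles the string with the Python standard library. The helper name and all sample values are hypothetical, the numpy argument is left out, and this is not how GX builds the connection internally.

```python
from typing import Optional
from urllib.parse import quote_plus, urlencode


def snowflake_connection_string(
    account: str,
    user: str,
    password: str,
    database: str,
    schema: str,
    warehouse: Optional[str] = None,
    role: Optional[str] = None,
) -> str:
    """Build a Snowflake connection string (hypothetical helper, not a GX API)."""
    # Only include the optional query parameters that were provided.
    params = {
        key: value
        for key, value in (("warehouse", warehouse), ("role", role))
        if value is not None
    }
    url = (
        f"snowflake://{quote_plus(user)}:{quote_plus(password)}"
        f"@{account}/{database}/{schema}"
    )
    return f"{url}?{urlencode(params)}" if params else url


# Sample values for illustration only.
my_connection_string = snowflake_connection_string(
    account="my_org-my_account",
    user="gx_user",
    password="p@ssword",
    database="ANALYTICS",
    schema="PUBLIC",
    warehouse="COMPUTE_WH",
    role="ANALYST",
)
print(my_connection_string)
```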
Connect to the data in a table (Optional)
Run the following Python code to set the asset_name and asset_table_name variables:
asset_name = "my_asset"
asset_table_name = "my_table_name"
Run the following Python code to create the Data Asset:
table_asset = datasource.add_table_asset(name=asset_name, table_name=asset_table_name)
Connect to the data in a query (Optional)
Run the following Python code to define a Query Data Asset:
asset_name = "my_query_asset"
query = "SELECT * from yellow_tripdata_sample_2019_01"
Run the following Python code to create the Data Asset:
query_asset = datasource.add_query_asset(name=asset_name, query=query)
Add additional tables or queries (Optional)
Repeat the previous steps to add additional Data Assets.
Databricks SQL
Connect GX to Databricks to access source data.
Prerequisites
- An installation of GX set up to work with SQL
- Source data stored in a Databricks cluster
Import GX and instantiate a Data Context
Run the following Python code to import GX and instantiate a Data Context:
import great_expectations as gx
context = gx.get_context()
Determine your connection string
The following code examples use a Databricks SQL connection string. A connection string connects GX to Databricks.
The following code is an example of a Databricks SQL connection string format:
my_connection_string = f"databricks://token:{token}@{host}:{port}/{database}?http_path={http_path}&catalog={catalog}&schema={schema}"
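The f-string above expects token, host, port, database, http_path, catalog, and schema to be defined already. One way to supply them without hard-coding secrets is to read them from environment-style settings. In the following sketch, the DATABRICKS_* variable names and the sample values are hypothetical.

```python
import os
from typing import Mapping


def databricks_connection_string(env: Mapping[str, str] = os.environ) -> str:
    """Build the Databricks SQL connection string from environment-style settings.

    The DATABRICKS_* names are hypothetical; use the names your deployment defines.
    """
    # A missing setting raises KeyError instead of producing a broken string.
    token = env["DATABRICKS_TOKEN"]
    host = env["DATABRICKS_HOST"]
    port = env["DATABRICKS_PORT"]
    database = env["DATABRICKS_DATABASE"]
    http_path = env["DATABRICKS_HTTP_PATH"]
    catalog = env["DATABRICKS_CATALOG"]
    schema = env["DATABRICKS_SCHEMA"]
    return (
        f"databricks://token:{token}@{host}:{port}/{database}"
        f"?http_path={http_path}&catalog={catalog}&schema={schema}"
    )


# Sample settings for illustration; in practice, call the function without
# arguments so that it reads os.environ.
example_settings = {
    "DATABRICKS_TOKEN": "dapi-example",
    "DATABRICKS_HOST": "example.cloud.databricks.com",
    "DATABRICKS_PORT": "443",
    "DATABRICKS_DATABASE": "default",
    "DATABRICKS_HTTP_PATH": "/sql/1.0/warehouses/abc123",
    "DATABRICKS_CATALOG": "main",
    "DATABRICKS_SCHEMA": "default",
}
my_connection_string = databricks_connection_string(example_settings)
print(my_connection_string)
```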
Create a Databricks SQL Data Source
Run the following Python code to set the name and connection_string variables:
datasource_name = "my_databricks_sql_datasource"
Run the following Python code to create a Databricks SQL Data Source:
datasource = context.sources.add_databricks_sql(
name=datasource_name,
connection_string=my_connection_string,
)
Connect to the data in a table (Optional)
Run the following Python code to set the asset_name and asset_table_name variables:
asset_name = "my_asset"
asset_table_name = "my_table_name"
Run the following Python code to create the Data Asset:
table_asset = datasource.add_table_asset(name=asset_name, table_name=asset_table_name)
Connect to the data in a query (Optional)
Run the following Python code to define a Query Data Asset:
asset_name = "my_query_asset"
query = "SELECT * from yellow_tripdata_sample_2019_01"
Run the following Python code to create the Data Asset:
query_asset = datasource.add_query_asset(name=asset_name, query=query)
Add additional tables or queries (Optional)
Repeat the previous steps to add additional Data Assets.
BigQuery SQL
Integrate GX with Google Cloud Platform (GCP).
The following scripts and configuration files are used in the examples:
- The local GX configuration is located in the great-expectations Git repository.
- The script to test the BigQuery configuration is located in gcp_deployment_patterns_file_bigquery.py.
- The script to test the GCS configuration is located in gcp_deployment_patterns_file_gcs.py.
Prerequisites
- An installation of GX set up to work with SQL.
- Familiarity with Google Cloud Platform features and functionality.
- A GCP project with a running Google Cloud Storage container that is accessible from your region.
- Read/write access to a BigQuery database.
- Access to a GCP Service Account with permission to access and read objects in Google Cloud Storage.
When installing GX in Composer 1 and Composer 2 environments the following packages must be pinned:
[tornado]==6.2
[nbconvert]==6.4.5
[mistune]==0.8.4
GX recommends that you use the following services to integrate GX with GCP:
- Google Cloud Composer (GCC) for managing workflow orchestration, including validating your data. GCC is built on Apache Airflow.
- BigQuery or files in Google Cloud Storage (GCS) as your Data Source.
- GCS for storing metadata (Expectation Suites, Validation Results, Data Docs).
- Google App Engine (GAE) for hosting and controlling access to Data Docs.
The following diagram shows the recommended components for a GX deployment in GCP:
Upgrade your GX version (Optional)
Run the following code to upgrade your GX version:
pip install great-expectations --upgrade
Get DataContext
Run the following code to create a new Data Context:
import great_expectations as gx
context = gx.data_context.FileDataContext.create(full_path_to_project_directory)
The full_path_to_project_directory parameter can be an empty directory where you intend to build your GX configuration.
Connect to GCP Metadata Stores
The code examples are located in the great-expectations repository.
When specifying prefix values for Metadata Stores in GCS, don't add a trailing slash (/). For example, use prefix: my_prefix instead of prefix: my_prefix/. When you add a trailing slash, an additional folder with the name / is created and metadata is stored in the / folder instead of my_prefix.
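If prefix values are generated by a script, a small guard can remove the trailing slash before the value is written to the configuration. The following helper is a hypothetical sketch and is not part of GX.

```python
def normalize_gcs_prefix(prefix: str) -> str:
    """Remove trailing slashes from a store prefix (hypothetical helper)."""
    # Slashes inside the prefix are kept; only trailing ones are removed.
    return prefix.rstrip("/")


print(normalize_gcs_prefix("my_prefix/"))
```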
Add an Expectations Store
By default, newly profiled Expectations are stored in JSON format in the expectations/ subdirectory of your great_expectations/ folder. The default Expectations Store configuration in your great_expectations.yml file is similar to the following:
stores:
  expectations_store:
    class_name: ExpectationsStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: expectations/
expectations_store_name: expectations_store
To configure GX to use a new Expectations Store in GCS, named expectations_GCS_store in this example, add the store to the great_expectations.yml file and set the expectations_store_name value to its name. Replace the project, bucket, and prefix with your values.
stores:
  expectations_GCS_store:
    class_name: ExpectationsStore
    store_backend:
      class_name: TupleGCSStoreBackend
      project: <your>
      bucket: <your>
      prefix: <your>
expectations_store_name: expectations_GCS_store
Add a Validations Store
By default, Validations are stored in JSON format in the uncommitted/validations/ subdirectory of your great_expectations/ folder. The default Validations Store configuration in your great_expectations.yml file is similar to the following:
stores:
  validations_store:
    class_name: ValidationsStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: uncommitted/validations/
validations_store_name: validations_store
To configure GX to use a new Validations Store in GCS, named validations_GCS_store in this example, add the store to the great_expectations.yml file and set the validations_store_name value to its name. Replace the project, bucket, and prefix with your values.
stores:
  validations_GCS_store:
    class_name: ValidationsStore
    store_backend:
      class_name: TupleGCSStoreBackend
      project: <your>
      bucket: <your>
      prefix: <your>
validations_store_name: validations_GCS_store
Add a Data Docs Store
To host and share Data Docs on GCS, see Host and share Data Docs.
After you have hosted and shared Data Docs, your great_expectations.yml file contains the following configuration under data_docs_sites:
data_docs_sites:
  local_site:
    class_name: SiteBuilder
    show_how_to_buttons: true
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: uncommitted/data_docs/local_site/
    site_index_builder:
      class_name: DefaultSiteIndexBuilder
  gs_site:  # this is a user-selected name - you may select your own
    class_name: SiteBuilder
    store_backend:
      class_name: TupleGCSStoreBackend
      project: <your>
      bucket: <your>
    site_index_builder:
      class_name: DefaultSiteIndexBuilder
Run the following command in the gcloud CLI to view the deployed Data Docs site:
gcloud app browse
The URL to your app appears and opens in a new browser window. You can view the index page of your Data Docs site.
Connect to source data
Connect to source data stored on GCS or BigQuery.
- Data in GCS
- Data in BigQuery
Run the following code to add the name of your GCS bucket to the add_pandas_gcs function:
datasource = context.sources.add_pandas_gcs(
name="gcs_datasource", bucket_or_name="test_docs_data"
)
In the example, you've added a Data Source that connects to data in GCS using a Pandas dataframe. The name of the new Data Source is gcs_datasource and it refers to a GCS bucket named test_docs_data.
For more information about configuring a Data Source, see How to connect to data on GCS using Pandas.
Tables that are created by BigQuery queries are automatically set to expire after one day.
Run the following code to create a Data Source that connects to data in BigQuery:
datasource = context.sources.add_or_update_sql(
name="my_bigquery_datasource",
connection_string="bigquery://<gcp_project_name>/<bigquery_dataset>",
)
In the example, you created a Data Source named my_bigquery_datasource using the add_or_update_sql method and passed it a connection string.
To configure the BigQuery Data Source, see How to connect to a BigQuery database.
Create Assets
- Data in GCS
- Data in BigQuery
Use the add_csv_asset function to add a CSV Asset to your Data Source.
Configure the prefix and batching_regex. The prefix is the path in the GCS bucket where the files are located. batching_regex is a regular expression that indicates which files should be treated as Batches in the Asset, and how to identify them. For example:
batching_regex = r"yellow_tripdata_sample_(?P<year>\d{4})-(?P<month>\d{2})\.csv"
prefix = "data/taxi_yellow_tripdata_samples/"
In the example, the pattern r"data/taxi_yellow_tripdata_samples/yellow_tripdata_sample_(?P<year>\d{4})-(?P<month>\d{2})\.csv" builds a Batch for each of the following files in the GCS bucket:
test_docs_data/data/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2019-01.csv
test_docs_data/data/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2019-02.csv
test_docs_data/data/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2019-03.csv
The batching_regex pattern matches the four digits of the year portion and assigns them to the year named group, and then matches the two digits of the month portion and assigns them to the month named group.
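The matching described above can be tried locally with the re module from the Python standard library. The following sketch combines the prefix and the batching_regex and applies them to a hypothetical list of object names. It illustrates the regular expression only and is not how GX lists or matches files internally.

```python
import re

batching_regex = r"yellow_tripdata_sample_(?P<year>\d{4})-(?P<month>\d{2})\.csv"
prefix = "data/taxi_yellow_tripdata_samples/"

# Hypothetical object names in the bucket; the last one should not match.
object_names = [
    "data/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2019-01.csv",
    "data/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2019-02.csv",
    "data/taxi_yellow_tripdata_samples/README.md",
]

# Escape the prefix so it is matched literally, then append the pattern.
pattern = re.compile(re.escape(prefix) + batching_regex)

batches = []
for name in object_names:
    match = pattern.fullmatch(name)
    if match:
        # groupdict() returns the named groups, for example year and month.
        batches.append(match.groupdict())

print(batches)
```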
Run the following code to use the add_csv_asset function to add an Asset named csv_taxi_gcs_asset to your Data Source:
data_asset = datasource.add_csv_asset(
name="csv_taxi_gcs_asset", batching_regex=batching_regex, gcs_prefix=prefix
)
You can add a BigQuery Asset to your Data Source as a table asset or query asset.
In the following example, a table Asset named my_table_asset is built by naming the table in your BigQuery database.
table_asset = datasource.add_table_asset(name="my_table_asset", table_name="taxi_data")
In the following example, a query Asset named my_query_asset is built by submitting a query to the taxi_data table.
query_asset = datasource.add_query_asset(
name="my_query_asset", query="SELECT * from taxi_data"
)
Get a Batch and Create Expectation Suite
- Data in GCS
- Data in BigQuery
Use the add_or_update_expectation_suite method on your Data Context to create an Expectation Suite:
context.add_or_update_expectation_suite(expectation_suite_name="test_gcs_suite")
validator = context.get_validator(
    batch_request=batch_request, expectation_suite_name="test_gcs_suite"
)
Use the Validator methods to run Expectations on the Batch and automatically add them to the Expectation Suite:
validator.expect_column_values_to_not_be_null(column="passenger_count")
validator.expect_column_values_to_be_between(
    column="congestion_surcharge", min_value=-3, max_value=1000
)
In this example, you're adding expect_column_values_to_not_be_null and expect_column_values_to_be_between. You can replace the passenger_count and congestion_surcharge test data columns with your own data columns.
Run the following code to save the Expectation Suite:
validator.save_expectation_suite(discard_failed_expectations=False)
To configure the RuntimeBatchRequest and learn how you can load data by specifying a GCS path to a single CSV, see How to connect to data on GCS using Pandas.
Use the table_asset you created previously to build a BatchRequest:
request = table_asset.build_batch_request()
Use the add_or_update_expectation_suite method on your Data Context to create an Expectation Suite and get a Validator:
context.add_or_update_expectation_suite(expectation_suite_name="test_bigquery_suite")
validator = context.get_validator(
    batch_request=request, expectation_suite_name="test_bigquery_suite"
)
Use the Validator to run Expectations on the Batch and automatically add them to the Expectation Suite:
validator.expect_column_values_to_not_be_null(column="passenger_count")
validator.expect_column_values_to_be_between(
    column="congestion_surcharge", min_value=0, max_value=1000
)
In this example, you're adding expect_column_values_to_not_be_null and expect_column_values_to_be_between. You can replace the passenger_count and congestion_surcharge test data columns with your own data columns.
Run the following code to save the Expectation Suite containing the two Expectations:
validator.save_expectation_suite(discard_failed_expectations=False)
To configure the BatchRequest and learn how you can load data by specifying a table name, see How to connect to a BigQuery database.
Build and run a Checkpoint
- Data in GCS
- Data in BigQuery
Run the following code to add the gcs_checkpoint Checkpoint to the Data Context:
checkpoint = context.add_or_update_checkpoint(
    name="gcs_checkpoint",
    validations=[
        {"batch_request": batch_request, "expectation_suite_name": "test_gcs_suite"}
    ],
)
In this example, you're using the BatchRequest and ExpectationSuite names that you used when you created your Validator.
Run the Checkpoint by calling checkpoint.run():
checkpoint_result = checkpoint.run()
Run the following code to add the bigquery_checkpoint Checkpoint to the Data Context:
checkpoint = context.add_or_update_checkpoint(
    name="bigquery_checkpoint",
    validations=[
        {"batch_request": request, "expectation_suite_name": "test_bigquery_suite"}
    ],
)
In this example, you're using the BatchRequest and ExpectationSuite names that you used when you created your Validator.
Run the Checkpoint by calling checkpoint.run():
checkpoint_result = checkpoint.run()
Migrate your local configuration to Cloud Composer
Migrate your local GX configuration to a Cloud Composer environment to automate the workflow. You can use one of the following methods to run GX in Cloud Composer or Airflow:
In this example, you'll use the bash operator to run the Checkpoint. A video overview of this process is also available.
Create and Configure a GCP Service Account
To create a GCP Service Account, see Service accounts overview.
To run GX in a Cloud Composer environment, the following privileges are required for your Service Account:
Composer Worker
Logs Viewer
Logs Writer
Storage Object Creator
Storage Object Viewer
If you are accessing data in BigQuery, the following privileges are required for your Service Account:
BigQuery Data Editor
BigQuery Job User
BigQuery Read Session User
Create a Cloud Composer environment
See Create Cloud Composer environments.
Install Great Expectations in Cloud Composer
You can use the Composer web Console (recommended), gcloud, or a REST query to install Python dependencies in Cloud Composer. To install great-expectations in Cloud Composer, see Installing Python dependencies in Google Cloud. If you are connecting to data in BigQuery, make sure sqlalchemy-bigquery is also installed in your Cloud Composer environment.
If you run into trouble when you install GX in Cloud Composer, see Troubleshooting PyPI package installation.
Move your local configuration to Cloud Composer
Cloud Composer uses Cloud Storage to store Apache Airflow DAGs (also known as workflows), with each Environment having an associated Cloud Storage bucket. Typically, the bucket name uses this pattern: [region]-[composer environment name]-[UUID]-bucket.
To migrate your local configuration, you can move the local great_expectations/ folder to the Cloud Storage bucket where Composer can access the configuration.
Open the Environments page in the Cloud Console and then click the environment name to open the Environment details page. The name of the Cloud Storage bucket is located to the right of the DAGs folder on the Configuration tab.
This is the folder where DAGs are stored. You can access it from the Airflow worker nodes at /home/airflow/gcsfuse/dags. The location where you'll upload great_expectations/ is one level above the /dags folder.
Upload the local great_expectations/ folder by dragging and dropping it into the window, using gsutil cp, or by clicking the Upload Folder button.
After the great_expectations/ folder is uploaded to the Cloud Storage bucket, it is mapped to the Airflow instances in your Cloud Composer and is accessible from the Airflow Worker nodes at /home/airflow/gcsfuse/great_expectations.
Write the DAG and add it to Cloud Composer
- Data in GCS
- Data in BigQuery
Run the following code to create a DAG with a single node (t1) that runs a BashOperator:
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
default_args = {
"start_date": airflow.utils.dates.days_ago(0),
"retries": 1,
"retry_delay": timedelta(days=1),
}
dag = DAG(
"GX_checkpoint_run",
default_args=default_args,
description="running GX checkpoint",
schedule_interval=None,
dagrun_timeout=timedelta(minutes=5),
)
# priority_weight has type int in Airflow DB, uses the maximum.
t1 = BashOperator(
task_id="checkpoint_run",
bash_command="(cd /home/airflow/gcsfuse/great_expectations/ ; great_expectations checkpoint run gcs_checkpoint ) ",
dag=dag,
depends_on_past=False,
priority_weight=2**31 - 1,
)
The DAG is stored in a file named ge_checkpoint_gcs.py.
The BashOperator changes the directory to /home/airflow/gcsfuse/great_expectations, where you uploaded your local configuration.
Run the following command in the gcloud CLI to run the Checkpoint locally:
great_expectations checkpoint run gcs_checkpoint
To add the DAG to Cloud Composer, you move ge_checkpoint_gcs.py to the environment's DAGs folder in Cloud Storage. For more information about adding or updating DAGs, see Add or update a DAG.
Open the Environments page in the Cloud Console and then click the environment name to open the Environment details page.
On the Configuration tab, click the Cloud Storage bucket name located to the right of the DAGs folder.
Upload the local copy of the DAG.
Run the following code to create a DAG with a single node (t1) that runs a BashOperator:
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
default_args = {
"start_date": airflow.utils.dates.days_ago(0),
"retries": 1,
"retry_delay": timedelta(days=1),
}
dag = DAG(
"GX_checkpoint_run",
default_args=default_args,
description="running GX checkpoint",
schedule_interval=None,
dagrun_timeout=timedelta(minutes=5),
)
# priority_weight has type int in Airflow DB, uses the maximum.
t1 = BashOperator(
task_id="checkpoint_run",
bash_command="(cd /home/airflow/gcsfuse/great_expectations/ ; great_expectations checkpoint run bigquery_checkpoint ) ",
dag=dag,
depends_on_past=False,
priority_weight=2**31 - 1,
)
The DAG is stored in a file named ge_checkpoint_bigquery.py.
The BashOperator changes the directory to /home/airflow/gcsfuse/great_expectations, where you uploaded your local configuration.
Run the following command in the gcloud CLI to run the Checkpoint locally:
great_expectations checkpoint run bigquery_checkpoint
To add the DAG to Cloud Composer, you move ge_checkpoint_bigquery.py to the environment's DAGs folder in Cloud Storage. For more information about adding or updating DAGs, see Add or update a DAG.
Open the Environments page in the Cloud Console and then click the environment name to open the Environment details page.
On the Configuration tab, click the Cloud Storage bucket name located to the right of the DAGs folder.
Upload the local copy of the DAG.
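The two DAG files differ only in the Checkpoint name passed to bash_command. As a possible variation, the command string can be built by a small helper. The following sketch is hypothetical and departs from the examples above in one respect: it joins the two commands with && instead of ; so that the Checkpoint does not run if the directory change fails.

```python
import shlex


def checkpoint_bash_command(project_dir: str, checkpoint_name: str) -> str:
    """Build the shell command for a BashOperator (hypothetical helper)."""
    # "&&" stops the command if the directory change fails, and shlex.quote
    # protects values that contain spaces or shell metacharacters.
    return (
        f"cd {shlex.quote(project_dir)} && "
        f"great_expectations checkpoint run {shlex.quote(checkpoint_name)}"
    )


gcs_command = checkpoint_bash_command(
    "/home/airflow/gcsfuse/great_expectations/", "gcs_checkpoint"
)
bigquery_command = checkpoint_bash_command(
    "/home/airflow/gcsfuse/great_expectations/", "bigquery_checkpoint"
)
print(gcs_command)
print(bigquery_command)
```

Either string can then be passed as the bash_command argument of the BashOperator.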
Run the DAG and the Checkpoint
Use one of the following methods to trigger the DAG:
To trigger the DAG manually:
Open the Environments page in the Cloud Console and then click the environment name to open the Environment details page.
In the Airflow webserver column, click the Airflow link for your environment to open the Airflow web interface for your Cloud Composer environment.
Click Trigger Dag to run your DAG configuration.
When the DAG run is successful, the Success status appears on the DAGs page of the Airflow Web UI. You can also check that new Data Docs have been generated by accessing the URL to your gcloud app.