How to connect to a BigQuery database
This guide will help you connect to data in a BigQuery database. This will allow you to ValidateThe act of applying an Expectation Suite to a Batch. and explore your data.
Prerequisites: This how-to guide assumes you have:
- Completed the Getting Started Tutorial
- Have a working installation of Great Expectations
- Have access to data in a BigQuery database
- Followed the Google Cloud Library guide for authentication
Steps
1. Choose how to run the code in this guide
Get an environment to run the code in this guide. Please choose an option below.
- CLI + filesystem
- No CLI + filesystem
- No CLI + no filesystem
If you use the Great Expectations CLICommand Line Interface, run this command to automatically generate a pre-configured Jupyter Notebook. Then you can follow along in the YAML-based workflow below:
great_expectations datasource new
If you use Great Expectations in an environment that has filesystem access, and prefer not to use the CLICommand Line Interface, run the code in this guide in a notebook or other Python script.
If you use Great Expectations in an environment that has no filesystem (such as Databricks or AWS EMR), run the code in this guide in that system's preferred way.
2. Install required dependencies
First, install the necessary dependencies for Great Expectations to connect to your BigQuery database by running the following in your terminal:
pip install sqlalchemy-bigquery
3. Add credentials
Great Expectations provides multiple methods of using credentials for accessing databases. Options include using a file not checked into source control, environment variables, and using a cloud secret manager. Please read the article How to Configure Credentials for instructions on alternatives.
For this guide we will use a connection_string
like this:
bigquery://<GCP_PROJECT>/<BIGQUERY_DATASET>
4. Instantiate your project's DataContext
Import these necessary packages and modules.
import os
from ruamel import yaml
import great_expectations as ge
from great_expectations.core.batch import BatchRequest, RuntimeBatchRequest
Load your DataContext into memory using the get_context()
method.
context = ge.get_context()
5. Configure your Datasource
- YAML
- Python
Put your connection string in this template:
datasource_yaml = f"""
name: my_bigquery_datasource
class_name: Datasource
execution_engine:
class_name: SqlAlchemyExecutionEngine
connection_string: bigquery://<GCP_PROJECT_NAME>/<BIGQUERY_DATASET>
data_connectors:
default_runtime_data_connector_name:
class_name: RuntimeDataConnector
batch_identifiers:
- default_identifier_name
default_inferred_data_connector_name:
class_name: InferredAssetSqlDataConnector
include_schema_name: true
"""
Run this code to test your configuration.
context.test_yaml_config(datasource_yaml)
Put your connection string in this template:
datasource_config = {
"name": "my_bigquery_datasource",
"class_name": "Datasource",
"execution_engine": {
"class_name": "SqlAlchemyExecutionEngine",
"connection_string": "bigquery://<GCP_PROJECT_NAME>/<BIGQUERY_DATASET>",
},
"data_connectors": {
"default_runtime_data_connector_name": {
"class_name": "RuntimeDataConnector",
"batch_identifiers": ["default_identifier_name"],
},
"default_inferred_data_connector_name": {
"class_name": "InferredAssetSqlDataConnector",
"include_schema_name": True,
},
},
}
Run this code to test your configuration.
context.test_yaml_config(yaml.dump(datasource_config))
You will see your database tables listed as Available data_asset_names
in the output of test_yaml_config()
.
Feel free to adjust your configuration and re-run test_yaml_config
as needed.
6. Save the Datasource configuration to your DataContext
Save the configuration into your DataContext
by using the add_datasource()
function.
- YAML
- Python
context.add_datasource(**yaml.load(datasource_yaml))
context.add_datasource(**datasource_config)
7. Test your new Datasource
Verify your new DatasourceProvides a standard API for accessing and interacting with data from a wide variety of source systems. by loading data from it into a ValidatorUsed to run an Expectation Suite against data. using a BatchRequest
.
- Using a SQL query
- Using a table name
Here is an example of loading data by specifying a SQL query.
Currently BigQuery does not allow for the creation of temporary tables as the result of a query. As a workaround, Great Expectations allows you to pass in a string to use as a table name. It will then use this string to create a named permanent table as a "temporary" table, with the name passed in as a batch_spec_passthrough
parameter. The table will be created in the location specified in the connection_string
of your execution_engine
. In the following example we are using a table named ge_temp
.
batch_request = RuntimeBatchRequest(
datasource_name="my_bigquery_datasource",
data_connector_name="default_runtime_data_connector_name",
data_asset_name="default_name", # this can be anything that identifies this data
runtime_parameters={"query": "SELECT * from demo.taxi_data LIMIT 10"},
batch_identifiers={"default_identifier_name": "default_identifier"},
batch_spec_passthrough={
"bigquery_temp_table": "ge_temp"
}, # this is the name of the table you would like to use a 'temp_table'
)
context.create_expectation_suite(
expectation_suite_name="test_suite", overwrite_existing=True
)
validator = context.get_validator(
batch_request=batch_request, expectation_suite_name="test_suite"
)
print(validator.head())
Here is an example of loading data by specifying an existing table name.
Currently BigQuery does not allow for the creation of temporary tables as the result of a query. As a workaround, Great Expectations allows for a named permanent table to be used as a "temporary" table, with the name passed in as a batch_spec_passthrough
parameter. In the following example we are using a table named ge_temp
.
batch_request = BatchRequest(
datasource_name="my_bigquery_datasource",
data_connector_name="default_inferred_data_connector_name",
data_asset_name="demo.taxi_data", # this is the name of the table you want to retrieve
batch_spec_passthrough={
"bigquery_temp_table": "ge_temp"
}, # this is the name of the table you would like to use a 'temp_table'
)
context.create_expectation_suite(
expectation_suite_name="test_suite", overwrite_existing=True
)
validator = context.get_validator(
batch_request=batch_request, expectation_suite_name="test_suite"
)
print(validator.head())
🚀🚀 Congratulations! 🚀🚀 You successfully connected Great Expectations with your data.
Additional Notes
To view the full scripts used in this page, see them on GitHub:
Next Steps
Now that you've connected to your data, you'll want to work on these core skills: