Having worked with Google Cloud composer for quite long, when someone says pipeline my brain automatically goes to one and only one thing – DAG. With such wide variety of pluggable options available, it feels like a manna. It is kind of similar to all the other tools of yore Informatica, SSIS etc but less GUI and more extensible. It is only in my current job I am discovering how much more google has to offer out of the box and much cheaper options and I am loving the autonomy that I am provided with to come up with solution.
That’s not to say it’s wild wild west out here with everyone left to do what they want. There are some established patterns as listed below which I have worked on –
- Airflow DAGs running dbt models – Very costly. Essentially, what is happening here is all the jobs do is run Kubernetes Pod operator which in turn run dbt commands. Effective if there are lot of ad-hoc jobs getting run across the project. Am planning to put a complete post about it, especially about dbt (which I am REALLY excited about).
- Databricks – Mostly for ML models but there are quite good number of legacy jobs doing the ETL workflows. Again, a very pricey option for simple workflows. I loved working on it and seeing the flexibility it provides and ease of debugging.
- Airflow DAGs – Regular airflow jobs with all elements of ETL (very few jobs) with plug and play operators.
- Cloud Functions – Using Cloud Functions trigger pattern to deploy BQ objects (main meat of this post)
In addition to the above there are some more such as Cloud Run with Cloud Scheduler / Workflows, Vertex AI using dbt. I haven’t really worked on them yet but would be discovering and learning about it.
Cloud Functions as Google states just write your code and let Google handle all the operational infrastructure. With release of version 2, there are literally hundreds of ways with which you can orchestrate the Cloud Functions and integrate with multitude of actions from the entire suite of Google Cloud Platform.
Without further ado, let’s get to the main topic – Automating BigQuery Scripts Execution with Cloud Functions and Secret Manager. Here is a simple pipeline set-up to get a overall view.
Pipeline flow at high level –
- Third party systems pushes the data to Cloud Storage.
- Cloud Functions gets triggered.
- As part of set-up Cloud Function accesses the Secret Manager API.
- Credentials are authenticated at Cloud Function.
- BigQuery scripts are executed.
Steps (6)-(7) involves setting up criteria in Cloud Logging which you can then send an alert monitoring log using Cloud Monitoring to send mails in case of failures.
In so far the projects that I have worked, use of Cloud Functions have been –
- Triggering BigQuery SQL scripts using ServiceAccount credentials
- Parsing incoming files from a cloud storage bucket and doing transformations (csv/json/excel to bigquery or vice-versa)
- Triggering DAG on basis of data drop.
In almost all the cases, I haven’t seen cloud functions being checked in. This could be because the SA credentials are part of the source project i.e. included as additional json file or in case of DAG trigger it involves embedding token id in the main definition i.e. sensitive.
Here is where Secret Manager comes to the rescue. As the name implies, it’s a service that you can use to just store all the variables in a tightly locked container and use name for further referencing.
Cloud Functions can make use of Secret Manager and remove the hassle of storing sensitive data within the code. The only problem I felt is the official documentation is bit sparse on this topic. You can mount the secret as a volume or expose the secret as an environment variable and it is very straightforward as can be seen below.
There are three ways with which you can access the secret variable as listed below –
Method 1 – Using the secret manager API, bit long and clunky code requiring multiple statements.
def get_credentials_via_api():
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_name = f"projects/{PROJECT_ID}/secrets/{CREDENTIALS_SECRET}/versions/latest"
secret_response = secret_manager_client.access_secret_version(name=secret_name)
secret_payload = secret_response.payload.data.decode("UTF-8")
credentials = Credentials.from_service_account_info(json.loads(secret_payload),scopes=SCOPES)
return credentials
Method 2 – Using the secret manager stored as mounted variable – My preferred method
def get_credentials_via_service_account_env():
secret = os.getenv(f'{CREDENTIALS_SECRET}')
credentials = Credentials.from_service_account_info(json.loads(secret), scopes=SCOPES)
return credentials
GUI Would look like this

Method 3 – Using the secret manager stored as environment variable
def get_credentials_via_service_account_mnt():
secret_location = f"/secrets/{CREDENTIALS_SECRET}"
with open(secret_location, "r") as file:
secret = file.read()
credentials = Credentials.from_service_account_info(json.loads(secret), scopes=SCOPES)
return credentials
GUI Would look like this

This example showed how to set Secrets as environment variables or mounts in a Google Cloud Function. If you are dealing with tokenid’s for dag based trigger, just get them through methods mentioned above but without the need to decode it via json and rather get them as plain text strings.
Full code is available at the following link. It also contains a Azure pipeline for establishing a CI/CD pipeline.

