12/01/2021 • Jan Eerdekens

How to add BigQuery support to Flyway and integrate with Dataflow

Flyway is a library that is used, for example, in Spring Boot to provide schema migration functionality. But does Flyway support BigQuery? In this blog post, we work through three proofs of concept to add BigQuery support to Flyway and integrate it with Google Dataflow!

I'm a Solution Engineer in the Data team at ACA Group. Our cloud platform of choice is Google Cloud Platform (GCP). We’re currently using a subset of the services that are available on GCP.

  • For batch/streaming data processing, we're using Apache Beam, or more specifically Google Dataflow, which is Google's managed service for running Apache Beam pipelines.
  • Our database of choice to push the results of our data processing to is Google BigQuery.


Because our company has historically used Java as its language of choice, we chose to write our Dataflow pipelines in Java, even though Apache Beam also supports Python. (Python has recently gained some traction within our company, and the Data team uses it for writing Google Cloud Functions.) As a company, we also have a lot of experience writing enterprise applications in Java with frameworks like Spring Boot, so we're very used to automating our database schema evolution. This is a best practice that we would like to keep and apply to our data pipelines in Dataflow.

So we decided to go on a little adventure and see if we could find something out there that could solve this need for us.


Research

We started with some research to see what Google could unearth. While looking for information, tools, libraries and frameworks on schema evolution and migration for BigQuery, we found a few options that deserved a closer look:

  • BigQuery itself has automatic schema detection and has support for schema changes, but these are mostly things you have to do manually using the API, the bq util or SQL. While this works, there’s nothing orchestrating it. Everything would need to be done manually or scripted.
  • bq-schema: a tool that does most of what we want, but has the disadvantage of being written in Python. It would be difficult to integrate unless we also switched our Apache Beam/Dataflow pipelines to Python. That is a possibility I’m not immediately writing off, but one to save for later if no other good solution is found.
  • bigquery_migration: a tool similar to the Python one above, but it covers fewer of our requirements and, because it is written in Ruby, would be even more difficult to integrate.
  • BQconvert: only helpful for actually migrating an existing database schema to BigQuery and so not at all suited for what we want to achieve.
  • Dataflow BigQuery Schema Evolution & Automated Schema Evolution for BigQuery: more of a set of ideas and inspirations than an actual solution for our needs.
  • Schema evolution in streaming Dataflow jobs and BigQuery tables: fascinating blog post series, but I couldn’t find any actual code repository for it.

While some of these options do cover some or most of our requirements, there wasn’t one that really was an ideal match.

A couple of these options were also mentioned in a Stack Overflow post that also contained a reference to Flyway… and the term Flyway rings a bell!

Flyway is a library that is used, for example, in Spring Boot to provide schema migration functionality and that, based on previous experience, should in theory cover all our requirements. That leaves only one big question: does Flyway have BigQuery support?

At the time I started looking into the whole Dataflow/BigQuery schema migration question, there was no official Flyway BigQuery support. In the meantime, non-certified beta support has been added. Via the aforementioned Stack Overflow post, I did however find an issue in the Flyway GitHub repository about adding BigQuery support to Flyway. In that issue, I found a mention of a branch in a forked repository that should add some sort of BigQuery support to Flyway.

We were already familiar with Flyway due to our Spring Boot experience, and we had found some Flyway code that might actually add BigQuery support to Flyway. Time to do some proofs of concept, which will hopefully answer a bunch of questions:

  • Can we get the BigQuery Flyway branch to work?
  • If Flyway does work correctly with BigQuery:
    • Can we do all the necessary migrations like creating a table, adding data to a table, modifying a table schema, etc…?
    • Can we integrate it all into Dataflow?
  • If it integrates into Dataflow, does it actually run all the test migrations correctly? 

Proof of concept 1

The first proof of concept was to take the code from the forked repo as-is, clone it and try to get a simple migration to work against a BigQuery table in a dataset of a GCP test project. 

There are 3 ways to run/use Flyway:

  • Command line
  • Maven/Gradle plugin
  • Java API

Because we want to integrate Flyway support into our Java-based Dataflow pipelines, and because our Jenkins/Terraform-based deployment currently isn’t well suited for the command line or Maven/Gradle options, we first looked at calling the Flyway API directly. We did this by adding a simple Java class with a main method to the cloned repository branch. In this main method we needed to do a couple of things:

  • Create a JDBC datasource that is able to connect to BigQuery in a given GCP project.
  • Configure Flyway to use this datasource.
  • Run the Flyway migrate command and see if it finds and executes our SQL migration.

So the first thing we need to set up for a data source is a BigQuery JDBC driver. Luckily, the Google BigQuery documentation covers this. On this page is a link to a free download of the Google BigQuery Simba Data Connector made by Magnitude. Downloading the driver from this page will get you a ZIP file that contains the actual JDBC driver JAR file, GoogleBigQueryJDBC42.jar, but also all its dependencies.

In my case, I only added this driver JAR to our company’s Maven repository, because most of the other driver dependencies are already available in public Maven repositories. It's quite the chore to check them all and upload the missing ones or the ones with differing versions.

For this first POC it was enough to add the following dependencies to the pom.xml of the project we cloned (the versions are only indicative for when I tested it, but can be replaced with newer ones):

  • com.simba:bigquery-driver:1.2.4.1007
  • com.google.cloud:google-cloud-bigquery:1.132.1
  • com.google.cloud:google-cloud-bigquerystorage:1.21.1
  • org.apache.avro:avro:1.8.2

With these dependencies in place, the code below works once you do two things. First, set the GOOGLE_APPLICATION_CREDENTIALS environment variable and point it to a service account credentials JSON file (this is needed to make the OAuthType=3 authentication mode work). Second, replace the <GCP project ID> and <a dataset ID> placeholders.

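package org.flywaydb.core;

import com.simba.googlebigquery.jdbc42.DataSource;
import org.flywaydb.core.Flyway;

public class BQTest {

  public static void main(String[] args) {
     // OAuthType=3 makes the driver use Application Default Credentials,
     // which is why GOOGLE_APPLICATION_CREDENTIALS must be set.
     DataSource dataSource = new DataSource();
     dataSource.setURL("jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=<GCP project ID>;OAuthType=3");

     // Flyway is told not to create the dataset itself (createSchemas(false)).
     Flyway flyway = Flyway.configure()
           .createSchemas(false)
           .defaultSchema("<GCP project ID>.<a dataset ID>")
           .schemas("<GCP project ID>.<a dataset ID>")
           .dataSource(dataSource)
           .baselineOnMigrate(true)
           .load();

     // Applies pending migrations from Flyway's default location (classpath:db/migration).
     flyway.migrate();
  }
}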

I then added an SQL migration file to my src/main/resources/db/migration directory and executed the code. To my surprise, Flyway was actually trying to talk to BigQuery. There was, however, one small issue with the cloned Flyway BigQuery code that needed to be fixed. The INSERT statement in the BigQueryDatabase#getInsertStatement method, which Flyway uses to add migrations to its flyway_schema_history table, failed for 2 reasons:

  • Type issues with INT64 columns that could be solved with explicit casting: CAST(? AS INT64).
  • Having to provide an explicit manual default, CURRENT_TIMESTAMP(), for timestamp columns.
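
As a rough illustration of those two fixes, the sketch below builds an INSERT statement of that shape. It is not the code from the forked branch: the class and method names are hypothetical, and which columns need the INT64 cast is an assumption based on the standard flyway_schema_history layout (installed_rank, checksum and execution_time being integer columns).

```java
final class InsertStatementSketch {

    /**
     * Builds an INSERT for the schema history table with explicit INT64 casts
     * and an explicit CURRENT_TIMESTAMP() for installed_on.
     * Hypothetical helper: the column-to-cast mapping is an assumption.
     */
    static String buildInsert(String qualifiedTable) {
        return "INSERT INTO " + qualifiedTable
                + " (installed_rank, version, description, type, script, checksum,"
                + " installed_by, installed_on, execution_time, success)"
                + " VALUES (CAST(? AS INT64), ?, ?, ?, ?, CAST(? AS INT64),"
                + " ?, CURRENT_TIMESTAMP(), CAST(? AS INT64), ?)";
    }
}
```

Note that installed_on no longer takes a bind parameter, so the statement has nine placeholders for ten columns.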

After fixing the INSERT statement, I was able to see Flyway work correctly with BigQuery and verify that it could do all the migration actions that we defined. I even managed to get mixed SQL & Java migrations to work (using the Java BigQuery API to do things that can’t be expressed in SQL). There was only 1 surprise: adding data to a table can’t be done in the same SQL file that you create the table in. Those kinds of actions can’t be mixed in the same file.

The output below is similar to what I got, but is from a more recent attempt with the current Flyway 8.x that has BigQuery Beta support:

Nov 18, 2021 11:43:30 AM org.flywaydb.core.internal.license.VersionPrinter info
INFO: Flyway Community Edition 8.0.3 by Redgate
Nov 18, 2021 11:43:30 AM org.flywaydb.core.internal.database.base.BaseDatabaseType info
INFO: Database: jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=<GCP project ID>;OAuthType=3 (Google BigQuery 2.0)
Nov 18, 2021 11:43:30 AM org.flywaydb.core.internal.database.base.BaseDatabaseType warn
WARNING: Google BigQuery 2.0 does not support setting the schema for the current session. Default schema will NOT be changed to <GCP project ID>.<a dataset ID> !
Nov 18, 2021 11:43:30 AM org.flywaydb.database.bigquery.BigQueryDatabaseType info
INFO: Join the GCP BigQuery beta via https://rd.gt/3fut40f
Nov 18, 2021 11:43:30 AM org.flywaydb.database.bigquery.BigQueryDatabaseType info
INFO: 
Nov 18, 2021 11:43:30 AM org.flywaydb.database.bigquery.BigQueryDatabaseType info
INFO: Experiencing performance issues while using GCP BigQuery?
Nov 18, 2021 11:43:30 AM org.flywaydb.database.bigquery.BigQueryDatabaseType info
INFO: Find out how Flyway Teams improves performance with batching at https://rd.gt/3CWAuTb
Nov 18, 2021 11:43:30 AM org.flywaydb.database.bigquery.BigQueryDatabaseType info
INFO: 
Nov 18, 2021 11:43:48 AM org.flywaydb.database.bigquery.BigQueryDatabase info
INFO: Google BigQuery databases have a 10 GB database size limit in Flyway Community Edition.
You have used 0 GB / 10 GB
Consider upgrading to Flyway Teams Edition for unlimited usage: https://rd.gt/3CWAuTb
Nov 18, 2021 11:43:51 AM org.flywaydb.core.internal.command.DbValidate info
INFO: Successfully validated 1 migration (execution time 00:02.091s)
Nov 18, 2021 11:44:03 AM org.flywaydb.core.internal.schemahistory.JdbcTableSchemaHistory info
INFO: Creating Schema History table `<GCP project ID>.<a dataset ID>`.`flyway_schema_history` with baseline ...
Nov 18, 2021 11:44:09 AM org.flywaydb.core.internal.command.DbBaseline info
INFO: Successfully baselined schema with version: 1
Nov 18, 2021 11:44:19 AM org.flywaydb.core.internal.command.DbMigrate info
INFO: Current version of schema `<GCP project ID>.<a dataset ID>`: 1
Nov 18, 2021 11:44:19 AM org.flywaydb.core.internal.command.DbMigrate info
INFO: Migrating schema `<GCP project ID>.test_dataset` to version "1.0001 - Test migration" [non-transactional]
Nov 18, 2021 11:44:47 AM org.flywaydb.core.internal.command.DbMigrate info
INFO: Successfully applied 1 migration to schema `<GCP project ID>.<a dataset ID>`, now at version v1.0001 (execution time 00:37.604s)

Proof of concept 2

The previous POC leaves us now with a new problem to solve: get this code working inside a Google Dataflow project. Taking inspiration from Spring Boot, which runs Flyway migrations during application startup, I had to find something in Beam/Dataflow that is similar and allows us to run arbitrary code during startup.

A first option that I discovered and investigated was a custom DataflowRunnerHooks implementation. While trying this out, I quickly discovered that the moment this is triggered is completely wrong for what we want to achieve as it is already executing while building the Dataflow code using the mvn compile exec:java command. Because we're building a common Dataflow artifact that is deployed to all environments and gets injected with runtime variables, triggering our custom Flyway code at this time doesn't achieve what we want.

So after looking around some more I found the JvmInitializer interface. This immediately looked more promising and a quick implementation showed that it was indeed usable, but that it does have a number of quirks/gotchas that we’ll cover in more detail in the lessons learned section.

package be.planetsizebrain;

import com.simba.googlebigquery.jdbc42.DataSource;
import com.simba.googlebigquery.support.exceptions.ErrorException;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.sdk.harness.JvmInitializer;
import org.apache.beam.sdk.options.PipelineOptions;
import org.flywaydb.core.Flyway;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlywayJvmInitializer implements JvmInitializer {

  private static final Logger LOGGER = LoggerFactory.getLogger(FlywayJvmInitializer.class);

  @Override
  public void beforeProcessing(PipelineOptions options) {
     LOGGER.info("Running flyway JVM initializer...");

     try {
        DataflowWorkerHarnessOptions harnessOptions = options.as(DataflowWorkerHarnessOptions.class);
        executeFlyway(harnessOptions);
     } catch (Exception e) {
        LOGGER.error("Flyway migrations failed!", e);

        throw new RuntimeException("Unexpected problem during beforeProcessing phase of JVM initializer", e);
     } finally {
        LOGGER.info("Finished running flyway JVM initializer.");
     }
  }

  private void executeFlyway(DataflowWorkerHarnessOptions harnessOptions) throws ErrorException {
     Flyway flyway = initializeFlywayClient(harnessOptions);

     LOGGER.info("Running flyway migrations...");

     flyway.migrate();

     LOGGER.info("Finished flyway migrations");
  }

  private Flyway initializeFlywayClient(DataflowWorkerHarnessOptions harnessOptions) throws ErrorException {
     DataSource dataSource = createBigQueryDataSource(harnessOptions);

     return Flyway.configure()
           .createSchemas(false)
           .defaultSchema("FLYWAY")
           .schemas("FLYWAY")
           .dataSource(dataSource)
           .failOnMissingLocations(true)
           .locations("classpath:db/migration")
           .ignoreFutureMigrations(true)
           .load();
  }

  private DataSource createBigQueryDataSource(DataflowWorkerHarnessOptions options) throws ErrorException {
     DataSource dataSource = new DataSource();
     dataSource.setURL("jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=" + options.getProject() + ";OAuthType=3");
     dataSource.setTimeout(180);
     dataSource.setLogLevel("INFO");
     dataSource.setMaxResults(10000);

     return dataSource;
  }
}

When adding this code to a Dataflow project, one more thing is needed to actually make it work. The JvmInitializer system works via the Java Service Provider Interface (SPI) mechanism. This means we need to create a file called org.apache.beam.sdk.harness.JvmInitializer in src/main/resources/META-INF/services that contains the fully qualified class name (FQCN) of our JvmInitializer implementation.
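
For the class above, that file would contain the single line be.planetsizebrain.FlywayJvmInitializer. To show how this kind of discovery works in general, here is a small self-contained sketch using java.util.ServiceLoader. The Initializer interface and DemoInitializer class are hypothetical stand-ins for Beam's JvmInitializer and our FlywayJvmInitializer, and the services file is written to a temporary directory only so the example can run on its own.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

final class SpiDiscoverySketch {

    /** Hypothetical stand-in for the service interface. */
    public interface Initializer {
        String name();
    }

    /** Hypothetical stand-in for the implementation that should be discovered. */
    public static class DemoInitializer implements Initializer {
        @Override
        public String name() {
            return "demo";
        }
    }

    /** Writes a META-INF/services entry and lets ServiceLoader find the implementation. */
    static List<String> discover() {
        try {
            Path root = Files.createTempDirectory("spi-sketch");
            Path services = Files.createDirectories(root.resolve("META-INF").resolve("services"));
            // File name = binary name of the interface, content = binary name of the implementation.
            Files.write(services.resolve(Initializer.class.getName()),
                    DemoInitializer.class.getName().getBytes(StandardCharsets.UTF_8));

            List<String> found = new ArrayList<>();
            try (URLClassLoader loader = new URLClassLoader(
                    new URL[] {root.toUri().toURL()}, SpiDiscoverySketch.class.getClassLoader())) {
                for (Initializer initializer : ServiceLoader.load(Initializer.class, loader)) {
                    found.add(initializer.name());
                }
            }
            return found;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a real project the file simply lives under src/main/resources and ends up on the classpath through the normal build; no temporary directory or extra class loader is involved.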

When running a Dataflow pipeline, we can see the following logging (here again with the output of a more recent attempt with the Flyway version that has Beta support for BigQuery):

2021-11-08 11:26:50.593 CET "Loading pipeline options from /var/opt/google/dataflow/pipeline_options.json"
2021-11-08 11:26:50.624 CET "Worker harness starting with: {...}"
2021-11-08 11:26:53.565 CET "Running flyway JVM initializer..."
2021-11-08 11:26:54.111 CET "Running flyway migrations..."
2021-11-08 11:26:54.322 CET "[Simba][JDSI](20260) Cannot access file to use for logging: CANNOT_CREATE_LOGGING_PATH. Switching to default logging output."
2021-11-08 11:26:54.325 CET "Nov 08 10:26:54.323  1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: SDK Version: 10.1.20.1161"
2021-11-08 11:26:54.330 CET "Nov 08 10:26:54.330  1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Driver Version: 01.02.19.1023"
2021-11-08 11:26:54.332 CET "Nov 08 10:26:54.332  1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: JVM Name: Java HotSpot(TM) 64-Bit Server VM"
2021-11-08 11:26:54.332 CET "Nov 08 10:26:54.332  1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: JVM Specification Version: 1.8"
2021-11-08 11:26:54.333 CET "Nov 08 10:26:54.333  1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: JVM Implementation Version: 25.151-b12"
2021-11-08 11:26:54.335 CET "Nov 08 10:26:54.335  1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: JVM Vendor: Oracle Corporation"
2021-11-08 11:26:54.335 CET "Nov 08 10:26:54.335  1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Operating System Name: Linux"
2021-11-08 11:26:54.339 CET "Nov 08 10:26:54.339  1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Operating System Version: 5.4.120+"
2021-11-08 11:26:54.340 CET "Nov 08 10:26:54.340  1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Operating System Architecture: amd64"
2021-11-08 11:26:54.340 CET "Nov 08 10:26:54.340  1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Locale Name: en_US"
2021-11-08 11:26:54.340 CET "Nov 08 10:26:54.340  1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Default Charset Encoding: US-ASCII"
2021-11-08 11:26:54.358 CET "[Simba][JDSI](20260) Cannot access file to use for logging: CANNOT_CREATE_LOGGING_PATH. Switching to default logging output."
2021-11-08 11:26:54.474 CET "SQLWarning: reason([Simba][BigQueryJDBCDriver](1000019) Invalid connection property value for MetaDataFetchThreadCount, value overridden to 32.) SQLState(01S02)"
2021-11-08 11:26:57.024 CET "Nov 08 10:26:57.023 WARN 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.doConnect: [Simba][BigQueryJDBCDriver](1000019) Invalid connection property value for MetaDataFetchThreadCount, value overridden to 32."
2021-11-08 11:26:57.032 CET "SQLWarning: reason([Simba][BigQueryJDBCDriver](1000019) Invalid connection property value for MetaDataFetchThreadCount, value overridden to 32.) SQLState(01S02)"
2021-11-08 11:26:57.037 CET "Nov 08 10:26:57.037  1 com.simba.googlebigquery.jdbc.common.SConnection.SConnection: Driver version is: 01.02.19.1023"
2021-11-08 11:26:57.038 CET "Nov 08 10:26:57.038  1 com.simba.googlebigquery.jdbc.common.SConnection.SConnection: Datasource version is: 01.02.19.1023"
2021-11-08 11:26:57.144 CET "Flyway Community Edition 8.0.3 by Redgate"
2021-11-08 11:26:57.146 CET "Database: jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=amfori-data-acc;OAuthType=3; (Google BigQuery 2.0)"
2021-11-08 11:26:57.159 CET "Google BigQuery 2.0 does not support setting the schema for the current session. Default schema will NOT be changed to FLYWAY !"
2021-11-08 11:26:57.160 CET "Join the GCP BigQuery beta via https://rd.gt/3fut40f"
2021-11-08 11:26:57.160 CET "Experiencing performance issues while using GCP BigQuery?"
2021-11-08 11:26:57.163 CET "Find out how Flyway Teams improves performance with batching at https://rd.gt/3CWAuTb"
2021-11-08 11:27:55.640 CET "Google BigQuery databases have a 10 GB database size limit in Flyway Community Edition. You have used 0 GB / 10 GB Consider upgrading to Flyway Teams Edition for unlimited usage: https://rd.gt/3CWAuTb"
2021-11-08 11:27:56.285 CET "Nov 08 10:27:56.285  1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API"
2021-11-08 11:27:57.240 CET "Nov 08 10:27:57.240  1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API"
2021-11-08 11:27:58.463 CET "Nov 08 10:27:58.463  1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API"
2021-11-08 11:27:58.612 CET "Successfully validated 68 migrations (execution time 00:02.154s)"
2021-11-08 11:28:08.063 CET "Creating Schema History table `FLYWAY`.`flyway_schema_history` ..."
2021-11-08 11:28:20.046 CET "Current version of schema `FLYWAY`: << Empty Schema >>"
2021-11-08 11:28:20.049 CET "Migrating schema `FLYWAY` to version "1.0001 - Test migration" [non-transactional]"
2021-11-08 11:28:21.100 CET "Nov 08 10:28:21.100  1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API"
...
2021-11-08 11:28:25.724 CET "Nov 08 10:28:25.724  1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API"
"Nov 08 10:28:45.547  1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API"
...
"Successfully applied 68 migrations to schema `FLYWAY`, now at version v8.0022 (execution time 27:00.604s)"

Proof of concept 3

When I started writing the actual blog post, I checked out the Flyway GitHub repo again and spotted an interesting new module in their Maven multi-module project: flyway-gcp-bigquery (and also one for GCP Spanner). Judging by Maven Central, they started releasing beta versions of the BigQuery support sometime in July 2021.

So I decided to check it out and see if I could remove the forked PR code from my codebase and replace it with this beta version dependency:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
...
    <properties>
        <bigquery-driver.version>1.2.19.1023</bigquery-driver.version>
        <flyway.version>8.0.3</flyway.version>
    </properties>
...
    <dependency>
        <groupId>org.flywaydb</groupId>
        <artifactId>flyway-core</artifactId>
        <version>${flyway.version}</version>
    </dependency>
    <dependency>
        <groupId>org.flywaydb</groupId>
        <artifactId>flyway-gcp-bigquery</artifactId>
        <version>${flyway.version}-beta</version>
    </dependency>
...
</project>

After removing the code, adding the dependencies above (while also upgrading Flyway from 7.x to 8.x), recompiling and deploying, I was still able to run all the migrations successfully against an empty BigQuery environment.

Lessons learned

The Simba BigQuery driver

The driver itself (as far as I can tell, the only JDBC BigQuery driver there is) does what it is supposed to do, but when it comes to logging it is a bit of a hot mess. Things I had to do to get the driver’s logging in Dataflow under some sort of control include:

  • Adding a bunch of dependencies that try to redirect output to SLF4J and using those in my FlywayJvmInitializer constructor:
    • org.apache.logging.log4j:log4j-iostreams
    • uk.org.lidalia:sysout-over-slf4j
  • Debugging the driver to find out why I still got output on System.out
  • Replacing the com.simba.googlebigquery.dsi.core.impl.StreamHandler class with my own version to force more logging to SLF4J

public FlywayJvmInitializer() {
  // Route JDBC DriverManager log output to a logger.
  PrintWriter logWriter = IoBuilder.forLogger().setLevel(Level.INFO).buildPrintWriter();
  DriverManager.setLogWriter(logWriter);
  // Route System.out and System.err to SLF4J.
  SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
  // Let Flyway itself log through SLF4J.
  LogFactory.setLogCreator(new Slf4jLogCreator());
}

package com.simba.googlebigquery.dsi.core.impl;

import com.simba.googlebigquery.dsi.core.interfaces.ILogHandler;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.io.IoBuilder;

import java.io.PrintWriter;

public class StreamHandler implements ILogHandler {

  private final PrintWriter m_logWriter;

  public StreamHandler() {
     this.m_logWriter = IoBuilder.forLogger("StreamHandler").setLevel(Level.INFO).buildPrintWriter();
  }

  public void writeLog(String var1) throws Exception {
     this.m_logWriter.println(var1);
     this.m_logWriter.flush();
  }
}

No local BigQuery is annoying

At the time of writing, there is no way to run something locally on your development machine that can be used to validate BigQuery behaviour. Without a BigQuery Docker image or emulator, testing Flyway migrations against BigQuery requires either a separate Google project to test against or prefixed datasets in an existing Google project.

Due to certain limitations we had to go for the prefixed dataset approach, but managed to get it to work pretty transparently by using Dataflow runtime ValueProviders, the Flyway placeholder functionality and a custom utility that makes the dataset creation/deletion process easier.
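
To make the prefixed dataset idea a bit more concrete, here is a minimal sketch of how a Flyway-style ${...} placeholder in a migration could resolve to a prefixed dataset name. In a real setup Flyway performs this substitution itself when placeholders are supplied through its configuration. The resolve helper, the dataset_prefix placeholder name and the dev_jan_ prefix are made up for illustration and are not our actual utility.

```java
import java.util.Map;

final class DatasetPrefixSketch {

    /** Mimics ${name} placeholder replacement, for illustration only. */
    static String resolve(String sql, Map<String, String> placeholders) {
        String result = sql;
        for (Map.Entry<String, String> entry : placeholders.entrySet()) {
            result = result.replace("${" + entry.getKey() + "}", entry.getValue());
        }
        return result;
    }
}
```

With the placeholder dataset_prefix set to dev_jan_, a migration containing `${dataset_prefix}sales.orders` would target the dataset dev_jan_sales, while an empty prefix leaves the real dataset name untouched.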

BigQuery Time Travel is your friend

BigQuery has a very interesting feature called Time Travel, which comes in very handy when Flyway migrations fail. Especially for the community edition of Flyway, which doesn’t have “undo” functionality, Time Travel is the easiest way to restore your database to how it was before the migration.

I’m even wondering if you could somehow build “undo” functionality using BigQuery’s Time Travel and Flyway’s “callbacks” (the ones that are available in the community version)?

Time Travel also comes in handy because BigQuery has quotas on a lot of things. Manually reverting changes via SQL ALTER TABLE statements for example quickly makes you run into these.
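
As a sketch of what such a restore can look like, the helper below builds a query that copies a table's state from a number of hours ago into a new table, using BigQuery's FOR SYSTEM_TIME AS OF clause. The class, method and table names are hypothetical, and the approach of restoring into a separate table (instead of overwriting the original) is just one cautious option, assuming the chosen point in time is still within the table's time travel window.

```java
final class TimeTravelRestoreSketch {

    /**
     * Builds a query that materializes the state of sourceTable from hoursAgo hours
     * in the past into restoredTable. Table names are expected as dataset.table.
     */
    static String restoreQuery(String sourceTable, String restoredTable, int hoursAgo) {
        if (hoursAgo <= 0) {
            throw new IllegalArgumentException("hoursAgo must be positive");
        }
        return "CREATE TABLE `" + restoredTable + "` AS "
                + "SELECT * FROM `" + sourceTable + "` "
                + "FOR SYSTEM_TIME AS OF TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL "
                + hoursAgo + " HOUR)";
    }
}
```

The resulting statement could be run from the BigQuery console or from a Java migration once the failed migration has been analysed.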

Dataflow worker scaling gives weird results

We first had every Dataflow pipeline using the JvmInitializer to keep the database schema up to date, but noticed that sometimes rows in the Flyway history table were duplicated (or more). As it turns out, every Dataflow worker that gets started by a pipeline goes through JVM initialization. Sometimes, these are started close enough to each other that migrations get run multiple times. Usually Flyway tries to use some sort of locking to solve this, but in the cloned code this mechanism wasn’t available for BigQuery. It seems some sort of locking is available in the 8.x Beta for this, but I haven’t been able to test if this works yet.

To solve this issue, we made running the JvmInitializer configurable and turned it off by default for all pipelines. We then created a specific dummy Flyway pipeline for which we turned it on and which runs before all other batch pipelines.

The Flyway BigQuery migration process is kinda slow

Worker initialization takes about 2 minutes before the worker actually starts doing stuff and we see Flyway kicking into action. Afterwards, it also seems that every migration file takes at least 30 seconds to run (sometimes more, depending on the migration and table contents). From the logging it looks like this is partially due to how the SQL is being run: a BigQuery job for which you need to listen for the results.

Luckily, due to the previous issue/solution we’re only running it once every day for one dummy pipeline and not the rest of our pipelines. So the only time it is actually slow is when you’re testing and running the full set of migrations starting from an empty environment.

You will also need to set your Flyway timeout to a value that is long enough for bigger table manipulations to succeed and not cause a timeout. We’re currently working with a value of 180 seconds.

Mixing SQL and Java migrations is perfectly possible

For all the things that you want to do with BigQuery in the context of a migration that aren’t supported by BigQuery’s SQL implementation, you can fall back on Flyway’s Java migrations. In a Java migration, you can easily use the BigQuery Java API to do everything that the API allows you to do.

package org.flywaydb.core.internal;

import com.google.auth.Credentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.bigquery.*;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;

public class V1__Java_Based_Migration extends BaseJavaMigration {

  @Override
  public void migrate(Context context) throws Exception {
     Credentials credentials = ServiceAccountCredentials.getApplicationDefault();
    
     BigQueryOptions bigQueryOptions = BigQueryOptions.newBuilder()
           .setProjectId("my-gcp-project-id")
           .setCredentials(credentials)
           .build();
    
     BigQuery service = bigQueryOptions.getService();
     TableId tableId = TableId.of("test", "test");
     Table table = service.getTable(tableId);
     TableId tableCopyId = TableId.of("test", "test_copy");
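     Job job = table.copy(tableCopyId);

     // waitFor() returns null when the job no longer exists; a finished job can still carry an error.
     Job completedJob = job.waitFor();
     if (completedJob == null || completedJob.getStatus().getError() != null) {
        throw new IllegalStateException("Copying table test.test to test.test_copy failed");
     }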
  }

  @Override
  public Integer getChecksum() {
     return 1;
  }
}

Runtime configuration in a JvmInitializer

In the end, we created a more advanced JvmInitializer that allows us to turn Flyway migrations, repair and baselining on or off, to prefix datasets dynamically, and so on. For this we of course need to provide Dataflow pipeline options. Because we’re also building a pipeline artifact (JSON + JARs) in a central bucket that is used to start jobs in multiple environments, these options need to be runtime options. This is where we ran into an issue with Dataflow’s option mechanism, especially if you want to use the required/default mechanism. As it turns out, this mechanism doesn’t really work like you’d expect: defaults seem to get lost when you don’t provide a value for an option but try to access it in the JvmInitializer.

The solution to this was found when looking at the Dataflow worker logs. In these logs, we could see a JSON document being logged that contains most of the option info we need. This JSON is available under the sdk_pipeline_options_file environment variable on a worker. Reading and parsing this value gives us a more or less working custom options object. Combined with reflection to look at the option annotations and their contents, we got it to work well enough for our purposes.
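
The reflection part of that workaround can be sketched roughly as follows. This is not our actual implementation: the DefaultValue annotation is a hypothetical stand-in for Beam's default annotations, the FlywayOptions interface and its option names are invented, and the provided map stands for whatever values were parsed from the worker's options JSON.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

final class OptionDefaultsSketch {

    /** Hypothetical annotation carrying an option's default value. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface DefaultValue {
        String value();
    }

    /** Hypothetical options interface; the option names are illustrative only. */
    interface FlywayOptions {
        @DefaultValue("false")
        String getFlywayEnabled();

        @DefaultValue("")
        String getDatasetPrefix();
    }

    /** Uses the provided value for each annotated getter, falling back to the annotation default. */
    static Map<String, String> resolve(Class<?> optionsInterface, Map<String, String> provided) {
        Map<String, String> resolved = new TreeMap<>();
        for (Method method : optionsInterface.getMethods()) {
            DefaultValue annotation = method.getAnnotation(DefaultValue.class);
            String methodName = method.getName();
            if (annotation == null || !methodName.startsWith("get") || methodName.length() < 4) {
                continue;
            }
            // getFlywayEnabled -> flywayEnabled
            String option = Character.toLowerCase(methodName.charAt(3)) + methodName.substring(4);
            resolved.put(option, provided.getOrDefault(option, annotation.value()));
        }
        return resolved;
    }
}
```

The point of the design is that a missing option never silently becomes null: either the job supplied a value, or the default declared next to the getter is used.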
