DATA & AIJAVA
12/01/2021 • Jan Eerdekens

Hoe voeg je BigQuery ondersteuning en Dataflow integratie toe aan Flyway?

Flyway is een library die bijvoorbeeld gebruikt wordt in Spring Boot om functionaliteit voor schemamigratie te voorzien. Maar... ondersteunt Flyway wel BigQuery? In deze blogpost werken we 3 proof-of-concepts uit om BigQuery ondersteuning toe te voegen aan Flyway, samen met integratie met Google Dataflow!

Hallo, ik ben een Solution Engineer uit het Datateam van ACA Group. Voor ons cloudplatform kozen wij voor Google Cloud Platform (GCP). Dit zijn de diensten vanuit GCP die wij gebruiken en die voor deze blogpost belangrijk zijn:

  • Voor batch/streaming data processing gebruiken we Apache Beam. Specifiek gebruiken we Google Dataflow, een managed versie van Apache Beam.
  • Als database om resultaten van onze data processing naartoe te pushen, gebruiken we Google BigQuery.

Omdat we als bedrijf historisch gezien altijd in Java heeft gewerkt (hoewel Python recent aan een opmars is begonnen binnen ACA en we Python ook in het Datateam gebruiken om Google Cloud Functions te schrijven), hebben we er voor gekozen om ook onze Dataflow pijplijnen in Java te schrijven. Apache Beam ondersteunt trouwens ook wel Python. Bovendien hebben we als bedrijf erg veel ervaring in het schrijven van bedrijfsapplicaties in Java en frameworks zoals Spring Boot. We zijn het dus gewoon om onze database schema evoluties te automatiseren, een best practice die we ook op onze data pijplijnen in Dataflow willen toepassen.

We besloten dus om een uitstapje te maken om te kijken of we iets konden vinden waarmee we dit konden bereiken.

Onderzoek

Als eerste hebben we wat onderzoek gedaan om te kijken wat enkele Google zoekopdrachten ons konden vertellen. Op zoek naar informatie, tools, libraries of frameworks over schema evolutie en migratie voor Bigquery, kwamen we enkele opties tegen die we verder onderzochten:

  • BigQuery heeft zelf automatische schemadetectie en ondersteunt schemaveranderingen, maar dit zijn vooral zaken die je manueel via de API, de bq util of SQL moet doen. Het werkt prima, maar dus niet automatisch: alles moet manueel gebeuren of via scripting.
  • bq-schema is een tool die voor het overgrote deel doet wat wij willen, maar heeft het nadeel (voor ons) dat deze in Python is geschreven en dus moeilijk te integreren is, tenzij we ook onze Apache Beam/Dataflow pijplijnen naar Python veranderen. Dat is een mogelijkheid die we niet meteen afschrijven, maar best tot het einde houden indien we geen betere oplossingen vinden.
  • bigquery_migration is een tool die vergelijkbaar is met degene hierboven, maar dekt onze vereisten minder goed. Bovendien is deze nog moeilijker om te integreren omdat de tool in Ruby is gemaakt.
  • BQconvert helpt alleen bij het daadwerkelijk migreren van bestaande database schema's naar BigQuery en is dus niet geschikt voor wat wij willen bereiken.
  • Dataflow BigQuery Schema Evolution & Automated Schema Evolution for BigQuery lijken meer op een groep van ideeën en inspiraties dan een daadwerkelijke oplossing voor onze noden.
  • Schema evolution in streaming Dataflow jobs and BigQuery tables is een fantastische series blogposts, maar ik kon er helaas geen code repository van vinden.

Hoewel sommige van deze opties aan enkele of zelfs de meeste van onze vereisten voldoen, is het duidelijk dat er niet echt een ideale match was.

Een paar van deze opties werden ook genoemd in een Stackoverflow post die ook refereerde naar Flyway… en dat deed een belletje rinkelen!

Flyway is een library die bijvoorbeeld in Spring Boot gebruikt wordt om functionaliteit voor schemamigratie te voorzien. Gebaseerd op voorgaande ervaringen, zou Flyway in theorie al onze vereisten moeten dekken. Rest er nog één grote vraag: ondersteunt Flyway BigQuery?

Toen ik nog bezig was met het bovenstaande onderzoek, bestond er geen officiële ondersteuning voor BigQuery vanuit Flyway. Ondertussen is er niet-gecertificeerde beta-ondersteuning toegevoegd. Via de eerder genoemde Stackoverflow post vond ik een issue in de Flyway GitHub repository over het toevoegen van ondersteuning van BigQuery voor Flyway. In dat issue vond ik een verwijzing naar een branch in een forked repository die een soort van BigQuery ondersteuning aan Flyway toevoegt.

We kenden Flyway al goed dankzij onze ervaring met Spring Boot, en we vonden dus wat Flyway code die voor BigQuery ondersteuning zou kunnen zorgen. Tijd dus om wat proof-of-concepts uit te werken, waarmee we hopelijk deze vragen kunnen beantwoorden:

  • Krijgen we de BigQuery Flyway branch aan de praat?
  • Als Flyway correct werkt met BigQuery,
    • krijgen we alle noodzakelijke migraties voor elkaar, zoals het aanmaken van een tabel, data toevoegen aan de tabel, aanpassen van een tabelschema, enz.?
    • kunnen we alles in Dataflow integreren?
  • Als alles in Dataflow integreert, zullen alle testmigraties dan juist lopen?

Proof-of-concept 1

Het eerste proof-of-concept bestaat eruit om de forked repo in de huidige staat over te nemen, te klonen en te proberen om een simpele migratie te laten werken tegen een BigQuery tabel in een dataset van een GCP-testproject.

Er zijn 3 manieren om Flyway te gebruiken:

    • via de command line,
    • via Maven/Gradle plug-in,

  • of via de Java API.

Omdat we Flyway ondersteuning willen integreren in onze op Java gebaseerde Dataflow pijplijnen en ook omdat onze op Jenkins/Terraform gebaseerde deploy niet erg geschikt is voor de opties via de command line of Maven/Gradle, bekeken we of we gewoon de Flyway API konden aanspreken. Dit hebben we gedaan door eenvoudigweg een Java class toe te voegen aan de gekloonde repository branch, samen met een algemene aanpak. Deze algemene aanpak bestaat uit:

  • Een JDBC databron creëren die kan verbinden met BigQuery in een opgegeven GCP-project.
  • Flyway configureren om deze databron te gebruiken.
  • Het Flyway migratiecommando uitvoeren en verifiëren of deze onze SQL-migratie vindt en uitvoert.

Het eerste dat we moeten configureren voor een databron is een BigQuery JDBC driver. Gelukkig voor ons dekt de Google BigQuery documentatie dit. Op deze pagina staat een link naar een gratis download van de Google BigQuery Simba Data Connector gemaakt door Magnitude. Wanneer je de driver van deze pagina downloadt, krijg je een .zip-bestand dat het JDBC .jar bevat (GoogleBigQueryJDBC42.jar), maar ook alle bijbehorende dependencies.

Ik voegde enkel de .jar toe aan de Maven repository van ons bedrijf, omdat de meeste andere driver dependencies al beschikbaar zijn in publieke Maven repositories. Het is immers nogal een werkje om ze allemaal te checken en de ontbrekende dependendies of die met een andere versie te gaan uploaden.

Voor deze eerste POC was het voldoende om de volgende dependendies toe te voegen aan de pom.xml van het project dat we gekloond hebben. (De versies van onderstaande dependencies zijn slecht indicatief voor het tijdstip waarop ik deze heb getest, je kan deze vervangen door nieuwere versies.)

  • com.simba:bigquery-driver:1.2.4.1007
  • com.google.cloud:google-cloud-bigquery:1.132.1
  • com.google.cloud:google-cloud-bigquerystorage:1.21.1
  • org.apache.avro:avro:1.8.2

Nu deze dependencies op de juiste plek staan, kunnen we onderstaande code werkend krijgen als we de GOOGLE_APPLICATION_CREDENTIALS omgeving variabel maken en naar een JSON bestand verwijzen met de service account credentials. Dit hebben we nodig om de OAuthType=3 authenticatiemodus te laten werken. Vervang ook de <GCP project ID> en <a dataset ID> placeholders.

package org.flywaydb.core;

import com.simba.googlebigquery.jdbc42.DataSource;
import org.flywaydb.core.Flyway;

public class BQTest {

  public static void main(String[] args) {
     DataSource dataSource = new DataSource();
          dataSource.setURL("jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=<GCP project ID>;OAuthType=3");

     Flyway flyway = Flyway.configure()
           .createSchemas(false)
           .defaultSchema("<GCP project ID>.<a dataset ID>")
           .schemas("<GCP project ID>.<a dataset ID>")
           .dataSource(dataSource)
           .baselineOnMigrate(true)
           .load();

     flyway.migrate();
  }
}

Ik heb ook een SGL-migratiebestand toegevoegd aan mijn src/main/resources/db/migration directory en de code uitgevoerd. Tot mijn grote verrassing probeerde Flyway inderdaad te praten met mijn BigQuery. Er was echter een klein probleempje met de gekloonde Flyway BigQuery code dat opgelost moest worden. Het INSERT statement in de BigQueryDatabase#getInsertStatement methode die Flyway gebruikt om migraties toe te voegen aan flyway_schema_history table faalde omwille van twee redenen:

  • Type issues met INT64 kolommen die opgelost konden worden met explicit casting: CAST(? AS INT64).
  • Het moeten voorzien van een expliciete manuele default, CURRENT_TIMESTAMP(), voor timestamp kolommen.

Na het herstellen van het INSERT statement was Flyway in staat om correct te werken met BigQuery. Ik kon verifiëren dat deze combinatie inderdaad alle migraties kon uitvoeren die we eerder hebben gedefinieerd. Ik kreeg zelfs gemengde SQL & Java migraties werkend door gebruikt te maken van de Java BigQuery API, om zo zaken te doen die niet kunnen worden uitgedrukt in SQL. Er bleef nog één verrassing over: data kan niet worden toegevoegd aan een tabel in hetzelfde SQL-bestand waar je de tabel in creëert. Die acties kunnen dus niet gemengd worden in hetzelfde bestand.

De output hieronder is vergelijkbaar aan wat ik kreeg, maar is het resultaat van een meer recente poging met de huidige Flyway 8.x die BigQuery Beta ondersteuning levert:

Nov 18, 2021 11:43:30 AM org.flywaydb.core.internal.license.VersionPrinter info
INFO: Flyway Community Edition 8.0.3 by Redgate
Nov 18, 2021 11:43:30 AM org.flywaydb.core.internal.database.base.BaseDatabaseType info
INFO: Database: jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=<GCP project ID>;OAuthType=3 (Google BigQuery 2.0)
Nov 18, 2021 11:43:30 AM org.flywaydb.core.internal.database.base.BaseDatabaseType warn
WARNING: Google BigQuery 2.0 does not support setting the schema for the current session. Default schema will NOT be changed to <GCP project ID>.<a dataset ID> !
Nov 18, 2021 11:43:30 AM org.flywaydb.database.bigquery.BigQueryDatabaseType info
INFO: Join the GCP BigQuery beta via https://rd.gt/3fut40f
Nov 18, 2021 11:43:30 AM org.flywaydb.database.bigquery.BigQueryDatabaseType info
INFO: 
Nov 18, 2021 11:43:30 AM org.flywaydb.database.bigquery.BigQueryDatabaseType info
INFO: Experiencing performance issues while using GCP BigQuery?
Nov 18, 2021 11:43:30 AM org.flywaydb.database.bigquery.BigQueryDatabaseType info
INFO: Find out how Flyway Teams improves performance with batching at https://rd.gt/3CWAuTb
Nov 18, 2021 11:43:30 AM org.flywaydb.database.bigquery.BigQueryDatabaseType info
INFO: 
Nov 18, 2021 11:43:48 AM org.flywaydb.database.bigquery.BigQueryDatabase info
INFO: Google BigQuery databases have a 10 GB database size limit in Flyway Community Edition.
You have used 0 GB / 10 GB
Consider upgrading to Flyway Teams Edition for unlimited usage: https://rd.gt/3CWAuTb
Nov 18, 2021 11:43:51 AM org.flywaydb.core.internal.command.DbValidate info
INFO: Successfully validated 1 migration (execution time 00:02.091s)
Nov 18, 2021 11:44:03 AM org.flywaydb.core.internal.schemahistory.JdbcTableSchemaHistory info
INFO: Creating Schema History table `<GCP project ID>.<a dataset ID>`.`flyway_schema_history` with baseline ...
Nov 18, 2021 11:44:09 AM org.flywaydb.core.internal.command.DbBaseline info
INFO: Successfully baselined schema with version: 1
Nov 18, 2021 11:44:19 AM org.flywaydb.core.internal.command.DbMigrate info
INFO: Current version of schema `<GCP project ID>.<a dataset ID>`: 1
Nov 18, 2021 11:44:19 AM org.flywaydb.core.internal.command.DbMigrate info
INFO: Migrating schema `<GCP project ID>.test_dataset` to version "1.0001 - Test migration" [non-transactional]
Nov 18, 2021 11:44:47 AM org.flywaydb.core.internal.command.DbMigrate info
INFO: Successfully applied 1 migration to schema `<GCP project ID>.<a dataset ID>`, now at version v1.0001 (execution time 00:37.604s)

Proof-of-concept 2

Het vorige POC laat ons toe om aan een nieuw probleem te werken: deze code werkend te krijgen binnen een Google Dataflow project. Dankzij inspiratie via Spring Boot, waarbij Flyway migraties draaien vanaf het opstarten van een applicatie, moest ik iets in Beam/Dataflow vinden dat vergelijkbaar is en ons toeliet om willekeurige code uit te voeren tijdens het opstarten.

Een eerste optie is een custom DataflowRunnerHooks implementatie. Toen ik deze optie uitprobeerde, kwam ik er al snel achter dat het moment dat deze getriggerd wordt volledig fout is voor wat wij willen bereiken. De code wordt immers al uitgevoerd terwijl de Dataflow code gebouwd wordt gebruikmakend van het mvn compile exec:java commando. Omdat we een veel voorkomend Dataflow artifact bouwen die op alle omgeving wordt gedeployed en geïnjecteerd wordt met runtime variabelen, is het triggeren van onze custom Flyway code op dit moment niet ideaal om te bereiken wat we willen.

Na nog wat meer onderzoek vond ik de JvmInitializer interface. Dit zag er meteen veelbelovend uit. Een snelle implementatie liet zien dat dit inderdaad bruikbaar was, maar dat er enkele zaken zijn om voor uit te kijken. Die kan je lezen in de lessons learned onderaan deze blogpost.

package be.planetsizebrain;

import com.simba.googlebigquery.jdbc42.DataSource;
import com.simba.googlebigquery.support.exceptions.ErrorException;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.sdk.harness.JvmInitializer;
import org.apache.beam.sdk.options.PipelineOptions;
import org.flywaydb.core.Flyway;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlywayJvmInitializer implements JvmInitializer {

  private final Logger LOGGER = LoggerFactory.getLogger(FlywayJvmInitializer.class);

  @Override
  public void beforeProcessing(PipelineOptions options) {
     LOGGER.info("Running flyway JVM initializer...");

     try {
        DataflowWorkerHarnessOptions harnessOptions = options.as(DataflowWorkerHarnessOptions.class);
        executeFlyway(harnessOptions);
     } catch (Exception e) {
        LOGGER.error("Flyway migrations failed!", e);

        throw new RuntimeException("Unexpected problem during beforeProcessing phase of JVM initializer", e);
     } finally {
        LOGGER.info("Finished running flyway JVM initializer.");
     }
  }

  private void executeFlyway(DataflowWorkerHarnessOptions harnessOptions) throws ErrorException {
     Flyway flyway = initializeFlywayClient(harnessOptions);

     LOGGER.info("Running flyway migrations...");

     flyway.migrate();

     LOGGER.info("Finished flyway migrations");
  }

  private Flyway initializeFlywayClient(DataflowWorkerHarnessOptions harnessOptions) throws ErrorException {
     DataSource dataSource = createBigQueryDataSource(harnessOptions);

     return Flyway.configure()
           .createSchemas(false)
           .defaultSchema("FLYWAY")
           .schemas("FLYWAY")
           .dataSource(dataSource)
           .failOnMissingLocations(true)
           .locations("classpath:db/migration")
           .ignoreFutureMigrations(true)
           .load();
  }

  private DataSource createBigQueryDataSource(DataflowWorkerHarnessOptions options) throws ErrorException {
     DataSource dataSource = new DataSource();
     dataSource.setURL("jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=" + options.getProject() + ";OAuthType=3");
     dataSource.setTimeout(180);
     dataSource.setLogLevel("INFO");
     dataSource.setMaxResults(10000);

     return dataSource;
  }
}

Wanneer we deze code toevoegen aan een Dataflow project, is er nog één ding nodig om alles daadwerkelijk te laten draaien. HetJvmInitializer systeem werkt via het Java Service Provider Interface mechanisme. Dat betekent dat we een bestand moeten creëren met de naam org.apache.beam.sdk.harness.JvmInitializer in src/main/resources/META-INF/services dat de FQCN van onze JvmInitializer implementatie bevat.

Als we de Dataflow pipeline laten runnen, zien we de volgende logging. (Opnieuw geldt hier dat dit de output is van een recentere poging met de Flyway versie die beta-ondersteuning biedt voor BigQuery.)

2021-11-08 11:26:50.593 CET "Loading pipeline options from /var/opt/google/dataflow/pipeline_options.json"
2021-11-08 11:26:50.624 CET "Worker harness starting with: {...}"
2021-11-08 11:26:53.565 CET "Running flyway JVM initializer..."
2021-11-08 11:26:54.111 CET "Running flyway migrations..."
2021-11-08 11:26:54.322 CET "[Simba][JDSI](20260) Cannot access file to use for logging: CANNOT_CREATE_LOGGING_PATH. Switching to default logging output."
2021-11-08 11:26:54.325 CET "Nov 08 10:26:54.323  1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: SDK Version: 10.1.20.1161"
2021-11-08 11:26:54.330 CET "Nov 08 10:26:54.330  1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Driver Version: 01.02.19.1023"
2021-11-08 11:26:54.332 CET "Nov 08 10:26:54.332  1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: JVM Name: Java HotSpot(TM) 64-Bit Server VM"
2021-11-08 11:26:54.332 CET "Nov 08 10:26:54.332  1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: JVM Specification Version: 1.8"
2021-11-08 11:26:54.333 CET "Nov 08 10:26:54.333  1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: JVM Implementation Version: 25.151-b12"
2021-11-08 11:26:54.335 CET "Nov 08 10:26:54.335  1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: JVM Vendor: Oracle Corporation"
2021-11-08 11:26:54.335 CET "Nov 08 10:26:54.335  1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Operating System Name: Linux"
2021-11-08 11:26:54.339 CET "Nov 08 10:26:54.339  1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Operating System Version: 5.4.120+"
2021-11-08 11:26:54.340 CET "Nov 08 10:26:54.340  1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Operating System Architecture: amd64"
2021-11-08 11:26:54.340 CET "Nov 08 10:26:54.340  1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Locale Name: en_US"
2021-11-08 11:26:54.340 CET "Nov 08 10:26:54.340  1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Default Charset Encoding: US-ASCII"
2021-11-08 11:26:54.358 CET "[Simba][JDSI](20260) Cannot access file to use for logging: CANNOT_CREATE_LOGGING_PATH. Switching to default logging output."
2021-11-08 11:26:54.474 CET "SQLWarning: reason([Simba][BigQueryJDBCDriver](1000019) Invalid connection property value for MetaDataFetchThreadCount, value overridden to 32.) SQLState(01S02)"
2021-11-08 11:26:57.024 CET "Nov 08 10:26:57.023 WARN 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.doConnect: [Simba][BigQueryJDBCDriver](1000019) Invalid connection property value for MetaDataFetchThreadCount, value overridden to 32."
2021-11-08 11:26:57.032 CET "SQLWarning: reason([Simba][BigQueryJDBCDriver](1000019) Invalid connection property value for MetaDataFetchThreadCount, value overridden to 32.) SQLState(01S02)"
2021-11-08 11:26:57.037 CET "Nov 08 10:26:57.037  1 com.simba.googlebigquery.jdbc.common.SConnection.SConnection: Driver version is: 01.02.19.1023"
2021-11-08 11:26:57.038 CET "Nov 08 10:26:57.038  1 com.simba.googlebigquery.jdbc.common.SConnection.SConnection: Datasource version is: 01.02.19.1023"
2021-11-08 11:26:57.144 CET "Flyway Community Edition 8.0.3 by Redgate"
2021-11-08 11:26:57.146 CET "Database: jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=amfori-data-acc;OAuthType=3; (Google BigQuery 2.0)"
2021-11-08 11:26:57.159 CET "Google BigQuery 2.0 does not support setting the schema for the current session. Default schema will NOT be changed to FLYWAY !"
2021-11-08 11:26:57.160 CET "Join the GCP BigQuery beta via https://rd.gt/3fut40f"
2021-11-08 11:26:57.160 CET "Experiencing performance issues while using GCP BigQuery?"
2021-11-08 11:26:57.163 CET "Find out how Flyway Teams improves performance with batching at https://rd.gt/3CWAuTb"
2021-11-08 11:27:55.640 CET "Google BigQuery databases have a 10 GB database size limit in Flyway Community Edition. You have used 0 GB / 10 GB Consider upgrading to Flyway Teams Edition for unlimited usage: https://rd.gt/3CWAuTb"
2021-11-08 11:27:56.285 CET "Nov 08 10:27:56.285  1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API"
2021-11-08 11:27:57.240 CET "Nov 08 10:27:57.240  1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API"
2021-11-08 11:27:58.463 CET "Nov 08 10:27:58.463  1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API"
2021-11-08 11:27:58.612 CET "Successfully validated 68 migrations (execution time 00:02.154s)"
2021-11-08 11:28:08.063 CET "Creating Schema History table `FLYWAY`.`flyway_schema_history` ..."
2021-11-08 11:28:20.046 CET "Current version of schema `FLYWAY`: << Empty Schema >>"
2021-11-08 11:28:20.049 CET "Migrating schema `FLYWAY` to version "1.0001 - Test migration" [non-transactional]"
2021-11-08 11:28:21.100 CET "Nov 08 10:28:21.100  1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API"
...
2021-11-08 11:28:25.724 CET "Nov 08 10:28:25.724  1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API"
"Nov 08 10:28:45.547  1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API"
...
"Successfully applied 68 migrations to schema `FLYWAY`, now at version v8.0022 (execution time 27:00.604s)"

Proof-of-concept 3

Wanneer ik begon met het schrijven van deze blogpost, heb ik ook nog eens de Flyway Github repo gecheckt. Ik merkte toen een interessante nieuwe module op in hun Maven multi-module project: flyway-gcp-bigquery (en ook nog eentje voor GCP Spanner). Volgens Maven Central lijkt het erop dat ze sinds juli 2021 beta-versies van BigQuery ondersteuning leveren.

Ik besloot dus om dit na te gaan en te bekijken of ik de forked PR-code kon verwijderen van mijn eigen codebase om deze dan te vervangen met de beta-versie dependency.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
...
    <properties>
	<bigquery-driver.version>1.2.19.1023</bigquery-driver.version>
	<flyway.version>8.0.3</flyway.version>
    </properties>
...
    <dependency>
	<groupId>org.flywaydb</groupId>
	<artifactId>flyway-core</artifactId>
	<version>${flyway.version}</version>
    </dependency>
    <dependency>
	<groupId>org.flywaydb</groupId>
	<artifactId>flyway-gcp-bigquery</artifactId>
	<version>${flyway.version}-beta</version>
    </dependency>
...
</project>

Na het verwijderen van de code, het toevoegen van bovenstaande dependencies (terwijl ik ook Flyway updatete van versie 7.x naar versie 8.x), het opnieuw compileren en deployen, runden alle migraties nog steeds met succes naar een lege BigQuery omgeving.

Lessons learned

De Simba BigQuery driver

De driver zelf (voor zover ik weet de enige JDBC BigQuery driver die er bestaat) doet wat ervan verwacht wordt, maar logging is een gedoe. Dit zijn de zaken die ik moest doen om de logging in Dataflow iets of wat onder controle te krijgen:

  • Een hele hoop dependencies toevoegen die proberen om zaken naar SLF4J te redirecten en deze gebruiken in mijn FlywayJvmInitializer constructor
    • org.apache.logging.log4j:log4j-iostreams
    • uk.org.lidalia:sysout-over-slf4j
  • De driver debuggen om uit te zoeken waarom er nog steeds zaken verschenen tijdens System.out
  • De com.simba.googlebigquery.dsi.core.impl.StreamHandler class overschrijven om meer logging naar SLF4J af te dwingen
public FlywayJvmInitializer() {
  PrintWriter logWriter = IoBuilder.forLogger().setLevel(Level.INFO).buildPrintWriter();
  DriverManager.setLogWriter(logWriter);
  SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
  LogFactory.setLogCreator(new Slf4jLogCreator());
}
package com.simba.googlebigquery.dsi.core.impl;

import com.simba.googlebigquery.dsi.core.interfaces.ILogHandler;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.io.IoBuilder;

import java.io.PrintWriter;

public class StreamHandler implements ILogHandler {

  private final PrintWriter m_logWriter;

  public StreamHandler() {
     this.m_logWriter = IoBuilder.forLogger("StreamHandler").setLevel(Level.INFO).buildPrintWriter();
  }

  public void writeLog(String var1) throws Exception {
     this.m_logWriter.println(var1);
     this.m_logWriter.flush();
  }
}

Geen lokale BigQuery is vervelend

Er bestaat geen manier om iets lokaal te draaien op je ontwikkelingstoestel dat kan gebruikt worden om BigQuery gedrag te valideren. Dus geen BigQuery docker image of emulator betekent dat je een apart Google project of prefixed datasets binnen een bestaand Google project nodig hebt om Flyway migraties te testen.

Door bepaalde limitaties hebben wij voor de aanpak met de prefixed datasets gekozen, maar zijn we er toch in geslaagd om deze op een transparante manier te laten werken dankzij Dataflow runtime ValueProviders, de Flyway placeholder functionaliteit en een custom utility die het aanmaken en verwijderen van datasets vergemakkelijkt.

BigQuery Time Travel is je vriend

BigQuery heeft een erg interessante feature genaamd Time Travel, die erg van pas komt wanneer Flyway migraties falen. Zeker voor de community edition van Flyway, die de 'undo' functionaliteit ontbeert, is Time Travel de gemakkelijkste manier om je database te herstellen naar hoe deze eruit zag voor de gefaalde migratie plaatsvond.

Ik vraag me af of er een manier is om de 'undo' functionaliteit te bouwen door gebruik te maken van Time Travel van BigQuery en de 'callbacks' van Flyway (degene die beschikbaar zijn in de community versie). Time Travel is bovendien handig omdat BigQuery quotas hanteert op heel wat zaken. Door bijvoorbeeld het manueel terugdraaien van veranderingen via SQL ALTER TABLE statements loop je als snel tegen deze muur aan.

Het schalen van Dataflow workers levert vreemde resultaten op

We gebruikten eerst JvmInitializer voor elke Dataflow pijplijn om het database schema up-to-date te houden, maar merkten dat er soms rijen in de Flyway geschiedenistabel gedupliceerd werden. Het blijkt dat elke Dataflow worker die door een pijplijn gestart wordt door de JVM initialisatie heen gaat. Soms gebeurt dat zo kort op elkaar dat migraties meerdere keren worden uitgevoerd. Normaal probeert Flyway om de zaken op slot te zetten om dit tegen te gaan, maar in de gekloonde code was deze werking niet beschikbaar voor BigQuery. Het lijkt erop alsof het sluiten wel beschikbaar is in de 8.x beta-versie, maar dat heb ik nog niet kunnen testen.

Om dit probleem op te lossen, hebben we ervoor gezorgd dat de JvmInitializer configureerbaar gerund kan worden en default afgezet kan worden voor alle pijplijnen. We maakten dan een specifieke dummy Flyway pijplijn aan waarbij we het aanzetten en die voor alle andere batch pijplijnen wordt uitgevoerd.

Het Flyway BigQuery migratieproces verloopt vrij traag

De initialisatie van workers neemt ongeveer 2 minuten in beslag voordat de worker daadwerkelijk acties begint te ondernemen en we Flyway in actie zien schieten. Vervolgens doet elk migratiebestand er ook minstens 30 seconden over om te runnen (soms langer, afhankelijk van de migratie en de inhoud van de tabel). Vanuit de logging lijkt het erop alsof dit deels komt door hoe de SQL draait: een BigQuery job waarbij je moet wachten op de resultaten.

Gelukkig runnen we dit slechts één maal per dag voor een dummy pijplijn en niet voor de rest van de pijplijnen dankzij het vorige probleempje hierboven. Het enige moment waarop de migratie dus traag aanvoelt is wanneer je aan het testen bent en je de hele set van migraties draait vanaf een lege omgeving.

Je zal ook je Flyway time-out moeten aanpassen naar een waarde die hoog genoeg is om ook grotere tabelveranderingen te laten lukken zonder een time-out error te veroorzaken. Momenteel werken wij met een waarde van 180 seconden.

Mixen van SQL and Java migraties is perfect mogelijk

Voor alle zaken die je wil doen met BigQuery in de context van een migratie,die niet ondersteund worden door BigQuery’s SQL-implementatie, kan je terugvallen op de Java migraties van Flyway. Met een Java migratie kan je gemakkelijk de BigQuery Java API aanspreken om alles te doen waartoe de API in staat is.

package org.flywaydb.core.internal;

import com.google.auth.Credentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.bigquery.*;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;

public class V1__Java_Based_Migration extends BaseJavaMigration {

  @Override
  public void migrate(Context context) throws Exception {
     Credentials credentials = ServiceAccountCredentials.getApplicationDefault();
    
     BigQueryOptions bigQueryOptions = BigQueryOptions.newBuilder()
           .setProjectId("my-gcp-project-id")
           .setCredentials(credentials)
           .build();
    
     BigQuery service = bigQueryOptions.getService();
     TableId tableId = TableId.of("test", "test");
     Table table = service.getTable(tableId);
     TableId tableCopyId = TableId.of("test", "test_copy");
     Job job = table.copy(tableCopyId);
    
     job.waitFor();
  }

  @Override
  public Integer getChecksum() {
     return 1;
  }
}

Runtime configuratie in een JvmInitializer

Aan het einde van het experiment creëerden we een meer geavanceerde JvmInitializer die ons toelaat om Flyway migraties/herstel/baselining aan en uit te zetten, dynamische prefixing van datasets en zo voort. Hiervoor moeten we uiteraard de Dataflow pijplijnopties voorzien. Omdat we ook een pijplijn artifact bouwen (JSON + JARs) in een centrale bucket die we gebruiken om jobs in meerdere omgevingen te starten, moeten deze opties runtime-opties zijn. Hier liepen we tegen een probleem aan met het optiemechanisme van Dataflow, zeker wanneer je het vereiste of standaard mechanisme wil gebruiken. Het blijkt dat dit mechanisme niet helemaal werkt zoals je verwacht en defaults gaan verloren wanneer je geen waarde toekent aan een optie, maar probeer deze te benaderen via de JvmInitializer.

De oplossing hiervoor verschuilt zich in de Dataflow worker logs. In deze logs zagen we dat een JSON gelogd werd die de meeste informatie over opties bevat die we nodig hadden. Deze JSON is beschikbaar onder de sdk_pipeline_options_file omgevingsvariabele op een worker. Het uitlezen en parsen van deze waarde laat ons toe om een soort van werkend custom opties-object te krijgen. Door het gebruik van reflection om te kijken naar de annotaties en hun inhoud, kregen we dit werkend genoeg voor onze doeleinden.

Jan Eerdekens