How to set up simple and flexible ETL based anonymization - part 3

02/06/2021 • Jan Eerdekens

In the previous two instalments of this series, we covered the research part and the creation of an ETL pipeline using Singer. In this third and final part, we focus on how we can change the pipeline into something more cloud native.

Pipelinewise

We already encountered the name Pipelinewise during our research when looking for Singer taps and targets. During that search, we found the Pipelinewise Transform Field step and the Pipelinewise Target S3 CSV target, used in our Singer based pipeline solution.

The Pipelinewise website reads the following:
PipelineWise is a Data Pipeline Framework using the Singer.io specification to ingest and replicate data from various sources to various destinations.

Built with ELT in mind: PipelineWise fits into the ELT landscape and is not a traditional ETL tool. PipelineWise aims to reproduce the data from the source to an Analytics-Data-Store in as close to the original format as possible. Some minor load time transformations are supported but complex mapping and joins have to be done in the Analytics-Data-Store to extract meaning.

Pipelinewise basically extends Singer from an EL product into an ELT product. The transformation options are limited, but more than enough to fit our use case. What also immediately catches the eye is the Running in Docker section in their documentation.

Docker: so hot right now

If we can replicate our Singer based pipeline in Pipelinewise, this additional Docker functionality allows us to easily run our complete solution in a cloud native way. That means we won’t need to bother with building one or more custom Docker containers around our existing tap/transform/target solution.

Replicating our Singer pipeline in Pipelinewise

As is probably the case for a lot of developers these days, Docker was already installed on my system. In case it isn't for you, just follow the installation instructions on the Docker site. Once you have Docker, starting to work with Pipelinewise is as easy as cloning their GitHub repository and building a Docker image from it. This image contains the necessary taps & targets.

To replicate the Singer pipeline from the previous blog post, we will need tap-oracle, target-s3-csv and transform-field. The first one is actually a Pipelinewise version of the Singer tap-oracle. Curiously, it has better documentation about the configuration than the original. Specifying this tap as a Pipelinewise connector during the docker build will also automatically install the required Oracle Instant Client.

git clone https://github.com/transferwise/pipelinewise.git
cd ./pipelinewise
docker build --build-arg connectors=tap-oracle,target-s3-csv,transform-field -t pipelinewise:latest .
alias pipelinewise="$(pwd)/bin/pipelinewise-docker"

The build takes some time to finish (about 6 minutes on my machine). You can also name your image differently or use a version other than latest; in that case, change the corresponding values in the bin/pipelinewise-docker file so the alias uses your values instead, as in the sketch below.
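A minimal sketch of that change, assuming the wrapper script defines the IMAGE and VERSION variables it uses in its docker run call (the values here are only examples):

# bin/pipelinewise-docker (excerpt): point these at your own image name and tag
IMAGE=my-registry/pipelinewise
VERSION=2021.05

With the image built, we can create the actual pipeline: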

pipelinewise init --name anonimization-pipeline

The command above creates a new directory called anonimization-pipeline inside the pipelinewise directory that was created by cloning the repository. It contains a bunch of sample files. To replicate our Singer pipeline, we can delete all of them except tap_oracle.yml.sample and target_s3_csv.yml.sample, and then remove the .sample extension from those two files.
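One way to do this from the shell (assuming none of the other sample files need to be kept):

cd anonimization-pipeline
# keep only the Oracle tap and S3 target samples, drop the rest
find . -name '*.sample' ! -name 'tap_oracle.yml.sample' ! -name 'target_s3_csv.yml.sample' -delete
mv tap_oracle.yml.sample tap_oracle.yml
mv target_s3_csv.yml.sample target_s3_csv.yml
cd ..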

Now, edit the tap_oracle.yml file so it has the contents below:

---
 
id: "oracle"
name: "Oracle Import connector"
type: "tap-oracle"
db_conn:
  host: "localhost"
  port: 1521
  sid: "OraDoc"
  user: "etl"
  password: "admin123"
  filter_schemas: "ETL"
target: "s3"
schemas:
  - source_schema: "ETL"
    target_schema: "ETL"
    tables:
      - table_name: "TEST"
        replication_method: "FULL_TABLE"
        transformations:
         - column: "FIRST_NAME"
           type: "HASH"

A Pipelinewise tap serves two purposes at once: it is the extraction part of the pipeline, but also, by means of the transformations section of a table, the transformation part. Next, edit the target_s3_csv.yml file and give it the contents below:

---
 
id: "s3" 
name: "S3 Target connector"
type: "target-s3-csv"
db_conn:
  aws_access_key_id: "your_own_aws_access_key_id_value"
  aws_secret_access_key: "your_own_aws_secret_access_key_value"
  s3_bucket: "anonymized-data-bucket"

Lastly, import the pipeline:

developer@laptop pipelinewise % pipelinewise import --dir anonimization-pipeline
time=2021-05-18 11:55:02 logger_name=pipelinewise log_level=INFO message=Profiling mode not enabled
time=2021-05-18 11:55:03 logger_name=pipelinewise.cli.config log_level=INFO message=Searching YAML config files in /app/wrk
time=2021-05-18 11:55:03 logger_name=pipelinewise.cli.config log_level=INFO message=LOADING TARGET: target_s3_csv.yml
time=2021-05-18 11:55:03 logger_name=pipelinewise.cli.config log_level=INFO message=LOADING TAP: tap_oracle.yml
time=2021-05-18 11:55:03 logger_name=pipelinewise.cli.config log_level=INFO message=SAVING CONFIG
time=2021-05-18 11:55:03 logger_name=pipelinewise.cli.config log_level=INFO message=SAVING MAIN CONFIG JSON to /root/.pipelinewise/config.json
time=2021-05-18 11:55:03 logger_name=pipelinewise.cli.config log_level=INFO message=SAVING TARGET JSONS to /root/.pipelinewise/s3/config.json
time=2021-05-18 11:55:03 logger_name=pipelinewise.cli.config log_level=INFO message=SAVING TAP JSONS to /root/.pipelinewise/s3/oracle
time=2021-05-18 11:55:03 logger_name=pipelinewise.cli.pipelinewise log_level=INFO message=ACTIVATING TAP STREAM SELECTIONS...
[Parallel(n_jobs=-1)]: Using backend ThreadingBackend with 6 concurrent workers.
time=2021-05-18 11:55:03 logger_name=pipelinewise.cli.pipelinewise log_level=INFO message=Discovering oracle (tap-oracle) tap in s3 (target-s3-csv) target...
time=2021-05-18 11:55:04 logger_name=pipelinewise.cli.pipelinewise log_level=INFO message=Writing new properties file with changes into /root/.pipelinewise/s3/oracle/properties.json
[Parallel(n_jobs=-1)]: Done   1 tasks      | elapsed:    0.9s
[Parallel(n_jobs=-1)]: Done   1 out of   1 | elapsed:    0.9s finished
time=2021-05-18 11:55:04 logger_name=pipelinewise.cli.pipelinewise log_level=INFO message=
            -------------------------------------------------------
            IMPORTING YAML CONFIGS FINISHED
            -------------------------------------------------------
                Total targets to import        : 1
                Total taps to import           : 1
                Taps imported successfully     : 1
                Taps failed to import          : []
                Runtime                        : 0:00:01.007921
            -------------------------------------------------------

Besides importing the pipeline, this also performs the tap catalog discovery that we did manually in the previous instalment of this series. When we run the pipelinewise status command, our pipeline should show up as correctly imported, enabled and ready to run.

developer@laptop pipelinewise % pipelinewise status
time=2021-05-18 11:57:40 logger_name=pipelinewise log_level=INFO message=Profiling mode not enabled
Tap ID    Tap Type    Target ID    Target Type    Enabled    Status    Last Sync            Last Sync Result
--------  ----------  -----------  -------------  ---------  --------  -------------------  ------------------
oracle    tap-oracle  s3           target-s3-csv  True       ready     2021-03-11T14:18:23  success
1 pipeline(s)

Running the pipeline

This leaves us with just running the actual pipeline. Do this with the pipelinewise run_tap command:

developer@laptop pipelinewise % pipelinewise run_tap --tap oracle --target s3
time=2021-05-18 12:04:24 logger_name=pipelinewise log_level=INFO message=Profiling mode not enabled
time=2021-05-18 12:04:24 logger_name=pipelinewise.cli.pipelinewise log_level=INFO message=Running oracle tap in s3 target
time=2021-05-18 12:04:24 logger_name=pipelinewise.cli.pipelinewise log_level=INFO message=No table available that needs to be sync by fastsync
time=2021-05-18 12:04:24 logger_name=pipelinewise.cli.pipelinewise log_level=INFO message=Table(s) selected to sync by singer: ['ETL-TEST']
time=2021-05-18 12:04:24 logger_name=pipelinewise.cli.commands log_level=INFO message=Writing output into /root/.pipelinewise/s3/oracle/log/s3-oracle-20210518_120424.singer.log
time=2021-05-18 12:04:25 logger_name=pipelinewise.cli.pipelinewise log_level=INFO message=
-------------------------------------------------------
TAP RUN SUMMARY
-------------------------------------------------------
    Status  : SUCCESS
    Runtime : 0:00:01.545607
-------------------------------------------------------

The output shows that our pipeline has run successfully. A new CSV file has been added to our bucket:

developer@laptop pipelinewise % aws s3 ls s3://anonymized-data-bucket/ --profile your_aws_profile_name
2021-05-18 14:04:26        198 ETL-TEST-20210518T120425.csv
2021-04-13 22:38:50        186 TEST-20210413T223846.csv

Moreover, the file contains the expected data, with the FIRST_NAME column hashed:

developer@laptop pipelinewise % aws s3 cp s3://anonymized-data-bucket/ETL-TEST-20210518T120425.csv --profile your_aws_profile_name - | cat
FIRST_NAME,LAST_NAME,PERSON_ID,PHONE
a8cfcd74832004951b4408cdb0a5dbcd8c7e52d43f7fe244bf720582e05241da,Doe,1,0499010203
5c5db120cb11bee138ff3143edcbedaead684de7a0ba140e12287d436c5dc487,Doe,2,0499040506
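The FIRST_NAME values are replaced by what looks like a SHA-256 digest (64 hexadecimal characters). Assuming the HASH transformation is a plain, unsalted SHA-256 of the original field value, you can check a record yourself for a known first name ("John" is just an illustration here):

echo -n "John" | shasum -a 256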

External configuration

Now we have something that replicates the pipeline from the previous blog post, but with a Docker based approach. That means it can run in any context that accepts Docker images: from a simple physical server or compute instance where you install and run Docker or Docker Compose yourself, over a container-as-a-service setup like Amazon ECS, up to a full blown Kubernetes cluster.
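On a plain Docker host, for example, running the imported pipeline could look roughly like the docker run call that the bin/pipelinewise-docker wrapper (shown further below) performs; a sketch, assuming the pipeline configuration was persisted to ~/.pipelinewise on the host (the container side path /root/.pipelinewise comes from the import logs above):

docker run --rm \
  -v ~/.pipelinewise:/root/.pipelinewise \
  pipelinewise:latest run_tap --tap oracle --target s3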

The only thing that’s still missing to complete our cloud native scenario is to have external configuration for our pipeline. We need this to pass on all values that differ between environments instead of having to create specific configuration files for each environment.

Luckily, Pipelinewise also supports this. We just need to use the env_var notation for every value that we want to inject. Here’s an example of this for the S3 secrets:

---
 
id: "s3" 
name: "S3 Target connector"
type: "target-s3-csv"
db_conn:
  aws_access_key_id: "{{ env_var['AWS_ACCESS_KEY_ID'] }}"
  aws_secret_access_key: "{{ env_var['AWS_SECRET_ACCESS_KEY'] }}"
  s3_bucket: "anonymized-data-bucket"


When using Docker Compose, this works out of the box: you can specify environment variables in your docker-compose.yml or in a .env file, as shown in Sample Project for Docker Development Environment. To get this to work locally, however, I needed to change my bin/pipelinewise-docker file slightly, so that environment variables passed to the pipelinewise alias are also forwarded to the docker run it performs:

...
 
while [[ $# -gt 0 ]]; do
  case $1 in
    --dir)
    HOST_DIR=$(cd "$(dirname "$2")"; pwd)/$(basename "$2")
    ARGS="$ARGS --dir $CONT_WORK_DIR"
    shift
    shift
    ;;
    -e)
    # collect each -e KEY=VALUE pair so it can be forwarded to docker run below
    ENV+="$1 $2 "
    shift
    shift
    ;;
    *)
    ARGS="$ARGS $1"
    shift
    ;;
  esac
done
 
...
 
docker run \
  --rm \
  $ENV \
  -v ${HOST_CONFIG_DIR}:${CONT_CONFIG_DIR} \
  -v ${HOST_DIR}:${CONT_WORK_DIR} \
  ${IMAGE}:${VERSION} \
  ${ARGS}

With the above changes, the pipelinewise import then becomes:

developer@laptop pipelinewise % pipelinewise import --dir anonimization-pipeline -e "AWS_ACCESS_KEY_ID=your_own_aws_access_key_id_value" -e "AWS_SECRET_ACCESS_KEY=your_own_aws_secret_access_key_value"

Conclusion

This post has shown that the conversion from Singer to Pipelinewise is pretty easy, and it gives us a Docker based system that can be used locally or in the cloud with externalised configuration. Scheduling isn't part of Pipelinewise itself, but is easy to achieve: depending on your choice of deployment, you can use a simple cron job (see the sketch below) or something like Apache Airflow.
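For a single Docker host, for example, a nightly run could be scheduled with a plain crontab entry; a sketch, with the path to the cloned repository and the log file location assumed:

# run the anonymization pipeline every night at 02:00
0 2 * * * /opt/pipelinewise/bin/pipelinewise-docker run_tap --tap oracle --target s3 >> /var/log/pipelinewise-anonymization.log 2>&1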

For me, this was an interesting POC to do. I was pleasantly surprised to find a set of small tools that are simple to use and easy to stitch together. There were some small issues along the way, but nothing insurmountable.

The only remark I have is that while Singer and Pipelinewise work fine and will keep working, these days they seem to be used mostly as building blocks inside other (SaaS) products. That means the free and open parts might not be under very active development anymore, which is something to keep in mind.

Jan Eerdekens