Sunday, January 3, 2016

Data Pipeline and ETL tasks in Go using Ratchet

As Data Engineers and Software Engineers we might define Data Pipelines differently. This Wikipedia entry defines a pipeline as a set of data processing elements connected in series, where the output of one element is the input of the next one; often executed in parallel. That is pretty close to how I'll define it here. More specifically, I'll use The Golang Blog's informal definition:

A series of stages connected by channels, where each stage is a group of goroutines running the same function. In each stage, the goroutines receive values from upstream via inbound channels; perform some function on that data, usually producing new values; and send values downstream via outbound channels.

Ratchet is a Go library that abstracts these pipeline concepts, giving you a clear-cut framework to work with. You will satisfy a data processing interface and be provided with some convenience methods for managing the flow of data.

Perhaps you have been tasked with creating automated pipelines that fetch data from disparate sources -- such as remote databases or web services -- and then cleaning up, transforming and aggregating the unorganized data into specialized databases or data sources (an RDBMS, CSV files, email queues, etc.) so your team can use that organized data for analytics and other purposes since most programs are a form of ETL.

Ratchet is an open-source project by Daily Burn. It provides a better way of extracting data from the databases and web services an application uses, then transforming and loading that data into reporting-oriented formats. It makes doing these tasks more robust than creating basic scripts or using limited 3rd-party services that don't give you full control over your data processing.

SETUP

You can find code examples here.

Install ratchet by running:

go get github.com/dailyburn/ratchet

Example project layout (There's no required convention):

$GOPATH/src/<YOUR REPO>/<YOUR USER>/<YOUR PROJECT>
├── main.go              Data Processor initialization & Pipeline
├── packages             Your non-main packages, e.g. mypkg
│   ├── models.go        JSON structs for transforms.go
│   ├── queries.go       SQL query functions for SQLReaders
│   └── transforms.go    Custom DataProcessors

Ratchet consists of a Pipeline with a series of PipelineStages, which each handle one or more DataProcessors. DataProcessors each run in their own goroutine so all of the data is processed concurrently. The DataProcessors send and receive JSON for convenience. Ratchet gives you some useful data processors and an interface for you to implement custom ones.

Your Ratchet tasks will likely use a combination of the provided DataProcessors and your own custom ones. If you're creating an ETL task, the custom DataProcessors will usually be for the Transform stage while the Extract and Load stages could use the built-in Reader/Writer DataProcessors such as SQLReader and SQLWriter. But there are times when you will create custom Data Processors even for the extract stages, such as when you're making calls to 3rd party REST Apis, and the like.

There are other DataProcessors provided for your convenience, such as ones for Google's BigQuery and more. You will get to mix-and-match things as they make sense to your application. See the full-list of provided DataProcessors.

You will typically begin your Ratchet code by including the necessary packages. For the sake of example I will assume you are also going to access an RDBMS, such as MySQL, to perform some SQL reads and/or writes on:

package main

import (
    "github.com/dailyburn/ratchet"
    "github.com/dailyburn/ratchet/logger"
    "github.com/dailyburn/ratchet/processors"
    "github.com/<YOUR_USER>/<YOUR_PROJECT>/packages"
    "database/sql"
    _ "github.com/go-sql-driver/mysql"
)

Note that for PostgreSQL you'd import _ "github.com/lib/pq".

For the examples below we will be assuming we're extracting data from a MySQL database `srcDB`, querying its `users` table, transforming the data, and finally outputing the transformed data to a table `users2` in the destination database `dstDB`.

EXTRACT

Since an SQLReader takes a *sql.DB as the first parameter, you'll start by creating one, as we do with the setupDB function below. You'll pass that and an SQL query string to NewSQLReader to create an SQLReader for the "extract" stage:

func NewSQLReader(dbConn *sql.DB, sql string) *SQLReader

For example, in queries.go put functions that return your SQL strings:

package mypkg

func Query(minId int) string {
    return fmt.Sprintf(`SELECT id, name FROM users 
        WHERE id >= %v`, minId)
}

We simply created a function that takes arguments, generates some SQL and return a string. We can call it from main.go like:

func main() {
    inputDB := setupDB("mysql", "root:@tcp(127.0.0.1:3306)/srcDB")
    extractDP := processors.NewSQLReader(inputDB, mypkg.Query(5))
}

func setupDB(driver, conn string) *sql.DB {
    db, err := sql.Open(driver, conn)
    if err != nil {
        logger.Error(err)
        panic(err)
    }
    return db
}

Here extractDP (DP meaning DataProcessor) is of type *processors.SQLReader.

If you only need a static SQL string then NewSQLReader does the job. For a query to use the JSON results from a previous stage, use NewDynamicSQLReader.

TRANSFORM

When you need to create your own data processors, such as when creating transformers, you implicitly satisfy the DataProcessor interface by implementing its ProcessData and Finish methods. You will create a struct and attach these methods to it. It's up to you to determine how your structure will hold any state you need. Also, because ProcessData deals with receiving JSON from the previous stage and then passing JSON on to the next stage, you will need to create structs for Unmarshaling and Marshaling the JSON.

The data package includes some types and functions for using JSON with Ratchet. It provides wrapper functions for json.Unmarshal and json.Marshal called data.ParseJSON and data.NewJSON that provide additional logging. It also creates the type data.JSON as a simple name to a byte slice: type JSON []byte.

For example, put the following in transforms.go:

package mypkg

import (
    "github.com/dailyburn/ratchet/data"
    "github.com/dailyburn/ratchet/util"
)

type myTransformer struct{}

// Expose our DataProcessor for clients to use
func NewMyTransformer() *myTransformer {
    return &myTransformer{}
}

// I like to move these structs to a file called models.go
type ReceivedData struct {
    ID int `json:"id,omitempty"`
}
type TransformedData struct {
    UserID         int    `json:"user_id,omitempty"`
    SomeNewField   string `json:"some_new_field"`
}

func (t *myTransformer) ProcessData(d data.JSON, 
                                    outputChan chan data.JSON,
                                    killChan chan error) {

    // Step 1: Unmarshal json into slice of ReceivedData structs
    var users []ReceivedData
    var transforms []TransformedData 
    
    err := data.ParseJSON(d, &users)
    util.KillPipelineIfErr(err, killChan)

    // Step 2: Loop through slice and transform data
    for _, user := range users {
        transform := TransformedData{}
        transform.UserID = user.ID;
        transform.SomeNewField = "whatever"
        transforms = append(transforms, transform)
    }

    // Step 3: Marshal transformed data and send to next stage
    if len(transforms) > 0 {    
        dd, err := data.NewJSON(transforms)
        util.KillPipelineIfErr(err, killChan)
        outputChan <- dd
    }
}

func (t *myTransformer) Finish(outputChan chan data.JSON,
                               killChan chan error) {}

Notice the idiomatic NewMyTransformer function that returns a pointer to our DataProcessor's struct with its zero value. Since it starts with a capital letter, it will be used by package users. For example, in main.go:

transformDP := mypkg.NewMyTransformer()

Our transformer changed the name `id` to `user_id` for the destination table. It also omitted the `name` field and used a new field `some_new_field` that wasn't present in the source table.

LOAD

To load the data you've extracted and optionally transformed, you'll create NewSQLWriter processor:

func NewSQLWriter(db *sql.DB, tableName string) *SQLWriter

Example:

outputDB := setupDB("mysql", "root@tcp(127.0.0.1:3306)/dstDB")
outputTable := "users2"
loadDP := processors.NewSQLWriter(outputDB, outputTable)

RUNNING THE PIPELINE

Once you have your data processors setup, you will just need to pass them into a new pipeline for processing. If you just have one reader, one transformer and one loader, you can use a basic 3 stage pipeline using NewPipeline:

pipeline := ratchet.NewPipeline(extractDP, transformDP, loadDP)
pipeline.Name = "My Pipeline"
 
err := <-pipeline.Run()
if err != nil {
    logger.ErrorWithoutTrace(pipeline.Name, ":", err)
    logger.ErrorWithoutTrace(pipeline.Stats())
} else {
    logger.Info(pipeline.Name, ": Completed successfully.")
}

If you do not need a transform stage you can have just Extract and Load stages: ratchet.NewPipeline(extractDP, loadDP). This is enough if your extracted SQL fields match and table you will load the data into.

Things aren't always that simple so Ratchet provides a more flexible branching pipeline, via NewPipelineLayout, that takes a variable amount of NewPipelineStage's which in turn take a variable amount of DataProcessors. Inside the pipeline stages you will wrap your data processor instances with the Do function and pass its returned value to the Outputs method, following these rules:

  • DataProcessors in a non-final PipelineStage must use Outputs.
  • A DataProcessor must be pointed to by one of the previous Outputs (except in the first stage).
  • Outputs must point to a DataProcessor in the next immediate stage.
  • DataProcessors in the final stage must not use Outputs.

Here's how the basic 3 stage pipeline shown above would look as a branching pipeline:

layout, err := ratchet.NewPipelineLayout(
    ratchet.NewPipelineStage( // Stage 1
        ratchet.Do(extractDP).Outputs(transformDP),
    ),
    ratchet.NewPipelineStage( // Stage 2
        ratchet.Do(transformDP).Outputs(loadDP),
    ),
    ratchet.NewPipelineStage( // Stage 3
        ratchet.Do(loadDP),
    ),
)

pipeline := ratchet.NewBranchingPipeline(layout)
err = <-pipeline.Run()

Fortunately, you can do a lot more than that with a branching pipeline. Outputs can take multiple parameters to shove data into and you will often have multiple calls to Do in different stages, for handling disparate data. Sometimes you'll have 4 or 5 stages, output to multiple transformers, batchers, etc.

Example of how subscriptions & refunds layout stages might look in a real-world ETL job:

ratchet.NewPipelineStage(
    ratchet.Do(users).Outputs(subs, refunds),
),
ratchet.NewPipelineStage(
    ratchet.Do(subs).Outputs(batch),
    ratchet.Do(refunds).Outputs(batch),
),
ratchet.NewPipelineStage(
    ratchet.Do(batch).Outputs(subsTransformer, refundsTransformer),
),
ratchet.NewPipelineStage(
    ratchet.Do(subsTransformer).Outputs(writeSubs),
    ratchet.Do(refundsTransformer).Outputs(writeRefunds),
),
ratchet.NewPipelineStage(
    ratchet.Do(writeSubs),
    ratchet.Do(writeRefunds),
),

As you can see, Ratchet is very flexible. Here's another example layout - this time using a custom DataProcessor to get JSON results from a RESTful Shopify API and pass data to two different transformers. This works because your first stage doesn't have to be an SQLReader, all that matters is that it sends and receives JSON:

// some custom DataProcessor you make up to query Shopify
shopifyStage := shopify.NewShopifyOrders(shopifyAPI)
...
layout, err := ratchet.NewPipelineLayout(
    ratchet.NewPipelineStage(
        ratchet.Do(shopifyStage).Outputs(orderTransformer),
        ratchet.Do(shopifyStage).Outputs(orderItemTransformer),
    ),
    ratchet.NewPipelineStage(
        ratchet.Do(orderTransformer).Outputs(writeOrders),
        ratchet.Do(orderItemTransformer).Outputs(writeOrderItems),
    ),
    ratchet.NewPipelineStage(
        ratchet.Do(writeOrders),
        ratchet.Do(writeOrderItems),
    ),
)

Sometimes you will want to pass the original object through the various stages in case you need to use it in a later stage. This is very easy to do using NewPassthrough DataProcessor. In the following example, passing passthrough to Outputs makes it so aDP gets passed to the next stage along with the other values passed to Outputs:

passthrough := processors.NewPassthrough()

...

ratchet.NewPipelineStage(
    ratchet.Do(aDP).Outputs(anotherDP, passthrough),
),
ratchet.NewPipelineStage(
    ratchet.Do(anotherDP).Outputs(yetAnotherDP),
    ratchet.Do(passthrough).Outputs(yetAnotherDP),
...

BATCHING

Finish is called by Ratchet after a previous stage is done sending its data. You can often implement this as an empty method. Finish is more useful when you want to wait until all the data has been received before doing something with it, and if you're working with more than one input source you may really want this. If that's the case, you will typically use ProcessData for validating and storing the incoming data into the receiver struct and then doing the second and third steps above inside of Finish instead.

Here is an example of how we could rewrite the above to batch things using a non-empty struct and a pointer receiver, then complete the transformation stage in the Finish:

type myTransformer struct{
    BatchedUsers []User
}

func NewMyTransformer() *myTransformer {
    return &myTransformer{}
}

func (t *myTransformer) ProcessData(d data.JSON, 
                                    outputChan chan data.JSON,
                                    killChan chan error) {

    // Step 1: Unmarshal the JSON into a User slice
    var users []User
    
    err := data.ParseJSON(d, &users)
    util.KillPipelineIfErr(err, killChan)

    // Step 2: append via pointer receiver
    t.BatchedUsers = append(t.BatchedUsers, users...)
}

func (t *myTransformer) Finish(outputChan chan data.JSON,
                               killChan chan error) {

    var transforms []TransformedData

    // Step 3: Loop through slice and transform data
    for _, user := range t.BatchedUsers {
        transform := TransformedData{}
        transform.UserID = user.ID;
        transform.SomeNewField = "whatever"
        transforms = append(transforms, transform)
    }

    // Step 4: Marshal transformed data and send to next stage
    // Write the results if more than one row/record.
    // You can change the batch size by setting loadDP.BatchSize
    if len(transforms) > 0 {
        dd, err := data.NewJSON(transforms)
        util.KillPipelineIfErr(err, killChan)
        outputChan <- dd
    }
}

DEBUGGING

Ratchet provides a logging facility that's very useful for debugging Ratchet tasks. It is often helpful to temporarily place the following line into your ProcessData implementation so that you can see output from the calls to logger.Debug:

logger.LogLevel = logger.LevelDebug

Even better is for you to create a way to set the different levels from the CLI. For example: --log-level="debug". And so on for LevelError, LevelInfo, LevelStatus, LevelSilent. I recommend using LevelSilent on production or whenever you just need the job to run faster.

Add pipeline.PrintData = true along with LevelDebug to set the pipeline to show all data payloads received when logger is in Debug mode. This is a very convenient way to see all of the data flowing through the different stages.

Be sure to read the log output (ratchet_default.log) because it shows the SQL INSERT Data that was actually executed, for example. The logger package provides some other useful logging functions for you to use as well.

Another debugging/development tip is to temporarily write the final stage output to a CSVWriter that goes to standard output. This allows you to quickly prototype a Ratchet task without having to set up or write to your final database table(s) yet. Example:

// Setup all the DataProcessors
users := processors.NewSQLReader(inputDB, mypkg.Query())
bq := processors.NewDynamicBigQueryReader(bqConfig, mypkg.Query)
bq.UnflattenResults = true
transformer := mypkg.NewMyTransformer()
writeCSV := processors.NewCSVWriter(os.Stdout)

// Create a new Pipeline using the DataProcessors
layout, err := ratchet.NewPipelineLayout(
    ratchet.NewPipelineStage(
        ratchet.Do(users).Outputs(bq),
    ),
    ratchet.NewPipelineStage(
        ratchet.Do(bq).Outputs(transformer),
    ),
    ratchet.NewPipelineStage(
        ratchet.Do(transformer).Outputs(writeCSV),
    ),
    ratchet.NewPipelineStage(
        ratchet.Do(writeCSV),
    ),
)

TIPS

Make sure to add the String method to your transformer because the log output by default is to show all the data in the struct, which shows too much output and slows everything down:

func (t *myTransformer) String() string {
    return "myTransformer"
}

Tip: Creating structs for all the JSON you will be processing can be tedious so I highly recommend the tool json2go.

I have covered all of the main concepts. Please see the Ratchet documentation. Some things are currently undocumented, such as a 3.0 branch that adds support for Amazon Redshift so see the source code and GitHub repository for more information. If you have any questions please post them in the comment section below.

About Me

Followers