Complex Pipelines in Go (Part 2): Storing Values in Batches
Aug 04, 2020

This post is part 2 in a series:

  1. Part 1 - Introduction
  2. Part 2 - Storing Values in Batches (this post)
  3. Part 3 - Transforming Data to Tab Separated Values
  4. Part 4 - Sane Coordination and Cancellation
  5. Part 5 - Putting it All Together

Storing Values in Batches

In Part 1 we introduced the three processes that handle everything required for our final tool to work. In this post we will focus on the Persistent Storage Process, specifically the component used for storing values in batches in PostgreSQL.

The idea behind working in batches is to speed up the loading of records while avoiding overwhelming our primary database, especially when inserting a large number of values. Deciding how many records to batch before copying them over depends on several factors that should be thought through carefully, for example: record size, memory available on the instance processing events, or network capacity allocated for data transfer.

For our example we will use some arbitrary numbers for default values, but consider measuring your events before making this decision in production.

Minimum Requirements

All the code relevant to this post is on GitHub; feel free to explore it for more details. The following is the minimum required for running the example:

  • PostgreSQL 12.3: in theory any recent version should work; the README.md includes specific instructions for running it with Docker, and
  • Go 1.14

And the following Go Packages:

name is a Go struct type equivalent to the data structure we decided to use in Part 1, it is defined as:

type name struct {
	NConst             string
	PrimaryName        string
	BirthYear          string
	DeathYear          string
	PrimaryProfessions []string
	KnownForTitles     []string
}

Batching in PostgreSQL

PostgreSQL has a SQL command called COPY, which is generally the most efficient way to insert a large number of records into a table. For our Go code we will use the pgx implementation of this command via the Conn type, specifically the CopyFrom method:

func (c *Conn) CopyFrom(ctx context.Context, tableName Identifier, columnNames []string, rowSrc CopyFromSource) (int64, error)

This method accepts a value, rowSrc, that implements the CopyFromSource interface type. We will implement a new type that satisfies this interface so we can avoid buffering all the data in memory. Recall that our end goal is to build a pipeline that accomplishes all our steps as a stream of data flowing from one step to the next. We will call this type copyFromSource.
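
To make the contract concrete, here is a minimal sketch of the three methods a CopyFromSource has to provide, together with a loop that consumes it. The names rowSource, sliceSource and drain are hypothetical and exist only for this illustration: rowSource mirrors the pgx interface so the example compiles without pgx, and drain merely imitates how a consumer would iterate; it is not how pgx implements CopyFrom.

```go
package main

import "fmt"

// rowSource mirrors the method set of pgx.CopyFromSource so this sketch
// compiles without the pgx dependency (hypothetical name).
type rowSource interface {
	Next() bool
	Values() ([]interface{}, error)
	Err() error
}

// sliceSource is a hypothetical in-memory source: every row is buffered
// up front, which is exactly what a streaming source tries to avoid.
type sliceSource struct {
	rows [][]interface{}
	idx  int
}

// Next advances to the following row and reports whether one exists.
func (s *sliceSource) Next() bool {
	s.idx++
	return s.idx <= len(s.rows)
}

// Values returns the row selected by the last successful call to Next.
func (s *sliceSource) Values() ([]interface{}, error) {
	return s.rows[s.idx-1], nil
}

// Err reports any error found while iterating; this source never fails.
func (s *sliceSource) Err() error { return nil }

// drain imitates the consumer side (an assumption, not pgx code): call Next
// until it returns false, read Values for each row, then check Err.
func drain(src rowSource) (int64, error) {
	var n int64
	for src.Next() {
		if _, err := src.Values(); err != nil {
			return n, err
		}
		n++
	}
	return n, src.Err()
}

func main() {
	src := &sliceSource{rows: [][]interface{}{
		{"id-1", "first"},
		{"id-2", "second"},
	}}

	n, err := drain(src)
	fmt.Println(n, err) // prints: 2 <nil>
}
```

Because the consumer only ever asks for the next row, a source backed by a channel can hand over records as they arrive instead of holding them all in a slice.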

Introducing “copyFromSource”

This type is defined as:

type copyFromSource struct {
	errorC  <-chan error
	namesC  <-chan name
	err     error
	closed  bool
	current name
}

The two most important things to understand about this type are the two fields using receive-only channels. Both are used for communicating with the upstream process, which is in charge of streaming records downstream to us (to this copyFromSource type):

  • errorC <-chan error: used to indicate when an error happened, and
  • namesC <-chan name: used to receive the events to eventually copy into the database.

With that in mind, the other important thing to understand is the implementation of Next, specifically the select block, which we use to block until we receive a value from either channel:

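func (c *copyFromSource) Next() bool {
	if c.closed {
		return false
	}

	// Block until upstream sends a record, reports an error, or closes a channel.
	select {
	case current, open := <-c.namesC:
		if !open {
			// namesC was closed: there are no more records for this batch.
			c.closed = true
			return false
		}

		c.current = current

		return true
	case c.err = <-c.errorC:
		// An error arrived (or errorC was closed): stop iterating.
		c.closed = true
		return false
	}
}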
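
Next is only one of the three methods the interface requires. The sketch below shows how the remaining two, Values and Err, plus the newCopyFromSource constructor used later in this post, could look. Their bodies are assumptions made for illustration (the real ones live in the repository); in particular, returning the slice fields as plain []string values is a guess. Next follows the same logic described above.

```go
package main

import "fmt"

type name struct {
	NConst             string
	PrimaryName        string
	BirthYear          string
	DeathYear          string
	PrimaryProfessions []string
	KnownForTitles     []string
}

type copyFromSource struct {
	errorC  <-chan error
	namesC  <-chan name
	err     error
	closed  bool
	current name
}

// newCopyFromSource is an assumed constructor; it only wires the channels.
func newCopyFromSource(namesC <-chan name, errorC <-chan error) *copyFromSource {
	return &copyFromSource{namesC: namesC, errorC: errorC}
}

// Next blocks until a record or an error arrives, following the post's logic.
func (c *copyFromSource) Next() bool {
	if c.closed {
		return false
	}

	var open bool

	select {
	case c.current, open = <-c.namesC:
	case c.err = <-c.errorC:
	}

	if !open {
		c.closed = true
		return false
	}

	return true
}

// Values returns the current record, one value per column (assumed body).
// The order must match the column names passed to CopyFrom.
func (c *copyFromSource) Values() ([]interface{}, error) {
	return []interface{}{
		c.current.NConst,
		c.current.PrimaryName,
		c.current.BirthYear,
		c.current.DeathYear,
		c.current.PrimaryProfessions,
		c.current.KnownForTitles,
	}, nil
}

// Err returns the error received from upstream, if any (assumed body).
func (c *copyFromSource) Err() error { return c.err }

func main() {
	namesC := make(chan name)
	errorC := make(chan error)
	src := newCopyFromSource(namesC, errorC)

	// Upstream: stream two placeholder records, then signal the end.
	go func() {
		namesC <- name{NConst: "id-1", PrimaryName: "first"}
		namesC <- name{NConst: "id-2", PrimaryName: "second"}
		close(namesC)
	}()

	for src.Next() {
		vals, _ := src.Values()
		fmt.Println(vals[0], vals[1])
	}

	fmt.Println("err:", src.Err()) // prints: err: <nil>
}
```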

In the end copyFromSource is implemented as a building block to accomplish two things:

  1. To satisfy the pgx.CopyFromSource interface type, and
  2. To use it in conjunction with another type, called copyFromSourceMediator, that properly coordinates this batching.

Introducing “copyFromSourceMediator”

This type is defined as:

type copyFromSourceMediator struct {
	namesC chan name
	errorC chan error
	copier *copyFromSource
}

Similarly, this type holds two channels. The biggest difference is that copyFromSourceMediator uses both channels to send values to copyFromSource, which in the end is the type we are mediating here. All of this is much clearer if we look at the constructor:

func newCopyFromSourceMediator(conn *pgx.Conn) (*copyFromSourceMediator, <-chan error) {
	errorC := make(chan error)
	namesC := make(chan name)

	copier := newCopyFromSource(namesC, errorC)

	res := copyFromSourceMediator{
		namesC: namesC,
		errorC: errorC,
		copier: copier,
	}

	outErrorC := make(chan error)

	go func() {
		defer close(outErrorC)

		_, err := conn.CopyFrom(context.Background(),
			pgx.Identifier{"names"},
			[]string{
				"nconst",
				"primary_name",
				"birth_year",
				"death_year",
				"primary_professions",
				"known_for_titles",
			},
			copier)

		outErrorC <- err
	}()

	return &res, outErrorC
}

This constructor is the one that actually interacts with the database to copy all the received values. So how do we indicate the actual batch size? That logic is handled by our last type, batcher.
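
The mediator also needs methods for feeding the copier: Batch, Err and CopyAll, which batcher calls but which are not shown in this post. The following is a simplified sketch of how such methods could work, not the repository's implementation. It assumes string values instead of name, a hypothetical mediator type, and a sink function standing in for conn.CopyFrom so it can run without a database.

```go
package main

import "fmt"

// mediator is a simplified, hypothetical stand-in for copyFromSourceMediator.
type mediator struct {
	namesC chan string
	errorC chan error
}

// newMediator starts a goroutine that plays the role of conn.CopyFrom: it
// consumes values until namesC is closed or an error arrives, then reports
// the outcome on the returned channel.
func newMediator(sink func(string)) (*mediator, <-chan error) {
	m := &mediator{
		namesC: make(chan string),
		errorC: make(chan error),
	}

	// Buffered so the goroutine can exit even if nobody reads the result.
	outErrorC := make(chan error, 1)

	go func() {
		defer close(outErrorC)

		for {
			select {
			case n, open := <-m.namesC:
				if !open {
					outErrorC <- nil
					return
				}
				sink(n)
			case err := <-m.errorC:
				outErrorC <- err
				return
			}
		}
	}()

	return m, outErrorC
}

// Batch streams one value to the copier.
func (m *mediator) Batch(n string) { m.namesC <- n }

// Err tells the copier to stop because something failed upstream.
func (m *mediator) Err(err error) { m.errorC <- err }

// CopyAll signals that the batch is complete by closing the values channel.
func (m *mediator) CopyAll() { close(m.namesC) }

func main() {
	var copied []string

	m, errC := newMediator(func(s string) { copied = append(copied, s) })

	m.Batch("a")
	m.Batch("b")
	m.CopyAll()

	err := <-errC // wait for the copy to finish before reading copied

	fmt.Println(copied, err) // prints: [a b] <nil>
}
```

The key design point is that the caller never touches the copier directly: it pushes values through the mediator and waits on the returned error channel to learn when the batch has been written.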

Introducing “batcher”

This type is defined as:

type batcher struct {
	conn *pgx.Conn
	size int
}

In the end, it is the one in charge of using the other two types behind the scenes to accomplish our goal for this process. The meat of batcher is in the Copy method:

func (b *batcher) Copy(ctx context.Context, namesC <-chan name) <-chan error {
	outErrC := make(chan error)

	go func() {
		mediator, errorC := newCopyFromSourceMediator(b.conn)

		copyAll := func(m *copyFromSourceMediator, c <-chan error) error {
			m.CopyAll()
			return <-c
		}

		defer func() {
			if err := copyAll(mediator, errorC); err != nil {
				outErrC <- err
			}

			close(outErrC)
		}()

		var index int

		for {
			select {
			case name, open := <-namesC:
				if !open {
					return
				}

				mediator.Batch(name)
				index++

				if index == b.size {
					if err := copyAll(mediator, errorC); err != nil {
						outErrC <- err
					}

					mediator, errorC = newCopyFromSourceMediator(b.conn)
					index = 0
				}
			case err := <-errorC:
				outErrC <- err
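			case <-ctx.Done():
				if err := ctx.Err(); err != nil {
					mediator.Err(err)
					outErrC <- err
				}

				// A done context stays done, so return instead of looping;
				// the deferred function finishes the copy and closes outErrC.
				return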
			}
		}
	}()

	return outErrC
}

As with the other two types, in Copy we use a goroutine, channels and the select block to coordinate all the messages we receive, and to decide when it is time to flush a batch of records.
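
To isolate the counting and flushing logic from the database details, here is a minimal sketch of the same pattern. It is not the batcher from this post: the hypothetical batch function works on int values, collects each batch in a slice rather than streaming it into COPY, and takes a done channel in place of ctx.Done().

```go
package main

import "fmt"

// batch groups values received from in into slices of at most size elements
// (size must be greater than zero). A partial batch is flushed when in is
// closed; nothing more is emitted once done is closed.
func batch(done <-chan struct{}, in <-chan int, size int) <-chan []int {
	out := make(chan []int)

	go func() {
		defer close(out)

		buf := make([]int, 0, size)

		for {
			select {
			case v, open := <-in:
				if !open {
					// Upstream finished: flush whatever is left.
					if len(buf) > 0 {
						out <- buf
					}
					return
				}

				buf = append(buf, v)

				if len(buf) == size {
					// Batch is full: hand it over and start a new one.
					out <- buf
					buf = make([]int, 0, size)
				}
			case <-done:
				return
			}
		}
	}()

	return out
}

func main() {
	in := make(chan int)

	go func() {
		defer close(in)
		for i := 1; i <= 5; i++ {
			in <- i
		}
	}()

	// A nil done channel never fires, so only closing in ends the loop.
	for b := range batch(nil, in, 2) {
		fmt.Println(b)
	}
	// prints:
	// [1 2]
	// [3 4]
	// [5]
}
```

The two flush points match the ones in Copy: one when the counter reaches the batch size, and one when the input channel is closed with records still pending.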

What’s next?

The next blog post will cover the implementation of the TSV parser, and as we progress in the series we will continuously connect all the pieces together to eventually complete our final tool.

