This post is part 2 in a series:
- Part 1 - Introduction
- Part 2 - Storing Values in Batches (this post)
- Part 3 - Transforming Data to Tab Separated Values
- Part 4 - Sane Coordination and Cancellation
- Part 5 - Putting it All Together
Storing Values in Batches
In part 1 we introduced the three processes that make up our final tool. In this post we will focus on the Persistent Storage process, specifically the component used for storing values in batches in PostgreSQL.
The idea behind working in batches is to speed up the loading of records while avoiding overwhelming our primary database, especially when inserting large numbers of values. Deciding how many records to batch before copying them over depends on several factors that should be considered carefully, including record size, the memory available on the instance processing events, and the network capacity allocated for data transfer.
For our example we will use some arbitrary numbers for default values, but consider measuring your events before making this decision in production.
Minimum Requirements
All the code relevant to this post is on GitHub; feel free to explore it for more details. The following is the minimum required for running the example:
- PostgreSQL 12.3: in theory any recent version should work; the README.md includes specific instructions for running it with Docker, and
- Go 1.14
And the following Go packages:
- github.com/jackc/pgx/v4: used for interacting with PostgreSQL, and
- github.com/bxcodec/faker/v3: used for generating instances of name to simulate records being received and meant to be batched.
name is a Go struct type equivalent to the data structure we decided to use in Part 1. It is defined as:
```go
type name struct {
	NConst             string
	PrimaryName        string
	BirthYear          string
	DeathYear          string
	PrimaryProfessions []string
	KnownForTitles     []string
}
```
Batching in PostgreSQL
PostgreSQL has a SQL command called COPY, which is the most efficient way to insert a huge number of records into tables. In our Go code we will use the pgx implementation of this command via the Conn type, specifically the CopyFrom method:

```go
func (c *Conn) CopyFrom(ctx context.Context, tableName Identifier, columnNames []string, rowSrc CopyFromSource) (int64, error)
```
This method receives a value, rowSrc, implementing the CopyFromSource interface type. We will implement a new type that satisfies this interface in order to avoid buffering all the data in memory. Recall that our end goal is to build a pipeline that accomplishes all our steps as a stream of data flowing from one step to the next. We will call this type copyFromSource.
Introducing “copyFromSource”
This type is defined as:
```go
type copyFromSource struct {
	errorC  <-chan error
	namesC  <-chan name
	err     error
	closed  bool
	current name
}
```
The first important thing to understand about this type is its two receive-only channel fields. Both are used to communicate with the upstream process, which is in charge of streaming records downstream to us (to this type, copyFromSource):
- errorC <-chan error: used to indicate when an error happened, and
- namesC <-chan name: used to receive the events to eventually copy into the database.
With that in mind, the other important thing to understand is the implementation of Next, specifically the select block, which blocks until we receive a value from either channel:
```go
func (c *copyFromSource) Next() bool {
	if c.closed {
		return false
	}

	var open bool

	select {
	case c.current, open = <-c.namesC:
	case c.err = <-c.errorC:
	}

	if !open {
		c.closed = true
		return false
	}

	if c.err != nil {
		return false
	}

	return true
}
```
In the end, copyFromSource is a building block that accomplishes two things:
- it satisfies the pgx.CopyFromSource interface type, and
- it works in conjunction with another type to properly coordinate the batching; that type is called copyFromSourceMediator.
Introducing “copyFromSourceMediator”
This type is defined as:
```go
type copyFromSourceMediator struct {
	namesC chan name
	errorC chan error
	copier *copyFromSource
}
```
Similarly, this type holds two channels; the biggest difference is that copyFromSourceMediator uses both channels to send values to copyFromSource, which is the type being mediated here. All of this is much clearer if we look at the constructor:
```go
func newCopyFromSourceMediator(conn *pgx.Conn) (*copyFromSourceMediator, <-chan error) {
	errorC := make(chan error)
	namesC := make(chan name)

	copier := newCopyFromSource(namesC, errorC)

	res := copyFromSourceMediator{
		namesC: namesC,
		errorC: errorC,
		copier: copier,
	}

	outErrorC := make(chan error)

	go func() {
		defer close(outErrorC)

		_, err := conn.CopyFrom(context.Background(),
			pgx.Identifier{"names"},
			[]string{
				"nconst",
				"primary_name",
				"birth_year",
				"death_year",
				"primary_professions",
				"known_for_titles",
			},
			copier)

		outErrorC <- err
	}()

	return &res, outErrorC
}
```
This constructor is the one actually interacting with the database to copy all the received values. So how do we indicate the actual batch size? That logic is handled by our last type, batcher.
Introducing “batcher”
This type is defined as:
```go
type batcher struct {
	conn *pgx.Conn
	size int
}
```
In the end, it is the type in charge of using the other two behind the scenes to accomplish the goal of this process. The meat of batcher is in the Copy method:
```go
func (b *batcher) Copy(ctx context.Context, namesC <-chan name) <-chan error {
	outErrC := make(chan error)

	go func() {
		mediator, errorC := newCopyFromSourceMediator(b.conn)

		copyAll := func(m *copyFromSourceMediator, c <-chan error) error {
			m.CopyAll()
			return <-c
		}

		// Flush whatever is left in the last, partial batch on exit.
		defer func() {
			if err := copyAll(mediator, errorC); err != nil {
				outErrC <- err
			}
			close(outErrC)
		}()

		var index int

		for {
			select {
			case name, open := <-namesC:
				if !open {
					return
				}

				mediator.Batch(name)

				index++
				if index == b.size {
					// Batch is full: copy it and start a new mediator.
					if err := copyAll(mediator, errorC); err != nil {
						outErrC <- err
					}
					mediator, errorC = newCopyFromSourceMediator(b.conn)
					index = 0
				}
			case err := <-errorC:
				outErrC <- err
				return
			case <-ctx.Done():
				if err := ctx.Err(); err != nil {
					mediator.Err(err)
					outErrC <- err
				}
				return
			}
		}
	}()

	return outErrC
}
```

Note the return statements in the error and cancellation cases: without them the loop would keep selecting on an already-closed error channel (or an always-ready ctx.Done) and spin.
As in the other two types, Copy uses a goroutine, channels, and a select block to coordinate all the messages we receive and to decide when it is time to copy a batch of records.
What’s next?
The next blog post will cover the implementation of the TSV parser, and as we progress through the series we will keep connecting the pieces together until our final tool is complete.