This is the last post of the series Complex Pipelines in Go!
It’s time to connect all the dots and build the final tool. The most important components were covered in the previous posts, but one piece is still missing: the initial HTTP request that downloads the gzip file.
Let’s work on that first and then we can put everything together.
All the code relevant to this post is on GitHub; feel free to explore it for more details. The following is the minimum required for running the example:
To download the file via HTTP we can rely on the standard library, specifically the types net/http.Client and compress/gzip.Reader, because the file we expect to download is gzipped.
The following short snippet covers both requirements:
// XXX: omitting error handling to keep the code short
req, _ := http.NewRequest(http.MethodGet, "https://datasets.imdbws.com/name.basics.tsv.gz", nil)
client := &http.Client{
	Timeout: 10 * time.Minute, // XXX: use something reasonable
}
resp, _ := client.Do(req)
defer resp.Body.Close()
gr, _ := gzip.NewReader(resp.Body)
defer gr.Close()
cr := bufio.NewReader(gr) // buffered reader, needed for ReadString
for {
	line, err := cr.ReadString('\n')
	if err == io.EOF {
		break
	}
	// XXX: do something with the read value!
	_ = line
}
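The snippet above skips error handling on purpose. As a rough sketch of what the same flow could look like with errors checked, here is a self-contained version. The helper names readGzipLines and fetchSample are hypothetical (they are not part of the series' code), and the example reads a tiny made-up TSV from a local test server instead of the real IMDb URL so it can run offline.

```go
package main

import (
	"bufio"
	"bytes"
	"compress/gzip"
	"context"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"time"
)

// readGzipLines (hypothetical helper) downloads url, decompresses the gzip
// body and calls fn once per line, including the trailing '\n' when present.
func readGzipLines(ctx context.Context, client *http.Client, url string, fn func(line string)) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return fmt.Errorf("creating request: %w", err)
	}
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("sending request: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status: %s", resp.Status)
	}

	gr, err := gzip.NewReader(resp.Body)
	if err != nil {
		return fmt.Errorf("creating gzip reader: %w", err)
	}
	defer gr.Close()

	br := bufio.NewReader(gr)
	for {
		line, err := br.ReadString('\n')
		if line != "" {
			fn(line) // ReadString may return data together with io.EOF
		}
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return fmt.Errorf("reading line: %w", err)
		}
	}
}

// fetchSample serves a small gzipped TSV (made-up sample rows) from a local
// test server and reads it back with readGzipLines.
func fetchSample() ([]string, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write([]byte("id\tname\n1\tAlice\n2\tBob\n")); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	payload := buf.Bytes()

	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.Write(payload) // raw gzip bytes, no Content-Encoding header
	}))
	defer srv.Close()

	client := &http.Client{Timeout: 10 * time.Second}
	var lines []string
	err := readGzipLines(context.Background(), client, srv.URL, func(line string) {
		lines = append(lines, line)
	})
	return lines, err
}

func main() {
	lines, err := fetchSample()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("read %d lines\n", len(lines))
}
```

Pointing readGzipLines at the real dataset URL should work the same way, as long as the server returns the file as a plain gzip payload; the status check is an addition of this sketch, not something the original snippet does.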
The most important thing to consider when integrating the pieces from all the previous posts is how to handle downstream errors coming from PostgreSQL. Specifically, this means a change in batcher, which now absorbs copyFromSourceMediator, a type introduced in part 2 (Storing Values in Batches), with the idea of handling errors more closely.
The reason for this change is the delay between the actual pgx calls (and therefore PostgreSQL) and ours. In practice this means we need a sync.Mutex to synchronize the two goroutines: the one sending messages to PostgreSQL and the one receiving messages from upstream.
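Before the full method, the core idea can be sketched in isolation: one goroutine records a downstream error under a mutex, and the other checks it before sending more work. This is only an illustration under assumptions; errorBox and run are hypothetical names, and a simulated failure stands in for pgx and PostgreSQL.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errorBox (hypothetical) guards an error shared between two goroutines.
type errorBox struct {
	mu  sync.Mutex
	err error
}

// Set stores the first error it is given.
func (b *errorBox) Set(err error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.err == nil {
		b.err = err
	}
}

// Get returns the stored error, if any.
func (b *errorBox) Get() error {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.err
}

// run sends values to a consumer goroutine that "fails" on the failAfter-th
// value (0 means never). It returns how many values were sent before the
// sender noticed the failure, plus the recorded error.
func run(values []int, failAfter int) (int, error) {
	var box errorBox
	valuesC := make(chan int)
	done := make(chan struct{})

	// Stands in for the goroutine talking to the database.
	go func() {
		defer close(done)
		n := 0
		for range valuesC {
			n++
			if n == failAfter {
				box.Set(errors.New("simulated downstream failure"))
			}
		}
	}()

	// Stands in for the goroutine receiving values from upstream.
	sent := 0
	for _, v := range values {
		if box.Get() != nil {
			break // the downstream side already failed, stop sending
		}
		valuesC <- v
		sent++
	}
	close(valuesC)
	<-done

	return sent, box.Get()
}

func main() {
	sent, err := run([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, 3)
	fmt.Println("sent:", sent, "err:", err)
}
```

Because the error is recorded asynchronously, the sender may push one more value before it sees the failure. That is the delay mentioned above, and it is why the check has to be repeated on every iteration.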
func (b *batcher) Copy(ctx context.Context, namesC <-chan name) <-chan error {
outErrC := make(chan error)
var mutex sync.Mutex
var copyFromErr error
copyFrom := func(batchNamesC <-chan name, batchErrC <-chan error) <-chan error {
cpOutErrorC := make(chan error)
go func() {
defer close(cpOutErrorC)
copier := newCopyFromSource(batchNamesC, batchErrC)
_, err := b.conn.CopyFrom(ctx,
if err != nil {
copyFromErr = err
return cpOutErrorC
go func() {
batchErrC := make(chan error)
batchNameC := make(chan name)
cpOutErrorC := copyFrom(batchNameC, batchErrC)
defer func() {
var index int64
for {
select {
case n, open := <-namesC:
if !open {
if copyFromErr != nil {
namesC = nil
outErrC <- copyFromErr
batchNameC <- n
if index == b.size {
if err := <-cpOutErrorC; err != nil {
outErrC <- err
batchErrC = make(chan error)
batchNameC = make(chan name)
cpOutErrorC = copyFrom(batchNameC, batchErrC)
index = 0
case <-ctx.Done():
if err := ctx.Err(); err != nil {
batchErrC <- err
outErrC <- err
return outErrC
The code looks like a lot, but the important bits are in the copyFrom function variable, which still uses copyFromSource for dealing with events coming from upstream and also uses a mutex to set errors coming from pgx’s CopyFrom.
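To make the overall shape easier to follow, here is a deliberately simplified, database-free sketch of the batching loop. It is not the implementation from this series: it swaps the streaming copyFromSource and pgx's CopyFrom for an in-memory slice and a hypothetical flush callback, and copyInBatches and runSample are names made up for the example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// copyInBatches (hypothetical) groups values into batches of at most size
// elements and hands each batch to flush. The returned channel yields at most
// one error and is closed when the work is done.
func copyInBatches(ctx context.Context, valuesC <-chan string, size int, flush func([]string) error) <-chan error {
	errC := make(chan error, 1)

	go func() {
		defer close(errC)

		batch := make([]string, 0, size)
		for {
			select {
			case v, open := <-valuesC:
				if !open {
					// Upstream is done: flush whatever is left.
					if len(batch) > 0 {
						if err := flush(batch); err != nil {
							errC <- err
						}
					}
					return
				}
				batch = append(batch, v)
				if len(batch) == size {
					if err := flush(batch); err != nil {
						errC <- err
						return
					}
					batch = batch[:0] // reuse the buffer; flush must not retain it
				}
			case <-ctx.Done():
				errC <- ctx.Err()
				return
			}
		}
	}()

	return errC
}

// runSample feeds values through copyInBatches and records the size of every
// batch that was flushed successfully. The failOn-th flush fails (0 = never).
func runSample(values []string, size, failOn int) ([]int, error) {
	valuesC := make(chan string, len(values))
	for _, v := range values {
		valuesC <- v
	}
	close(valuesC)

	var sizes []int
	calls := 0
	flush := func(batch []string) error {
		calls++
		if calls == failOn {
			return errors.New("simulated flush failure")
		}
		sizes = append(sizes, len(batch))
		return nil
	}

	err := <-copyInBatches(context.Background(), valuesC, size, flush)
	return sizes, err
}

func main() {
	sizes, err := runSample([]string{"a", "b", "c", "d", "e"}, 2, 0)
	fmt.Println("batch sizes:", sizes, "err:", err)
}
```

The real batcher streams each value to PostgreSQL as it arrives instead of buffering a slice, which is exactly why it needs the extra channels and the mutex. The sketch only shows where batches start and end and how an error stops the loop.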
This is the last post of the series, but I don’t think this is the end: I will follow up and improve the existing code in the future. I will answer (at least) the following two questions:
I will improve this implementation; wait for it.