rss twitter gitlab github linkedin linkedin instagram
Learning Go: Pipeline Concurrency Pattern
Sep 10, 2021

Disclaimer: This post includes Amazon affiliate links. If you click on one of them and you make a purchase I’ll earn a commission. Please notice your final price is not affected at all by using those links.

Welcome to another post part of the series covering Concurrency Patterns in Go, this time I’m talking about the Pipeline pattern.



What is the Pipeline pattern?

According to Wikipedia, a Pipeline:

…consists of a group of processing elements arranged so the output of each element is the input of the next one.

In Go this is equivalent to implementing functions that receive a channel and return another channel for another function to consume, typically each function will mutate the results in a way those are useful for the subsequent steps.

Pipeline

For example in a program that consists of multiple steps, they could be implemented in such a way the output of the initial step is the input of the next one; however one thing to keep in mind is that this doesn’t mean those steps should be sequential but rather they only branch off depending on how the program needs to implemented.

When building pipelines consider that the slowest step in your process will determine how fast or slow everything will be; so in some cases it could make sense to also apply a different concurrency pattern to the branched off step, recall to always benchmark.

Pipeline Concurrency Pattern Example

The code used for this post is available on Github.

The following example (errors ommitted for brevity):

1func main() {
2	recordsC, _ := readCSV("file1.csv")
3
4	for val := range sanitize(titleize(recordsC)) {
5		fmt.Printf("%v\n", val)
6	}
7}

Will read a CSV file and apply some logic to each value read, it consists of:

  • L2: We read a CSV file, the result is a channel recordsC.
  • L4: We use recordsC as the input of titleize and sanitize

The function titleize:

 1func titleize(strC <-chan []string) <-chan []string {
 2	ch := make(chan []string)
 3
 4	go func() {
 5		for val := range strC {
 6			val[0] = strings.Title(val[0])
 7			val[1], val[2] = val[2], val[1]
 8
 9			ch <- val
10		}
11
12		close(ch)
13	}()
14
15	return ch
16}
  • L2: ch channel is created (it will be used as the input for the next pipeline step)
  • L4-13: Uses a goroutine to:
    • L5: Receive the values from the channel strC,
    • L6-7: Transform those values by Titleizeing them and switching them, and
    • L9: Sends the changes to the channel ch
    • L12: After the for is completed, then channel ch is closed to indicate the indicate the end of this pipeline step.
  • L15: ch channel is returned.

Implementation-wise the function sanitize is a bit similar to the previous one:

 1func sanitize(strC <-chan []string) <-chan []string {
 2	ch := make(chan []string)
 3
 4	go func() {
 5		for val := range strC {
 6			if len(val[0]) > 3 {
 7				fmt.Println("skipped ", val)
 8				continue
 9			}
10
11			ch <- val
12		}
13
14		close(ch)
15	}()
16
17	return ch
18}

The biggest difference would be the actual logic applied to the received values:

  • L6-9: Where we skip any value that as length longer than 3.

If you recall main we have the pipeline call using both those functions:

sanitize(titleize(recordsC))

Now if this is changed to be something like:

titleize(sanitize(recordsC))

As expected, the output will change, because now the sanitize happens before titleize; but the functions still behave as steps in our pipeline.

Conclusion

Pipeline is a well-known concurrency pattern that is easy to implement in Go and allow us to use the primitives of the language to describe the steps a in concurrent way, however, before implementing it in your programs make sure each step is benchmarked so you have a baseline for comparing your changes, typically using goroutines improves the performance of our programs, but in some cases it could make things worse.

If you’re looking to sink your teeth into more Go-related topics I recommend the following:


Back to posts