Disclaimer: This post includes Amazon affiliate links. If you click on one of them and make a purchase, I’ll earn a commission. Please note that your final price is not affected at all by using those links.
Welcome to another post in the series covering Concurrency Patterns in Go. This time I’m talking about `errgroup`, a package that allows us to implement concurrency patterns using `context.Context` as the way to coordinate goroutines.
The code used for this post is available on GitHub.
What is in errgroup?
The `errgroup` package is not part of the standard library. It’s an external package that has to be downloaded (using the typical `go get`/`go mod` commands) before it can be used in any of our Go projects. In a way it is supported by the Go team, because its full import path is `golang.org/x/sync/errgroup`, but it is still not covered by the Go 1 compatibility promise.
This package provides synchronization, error propagation and context cancellation, and it’s meant to be used by groups of goroutines working on subtasks of a common task.
Comparing errgroup to sync.WaitGroup
In a way, `errgroup` is similar to `sync.WaitGroup` because it also allows us to wait until all goroutines have completed. However, the biggest difference shows up when it is used together with the `context.Context` type.
Let’s take the following example:
```go
 1 func main() {
 2 	wait := waitGroups()
 3 	<-wait
 4 }
 5 
 6 func waitGroups() <-chan struct{} {
 7 	ch := make(chan struct{}, 1)
 8 
 9 	var wg sync.WaitGroup
10 
11 	for _, file := range []string{"file1.csv", "file2.csv", "file3.csv"} {
12 		file := file // XXX: copy the loop variable so each goroutine gets its own (needed before Go 1.22)
13 
14 		wg.Add(1)
15 
16 		go func() {
17 			defer wg.Done()
18 
19 			lines, err := read(file)
20 			if err != nil {
21 				fmt.Printf("error reading %v\n", err)
22 				return // don't range over an invalid channel
23 			}
24 			for line := range lines {
25 				fmt.Println(line)
26 			}
27 		}()
28 	}
29 
30 	go func() {
31 		wg.Wait()
32 		close(ch)
33 	}()
34 
35 	return ch
36 }
```
The code above uses a function called `read` to read the CSV files indicated in the input (feel free to read its implementation if you’re curious). What matters most in this example is the value returned by `read` and how it is used in the following steps:
- L14: We increment the `WaitGroup` counter by `1` for each filename we read,
- L16-27: For each filename we launch a new goroutine:
  - L19: The goroutine calls `read` and then consumes the values from the returned channel,
  - L17: Once the goroutine finishes (for example, after that channel closes), the deferred call to `Done` tells the `WaitGroup` that this goroutine is finished.
- L30-33: A separate goroutine waits for all the other goroutines by calling the `Wait` method of the `WaitGroup`:
  - L32: Once `Wait` returns, meaning all goroutines are finished, we close the returned channel `ch` to tell the caller that everything has completed.
- L35: We return the channel `ch`, which the original caller (`main` in this case) uses to wait until everything completes.
The code above works, so let’s see how the same program can be implemented using `errgroup`:
```go
 1 func main() {
 2 	wait := errGroup()
 3 	<-wait
 4 }
 5 
 6 func errGroup() <-chan struct{} {
 7 	ch := make(chan struct{}, 1)
 8 
 9 	var g errgroup.Group
10 
11 	for _, file := range []string{"file1.csv", "file2.csv", "file3.csv"} {
12 		file := file
13 
14 		g.Go(func() error {
15 			lines, err := read(file)
16 			if err != nil {
17 				return fmt.Errorf("error reading %s: %w", file, err)
18 			}
19 
20 			for line := range lines {
21 				fmt.Println(line)
22 			}
23 
24 			return nil
25 		})
26 	}
27 
28 	go func() {
29 		if err := g.Wait(); err != nil {
30 			fmt.Printf("Error reading files: %v\n", err)
31 		}
32 
33 		close(ch)
34 	}()
35 
36 	return ch
37 }
```
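Compared to the `WaitGroup` example, the biggest differences are that:
- L9: We use `errgroup.Group` instead of `sync.WaitGroup`,
- L14: Goroutines are launched using the `Go` method of the `errgroup.Group` type, which takes a `func() error` and removes the need to call `Add` and `Done`, and
- L29: The `Wait` method of `errgroup.Group` waits for the launched goroutines to complete and returns the first non-nil error returned by any of them.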
Let’s look at another example that truly shows the power of `errgroup`.
Using context.Context with errgroup
The `errgroup` package shines when `context.Context` is included. Remember that `context.Context` is used for things like cancellation. Combining it with `errgroup` lets us build complex programs that react to errors, because `errgroup.WithContext` returns a derived context that is canceled as soon as any of the group’s functions returns a non-nil error. For example:
```go
 1 func main() {
 2 	ctx := context.Background()
 3 	wait := errGroup(ctx)
 4 	<-wait
 5 }
 6 
 7 func errGroup(ctx context.Context) <-chan struct{} {
 8 	ch := make(chan struct{}, 1)
 9 
10 	g, ctx := errgroup.WithContext(ctx)
11 
12 	for _, file := range []string{"file1.csv", "file2.csv", "file3.csv"} {
13 		file := file
14 
15 		g.Go(func() error {
16 			lines, err := read(file)
17 			if err != nil {
18 				return fmt.Errorf("error reading %s: %w", file, err)
19 			}
20 
21 			for {
22 				select {
23 				case <-ctx.Done():
24 					fmt.Printf("Context completed %v\n", ctx.Err())
25 
26 					return ctx.Err()
27 				case line, ok := <-lines:
28 					if !ok {
29 						return nil
30 					}
31 
32 					fmt.Println(line)
33 				}
34 			}
35 		})
36 	}
37 
38 	go func() {
39 		if err := g.Wait(); err != nil {
40 			fmt.Printf("Error reading files: %v\n", err)
41 		}
42 
43 		close(ch)
44 	}()
45 
46 	return ch
47 }
```
This example is a modified version of the one we used when `errgroup.Group` was introduced. The key parts are:
- L10: `errgroup.WithContext` returns a new `Group` and a derived `ctx`. That context is canceled the first time a function passed to `Go` returns a non-nil error, or when `Wait` returns,
- L21-34: We use a `for` loop with a `select` to check whether the channels we use are still valid:
  - L23-26: Determines whether the context (`ctx`) is already done, and
  - L27-32: Processes the messages received from the channel, returning once it gets closed.
A real-life example I implemented previously using this pattern is in the posts covering Implementing Complex Pipelines. They show how all of this comes together when building complex programs using goroutines, so feel free to read that series as well for a concrete, complete example.
Conclusion
I’m a huge fan of `errgroup` because it simplifies building programs that require synchronized work, and it allows us to deal with errors that propagate between multiple goroutines.
Recommended reading
If you’re looking to sink your teeth into more Go-related topics I recommend the following:
- Get Programming with Go
- Learning Go: Context package
- Learning Go: Introduction to Concurrency Patterns
- Learning Go: Interface Types - Part 1
- Learning Go: Interface Types - Part 2
- Learning Go: Fan-In and Fan-Out Concurrency Pattern