Learning Go: Concurrency Patterns using errgroup package
Sep 03, 2021

Disclaimer: This post includes Amazon affiliate links. If you click on one of them and make a purchase, I’ll earn a commission. Please note that your final price is not affected at all by using those links.

Welcome to another post in the series covering Concurrency Patterns in Go. This time I’m talking about errgroup, a package that lets us implement concurrency patterns using context.Context as the way to coordinate goroutines.



The code used for this post is available on GitHub.

What is in errgroup?

The errgroup package is not part of the standard library; it’s an external package that has to be downloaded (using the typical go mod calls, for example go get golang.org/x/sync/errgroup) before it can be used in any of our Go projects. It is, in a way, supported by the Go team because the full package name is golang.org/x/sync/errgroup, but it is still not covered by the Go 1 compatibility promise.

This package provides synchronization, error propagation and context cancellation, and it’s meant to be used for groups of goroutines working on a common task.

Comparing errgroup to sync.WaitGroup

In a way, errgroup is similar to sync.WaitGroup because it also lets us wait until all goroutines have completed; the biggest difference shows up when it is used together with the context.Context type.

Let’s take the following example:

 1func main() {
 2	wait := waitGroups()
 3	<-wait
 4}
 5
 6func waitGroups() <-chan struct{} {
 7	ch := make(chan struct{}, 1)
 8
 9	var wg sync.WaitGroup
10
11	for _, file := range []string{"file1.csv", "file2.csv", "file3.csv"} {
12		file := file // XXX: Important gotcha when using variables in goroutines in the same block
13
14		wg.Add(1)
15
16		go func() {
17			defer wg.Done()
18
19			ch, err := read(file)
20			if err != nil {
21				fmt.Printf("error reading %v\n", err)
22				return // XXX: do not range over ch when read failed
23			}
24			for line := range ch {
25				fmt.Println(line)
26			}
27		}()
28	}
29
30	go func() {
31		wg.Wait()
32		close(ch)
33	}()
34
35	return ch
36}

The code above uses a function called read to read the CSV files indicated in the input (feel free to read the implementation if you’re curious). What matters the most in this example is the value returned by read and how it is used in the following steps:

  • L14: We increment the WaitGroup counter by 1 for each filename,
  • L16-27: For each filename we launch a new goroutine,
    • L19-26: This goroutine calls read and prints every value received from the returned channel,
    • L17: Once that channel is closed and the goroutine returns, the deferred Done call tells the WaitGroup that this goroutine has finished,
  • L30-33: A separate goroutine waits for all the others by calling the Wait method in the WaitGroup,
    • L32: Once all goroutines have finished we close the returned channel ch to tell the caller that everything has completed,
  • L35: We return ch so the original caller (main in this case) can wait until everything completes.

The WaitGroup version works; let’s see how the same thing can be implemented using errgroup:

 1func main() {
 2	wait := errGroup()
 3	<-wait
 4}
 5
 6func errGroup() <-chan struct{} {
 7	ch := make(chan struct{}, 1)
 8
 9	var g errgroup.Group
10
11	for _, file := range []string{"file1.csv", "file2.csv", "file3.csv"} {
12		file := file
13
14		g.Go(func() error {
15			ch, err := read(file)
16			if err != nil {
17				return fmt.Errorf("error reading %w", err)
18			}
19
20			for line := range ch {
21				fmt.Println(line)
22			}
23
24			return nil
25		})
26	}
27
28	go func() {
29		if err := g.Wait(); err != nil {
30			fmt.Printf("Error reading files %v", err)
31		}
32
33		close(ch)
34	}()
35
36	return ch
37}

Compared to the WaitGroup example, the biggest differences are:

  • L9: We use errgroup.Group instead of sync.WaitGroup,
  • L14: Goroutines are launched using the Go method from the errgroup.Group type, which takes a function returning an error, so there are no explicit Add and Done calls, and
  • L29: The Wait method from errgroup.Group waits for the launched goroutines to complete and returns the first non-nil error, if any.

Let’s look at another example that truly displays the power of using errgroup.

Using context.Context with errgroup

The errgroup package shines when context.Context is included. Remember that context.Context is used for things like cancellation: a Group created with errgroup.WithContext cancels the derived context the first time a function passed to Go returns a non-nil error, or when Wait returns. Combining the two, you should be able to build complex programs whose goroutines react to that cancellation, for example:

 1func main() {
 2	ctx := context.Background()
 3	wait := errGroup(ctx)
 4	<-wait
 5}
 6
 7func errGroup(ctx context.Context) <-chan struct{} {
 8	ch := make(chan struct{}, 1)
 9
10	g, ctx := errgroup.WithContext(ctx)
11
12	for _, file := range []string{"file1.csv", "file2.csv", "file3.csv"} {
13		file := file
14
15		g.Go(func() error {
16			ch, err := read(file)
17			if err != nil {
18				return fmt.Errorf("error reading %w", err)
19			}
20
21			for {
22				select {
23				case <-ctx.Done():
24					fmt.Printf("Context completed %v\n", ctx.Err())
25
26					return ctx.Err()
27				case line, ok := <-ch:
28					if !ok {
29						return nil
30					}
31
32					fmt.Println(line)
33				}
34			}
35		})
36	}
37
38	go func() {
39		if err := g.Wait(); err != nil {
40			fmt.Printf("Error reading files: %v", err)
41		}
42
43		close(ch)
44	}()
45
46	return ch
47}

This example is a modified version of the previous errgroup.Group one; the key parts are:

  • L10: We create the group with errgroup.WithContext, which also returns a derived context (ctx),
  • L21-34: We use a for loop with a select to check which of the channels we use is ready:
    • L23-26: Determines whether the context variable (ctx) is already done, and
    • L27-32: Processes the message received from the channel, returning when the channel gets closed.

One real-life example where I previously used this pattern is the series on Implementing Complex Pipelines; it shows how all of this can come together when building complex programs using goroutines. Feel free to read that series as well; it should give you more details through a concrete and complete example.

Conclusion

I’m a huge fan of errgroup because it simplifies the process of building programs that require synchronized work and it allows us to deal with errors that propagate between multiple goroutines.


