rss resume / curriculum vitae linkedin linkedin gitlab github twitter mastodon instagram
Learning Go: Fan-In/Fan-Out Concurrency Pattern
Aug 19, 2021

Disclaimer: This post includes Amazon affiliate links. If you click on one of them and you make a purchase I’ll earn a commission. Please notice your final price is not affected at all by using those links.

Welcome to another post part of the series covering Concurrency Patterns in Go, this time I’m talking about two similar patterns Fan In and Fan Out.



The code used for this post is available on Github.

What is the Fan-In pattern?

The Fan-In pattern consists of consolidating of multiple channels into one channel by multiplexing each received value, the way it works is by merging messages from each receiving channel into one.

What is Concurrency?

For example:

func main() {
	ch1, err := read("file1.csv")
	if err != nil {
		panic(fmt.Errorf("Could not read file1 %v", err))
	}

	ch2, err := read("file2.csv")
	if err != nil {
		panic(fmt.Errorf("Could not read file2 %v", err))
	}

	//-

	exit := make(chan struct{})

	chM := merge2(ch1, ch2)

	go func() {
		for v := range chM {
			fmt.Println(v)
		}

		close(exit)
	}()

	<-exit

	fmt.Println("All completed, exiting")
}

This main function may look like a lot but let’s break it apart:

  1. We have a read function that receives a CSV filename and returns a receiving channel,
  2. Using that read function two files are read: file1.csv and file2.csv,
  3. We use those channels as arguments for the merge2 function, which then returns another channel,
  4. The received values in that channel are printed out in a goroutine,
  5. There’s a channel exit used only for making sure the anonymous goroutine completes reading all values,
  6. This channel exit is closed and triggering a graceful shutdown.

The Fan-In pattern is implemented in merge2:

func merge2(cs ...<-chan []string) <-chan []string {
	chans := len(cs)
	wait := make(chan struct{}, chans)

	out := make(chan []string)

	send := func(c <-chan []string) {
		defer func() { wait <- struct{}{} }()

		for n := range c {
			out <- n
		}
	}

	for _, c := range cs {
		go send(c)
	}

	go func() {
		for range wait {
			chans--
			if chans == 0 {
				break
			}
		}

		close(out)
	}()

	return out
}

This is where the cool stuff happens, for each received channel value we launch a goroutine using the anonymous send function, all of them send the messages to the out channel, that is returned back to our initial call and then closed after the said channels are closed.

Notice we are also waiting a wait channel used for determining when to close the out channel, this channel is meant to receive N messages triggered by the defer in send; this is used as a way to indicate the channel is empty and that goroutine completed its work.

Another way to do something similar is by using the sync.WaitGroup type, for example another way to something simlar would be like in merge1 something similar is done:

func merge1(cs ...<-chan []string) <-chan []string {
	var wg sync.WaitGroup

	out := make(chan []string)

	send := func(c <-chan []string) {
		for n := range c {
			out <- n
		}

		wg.Done()
	}

	wg.Add(len(cs))

	for _, c := range cs {
		go send(c)
	}

	go func() {
		wg.Wait()

		close(out)
	}()

	return out
}

What is the Fan-Out pattern?

The Fan-Out patterns consists in breaking up one channel into multiple ones by distributing each value, the way it works is by returning channels that receive a value that comes from to original channel.

What is Concurrency?

For example:

func main() {
	ch1, err := read("file1.csv")
	if err != nil {
		panic(fmt.Errorf("Could not read file1 %v", err))
	}

	//-

	br1 := breakup("1", ch1)
	br2 := breakup("2", ch1)

	for {
		if br1 == nil && br2 == nil {
			break
		}

		select {
		case _, ok := <-br1:
			if !ok {
				br1 = nil
			}
		case _, ok := <-br2:
			if !ok {
				br2 = nil
			}
		}
	}

	fmt.Println("All completed, exiting")
}

This main is a bit similar to what we did in Fan-Out:

  1. We have a read function that receives a CSV filename and returns a receiving channel,
  2. This returned channel, ch1, is the one to be broken up,
  3. For doing that we use breakup that returns another channel,
  4. Finally using an infinite foor loop we wait until our channels are closed to exit.

breakup is implemented like this:

func breakup(worker string, ch <-chan []string) chan struct{} {
	chE := make(chan struct{})

	go func() {
		for v := range ch {
			fmt.Println(worker, v)
		}

		close(chE)
	}()

	return chE
}

If you notice it’s literally printing out the received values, we could have done something else like augmenting or filtering those values; we could also rewrite this example to actual send those values back in the returned channel, that way the users of that channel can do something with the received values.

Conclusion

Fan-In and Fan-Out are two of the most common Concurrency Patterns in Go, depending on your problem you may find them useful, the important things to always keep in mind is a way to determine when the channels are closed and properly emit a message downstream to indicate that so they can react correctly.

If you’re looking to sink your teeth into more Go-related topics I recommend the following:


Back to posts