Disclaimer: This post includes Amazon affiliate links. If you click on one of them and make a purchase, I’ll earn a commission. Please note that your final price is not affected at all by using those links.
Welcome to another post in the series covering Concurrency Patterns in Go. This time I’m talking about two closely related patterns: Fan-In and Fan-Out.
The code used for this post is available on GitHub.
What is the Fan-In pattern?
The Fan-In pattern consolidates multiple channels into one by multiplexing the values they carry: messages arriving on each input channel are merged into a single output channel.
func main() {
	ch1, err := read("file1.csv")
	if err != nil {
		panic(fmt.Errorf("could not read file1: %w", err))
	}

	ch2, err := read("file2.csv")
	if err != nil {
		panic(fmt.Errorf("could not read file2: %w", err))
	}

	// exit is closed once every merged value has been printed.
	exit := make(chan struct{})

	// Fan-In: both input channels are merged into chM.
	chM := merge2(ch1, ch2)

	go func() {
		for v := range chM {
			fmt.Println(v)
		}
		close(exit)
	}()

	<-exit

	fmt.Println("All completed, exiting")
}
This main function may look like a lot, but let’s break it apart:
- We have a read function that receives a CSV filename and returns a receive-only channel,
- Using that read function two files are read: file1.csv and file2.csv,
- We use those channels as arguments for the merge2 function, which then returns another channel,
- The values received from that channel are printed out in a goroutine,
- There’s a channel, exit, used only for making sure the anonymous goroutine finishes reading all values,
- This exit channel is closed once that happens, which unblocks main and triggers a graceful shutdown.
The Fan-In pattern is implemented in merge2:
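func merge2(cs ...<-chan []string) <-chan []string {
	chans := len(cs)
	// wait is buffered so a finished sender never blocks when signaling.
	wait := make(chan struct{}, chans)

	out := make(chan []string)

	// send forwards every value from c to out and signals on wait when done.
	send := func(c <-chan []string) {
		defer func() { wait <- struct{}{} }()

		for n := range c {
			out <- n
		}
	}

	for _, c := range cs {
		go send(c)
	}

	// Close out only after every sender has signaled completion.
	go func() {
		for range wait {
			chans--
			if chans == 0 {
				break
			}
		}

		close(out)
	}()

	return out
}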
This is where the cool stuff happens: for each channel passed in, we launch a goroutine running the anonymous send function. All of them send their messages to the out channel, which is returned to the caller and closed once every input channel has been closed and drained.
Notice we also use a wait channel to determine when to close the out channel. It is meant to receive N messages, one per input channel, each triggered by the defer in send; a message on wait indicates that the corresponding input channel is closed and empty, and that its goroutine completed its work.
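Since we know exactly how many completion signals to expect, the closing goroutine can also be written as a counted loop. The following is a minimal sketch of that variation, not the code from the repository; mergeCounted and the emit helper (an in-memory stand-in for read) are hypothetical names used only for this example. It also closes the output right away when no channels are passed, a case in which merge2 as written would never close it.

```go
package main

import "fmt"

// mergeCounted is a hypothetical variation of merge2: it waits for exactly
// len(cs) completion signals before closing the output channel.
func mergeCounted(cs ...<-chan []string) <-chan []string {
	done := make(chan struct{}, len(cs)) // buffered so senders never block on it
	out := make(chan []string)

	for _, c := range cs {
		go func(c <-chan []string) {
			defer func() { done <- struct{}{} }()
			for v := range c {
				out <- v
			}
		}(c)
	}

	go func() {
		// One receive per input channel; zero inputs means zero receives.
		for i := 0; i < len(cs); i++ {
			<-done
		}
		close(out)
	}()

	return out
}

// emit is a hypothetical stand-in for read: it sends rows and closes the channel.
func emit(rows ...[]string) <-chan []string {
	ch := make(chan []string)
	go func() {
		defer close(ch)
		for _, r := range rows {
			ch <- r
		}
	}()
	return ch
}

func main() {
	a := emit([]string{"a", "1"}, []string{"a", "2"})
	b := emit([]string{"b", "1"})

	count := 0
	for v := range mergeCounted(a, b) {
		fmt.Println(v) // arrival order across a and b is not deterministic
		count++
	}
	fmt.Println("received", count, "records")
}
```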
Another way to do the same thing is with the sync.WaitGroup type, as merge1 does:
func merge1(cs ...<-chan []string) <-chan []string {
	var wg sync.WaitGroup

	out := make(chan []string)

	// send forwards every value from c to out, then marks itself as done.
	send := func(c <-chan []string) {
		defer wg.Done()

		for n := range c {
			out <- n
		}
	}

	wg.Add(len(cs))

	for _, c := range cs {
		go send(c)
	}

	// Close out only after all senders have finished.
	go func() {
		wg.Wait()

		close(out)
	}()

	return out
}
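Nothing in merge1 depends on the values being []string, so the same approach can be written once for any element type. This is a sketch under the assumption that type parameters are available (Go 1.18 or later); merge and produce are hypothetical names, not part of the original code.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// merge is a hypothetical generic take on merge1: it fans in any number
// of channels carrying values of type T.
func merge[T any](cs ...<-chan T) <-chan T {
	var wg sync.WaitGroup
	out := make(chan T)

	wg.Add(len(cs))
	for _, c := range cs {
		go func(c <-chan T) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(c)
	}

	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

// produce is a hypothetical helper that sends vs and closes the channel.
func produce[T any](vs ...T) <-chan T {
	ch := make(chan T)
	go func() {
		defer close(ch)
		for _, v := range vs {
			ch <- v
		}
	}()
	return ch
}

func main() {
	var got []int
	for v := range merge(produce(1, 2, 3), produce(10, 20)) {
		got = append(got, v)
	}

	sort.Ints(got) // arrival order is not deterministic, so sort before printing
	fmt.Println(got) // [1 2 3 10 20]
}
```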
What is the Fan-Out pattern?
The Fan-Out pattern consists of breaking up one channel into multiple ones by distributing its values: several workers receive from the original channel, each value is delivered to exactly one of them, and each worker returns a channel of its own.
func main() {
	ch1, err := read("file1.csv")
	if err != nil {
		panic(fmt.Errorf("could not read file1: %w", err))
	}

	// Fan-Out: two workers receive from the same channel.
	br1 := breakup("1", ch1)
	br2 := breakup("2", ch1)

	for {
		if br1 == nil && br2 == nil {
			break
		}

		select {
		case _, ok := <-br1:
			if !ok {
				br1 = nil // a nil channel is never selected again
			}
		case _, ok := <-br2:
			if !ok {
				br2 = nil
			}
		}
	}

	fmt.Println("All completed, exiting")
}
This main is a bit similar to what we did in Fan-In:
- We have a read function that receives a CSV filename and returns a receive-only channel,
- This returned channel, ch1, is the one to be broken up,
- For doing that we use breakup, which returns another channel,
- Finally, using an infinite for loop, we wait until those channels are closed to exit.
breakup is implemented like this:
func breakup(worker string, ch <-chan []string) chan struct{} {
	// chE is closed when this worker has drained ch.
	chE := make(chan struct{})

	go func() {
		for v := range ch {
			fmt.Println(worker, v)
		}

		close(chE)
	}()

	return chE
}
As you may notice, it’s literally just printing out the received values. We could have done something else, like augmenting or filtering those values; we could also rewrite this example to actually send those values back through the returned channel, so that the users of that channel can do something with them.
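As a sketch of that last idea, a worker can return a channel of results rather than a bare completion signal. The version below is an assumption about how such a rewrite could look, not code from the repository: breakupValues, the worker label it prepends to each record, and the emit helper (an in-memory stand-in for read) are all hypothetical.

```go
package main

import "fmt"

// breakupValues is a hypothetical variant of breakup: instead of printing,
// it tags each record with the worker name and sends it downstream.
func breakupValues(worker string, ch <-chan []string) <-chan []string {
	out := make(chan []string)

	go func() {
		defer close(out) // closing out tells the consumer this worker is done
		for v := range ch {
			out <- append([]string{worker}, v...)
		}
	}()

	return out
}

// emit is a hypothetical stand-in for read: it sends rows and closes the channel.
func emit(rows ...[]string) <-chan []string {
	ch := make(chan []string)
	go func() {
		defer close(ch)
		for _, r := range rows {
			ch <- r
		}
	}()
	return ch
}

func main() {
	src := emit([]string{"a"}, []string{"b"}, []string{"c"})

	w1 := breakupValues("1", src)
	w2 := breakupValues("2", src)

	for w1 != nil || w2 != nil {
		select {
		case v, ok := <-w1:
			if !ok {
				w1 = nil
				continue
			}
			fmt.Println(v)
		case v, ok := <-w2:
			if !ok {
				w2 = nil
				continue
			}
			fmt.Println(v)
		}
	}
}
```

Which worker handles which record depends on scheduling, so the labels in the output can differ between runs; every record is still handled exactly once.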
Conclusion
Fan-In and Fan-Out are two of the most common Concurrency Patterns in Go, and depending on your problem you may find them useful. The important thing to always keep in mind is to have a way to determine when the channels are closed, and to properly signal that downstream so consumers can react correctly.
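To tie both patterns together, here is a minimal sketch in which one source channel is fanned out to several workers whose results are fanned back into a single channel, closed only when every worker is done. It uses ints instead of CSV records to stay short; process and its parameters are hypothetical names for this example only.

```go
package main

import (
	"fmt"
	"sync"
)

// process is a hypothetical helper: n workers all receive from in (Fan-Out),
// apply fn, and send their results to one shared channel (Fan-In).
func process(in <-chan int, n int, fn func(int) int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			for v := range in { // each value is taken by exactly one worker
				out <- fn(v)
			}
		}()
	}

	go func() {
		wg.Wait()  // every worker has seen in closed and drained
		close(out) // so it is safe to signal completion downstream
	}()

	return out
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 5; i++ {
			in <- i
		}
	}()

	sum := 0
	for v := range process(in, 3, func(x int) int { return x * x }) {
		sum += v
	}
	fmt.Println(sum) // 55
}
```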
Recommended reading
If you’re looking to sink your teeth into more Go-related topics I recommend the following:
- Get Programming with Go
- Learning Go: Context package
- Learning Go: Introduction to Concurrency Patterns
- Learning Go: Interface Types - Part 1
- Learning Go: Interface Types - Part 2