Disclaimer: This post includes Amazon affiliate links. If you click on one of them and you make a purchase I’ll earn a commission. Please notice your final price is not affected at all by using those links.
Welcome to another post part of the series covering Concurrency Patterns in Go, this time I’m talking about the Background Job
pattern.
What is the Background Job pattern?
Because we are using concurrency and the primitives defined in the Go language, like Goroutines and Channels, I’m only focusing on things that occur in process and in memory so a Background Job
in this context means a process charge of doing some work behind the scenes, this process is initialized by another parent process, in practice it means a Goroutine launching Goroutines.
The code used for this post is available on Github.
Background Job Pattern Basic Example
This example covers a basic implementation of the Background Job
pattern using an os.Signal channel to receive events, the important bit in this example would be the listenForWork function, let’s look at it:
1func listenForWork() {
2 const workersN int = 5
3
4 sc := make(chan os.Signal, 1)
5 signal.Notify(sc, syscall.SIGTERM)
6
7 //-
8
9 workersC := make(chan struct{}, workersN)
10
11 // 1) Listen for messages to process
12 go func() {
13 for {
14 <-sc
15
16 workersC <- struct{}{} // 2) Send to processing channel
17 }
18 }()
19
20 go func() {
21 var workers int
22
23 for range workersC { // 3) Wait for messages to process
24 workerID := (workers % workersN) + 1
25 workers++
26
27 fmt.Printf("%d<-\n", workerID)
28
29 go func() { // 4) Process messages
30 doWork(workerID)
31 }()
32 }
33 }()
34}
- L4-5: Creates a channel for listening to signal events, specifically
TERM
signals, this will come into place in a few lines below. - L9: Creates a new buffered channel, this would be the actual channel used for triggering
Background Jobs
.
Next we have the two goroutines doing the work of listening for TERM
signals and processing those received events:
- L11-18: Goroutine listening for messages from the
sc
channel, the signal events; this will send a new message to theworkersC
channel - L20-33: Goroutine listening for messages coming from the
workersC
channel, using afor
, which in practice triggers the actual background job defined in the doWork function.
Background Job Pattern with Cancellation Example
Next example is a bit more complex, it includes a way to stop receiving events, it defines a buffer of received messages to process and a way to define a concrete number of Background Jobs
.
All of this is implemented by the Scheduler
type:
1type Scheduler struct {
2 workers int
3 msgC chan struct{}
4 signalC chan os.Signal
5 waitGroup sync.WaitGroup
6}
7
8func NewScheduler(workers, buffer int) *Scheduler {
9 return &Scheduler{
10 workers: workers,
11 msgC: make(chan struct{}, buffer),
12 signalC: make(chan os.Signal, 1),
13 }
14}
Where the fields indicate the following:
- workers: defines the number of
Background Jobs
to launch, in practice the amount of goroutines to execute. - msgC: defines the channel used for processing events, this channel will be used by the
Background Jobs
to do some work with the received message, - signalC: defines the signal channel used for graceful shutdown, and
- waitGroup: defines the WaitGroup meant to represent the total amount of workers, it’s used for making sure all goroutines exit after the
signalC
channel receives a message indicating the main process should exit.
This Scheduler
types defines two methods, first ListenForWork
:
1func (s *Scheduler) ListenForWork() {
2 go func() { // 1) Listen for messages to process
3 signal.Notify(s.signalC, syscall.SIGTERM)
4
5 for {
6 <-s.signalC
7
8 s.msgC <- struct{}{} // 2) Send to processing channel
9 }
10 }()
11
12 s.waitGroup.Add(s.workers)
13
14 for i := 0; i < s.workers; i++ {
15 i := i
16 go func() {
17 for {
18 select {
19 case _, open := <-s.msgC: // 3) Wait for messages to process
20 if !open { // closed, exiting
21 fmt.Printf("%d closing\n", i+1)
22 s.waitGroup.Done()
23
24 return
25 }
26
27 fmt.Printf("%d<- Processing\n", i)
28 }
29 }
30 }()
31 }
32}
- L2-10: Similar to the Basic example it’s used to listen for signal events, however in this case both the
Notify
call and the receiving of those events are handled by the same goroutine. - L12: Sets the number of
Background Jobs
to launch, this comes into to place in a few lines below. - L14-31: This is the part where the events received via the
Notify
channel are processed byBackground Jobs
, so:- L16: For each worker one goroutine is launched,
- L17-L19: Internally using
select
and an infinite loop we can determine if themsgC
channel was closed and callDone
for the WaitGroup to indicate it’s time to exit - L27: Otherwise we process the event and do something with it, in this case we print it out.
Next the Exit
method:
1func (s *Scheduler) Exit() {
2 close(s.msgC)
3 s.waitGroup.Wait()
4}
Closes the msgC
channel and waits until all goroutines complete.
Conclusion
Background Job
it’s a pattern that allows processes to define concurrent goroutines in charge of processing events via a messages channel, it’s a way to define concrete rules for defining amount of workers as well as the capacity or buffer we can support.
Recommended reading
If you’re looking to sink your teeth into more Go-related topics I recommend the following:
- Get Programming with Go
- Learning Go: Introduction to Concurrency Patterns
- Learning Go: Fan-In and Fan-Out Concurrency Pattern
- Learning Go: Concurrency Patterns using errgroup package
- Learning Go: Pipeline Concurrency Pattern