

Disclaimer: This post includes Amazon affiliate links. If you click on one of them and you make a purchase I’ll earn a commission. Please notice your final price is not affected at all by using those links.
Welcome to another post part of the series covering Concurrency Patterns in Go, this time I’m talking about the Background Job
pattern.
Because we are using concurrency and the primitives defined in the Go language, like Goroutines and Channels, I’m only focusing on things that occur in process and in memory so a Background Job
in this context means a process charge of doing some work behind the scenes, this process is initialized by another parent process, in practice it means a Goroutine launching Goroutines.
The code used for this post is available on Github.
This example covers a basic implementation of the Background Job
pattern using an os.Signal channel to receive events, the important bit in this example would be the listenForWork function, let’s look at it:
1func listenForWork() {
2 const workersN int = 5
3
4 sc := make(chan os.Signal, 1)
5 signal.Notify(sc, syscall.SIGTERM)
6
7 //-
8
9 workersC := make(chan struct{}, workersN)
10
11 // 1) Listen for messages to process
12 go func() {
13 for {
14 <-sc
15
16 workersC <- struct{}{} // 2) Send to processing channel
17 }
18 }()
19
20 go func() {
21 var workers int
22
23 for range workersC { // 3) Wait for messages to process
24 workerID := (workers % workersN) + 1
25 workers++
26
27 fmt.Printf("%d<-\n", workerID)
28
29 go func() { // 4) Process messages
30 doWork(workerID)
31 }()
32 }
33 }()
34}
TERM
signals, this will come into place in a few lines below.Background Jobs
.Next we have the two goroutines doing the work of listening for TERM
signals and processing those received events:
sc
channel, the signal events; this will send a new message to the workersC
channelworkersC
channel, using a for
, which in practice triggers the actual background job defined in the doWork function.Next example is a bit more complex, it includes a way to stop receiving events, it defines a buffer of received messages to process and a way to define a concrete number of Background Jobs
.
All of this is implemented by the Scheduler
type:
1type Scheduler struct {
2 workers int
3 msgC chan struct{}
4 signalC chan os.Signal
5 waitGroup sync.WaitGroup
6}
7
8func NewScheduler(workers, buffer int) *Scheduler {
9 return &Scheduler{
10 workers: workers,
11 msgC: make(chan struct{}, buffer),
12 signalC: make(chan os.Signal, 1),
13 }
14}
Where the fields indicate the following:
Background Jobs
to launch, in practice the amount of goroutines to execute.Background Jobs
to do some work with the received message,signalC
channel receives a message indicating the main process should exit.This Scheduler
types defines two methods, first ListenForWork
:
1func (s *Scheduler) ListenForWork() {
2 go func() { // 1) Listen for messages to process
3 signal.Notify(s.signalC, syscall.SIGTERM)
4
5 for {
6 <-s.signalC
7
8 s.msgC <- struct{}{} // 2) Send to processing channel
9 }
10 }()
11
12 s.waitGroup.Add(s.workers)
13
14 for i := 0; i < s.workers; i++ {
15 i := i
16 go func() {
17 for {
18 select {
19 case _, open := <-s.msgC: // 3) Wait for messages to process
20 if !open { // closed, exiting
21 fmt.Printf("%d closing\n", i+1)
22 s.waitGroup.Done()
23
24 return
25 }
26
27 fmt.Printf("%d<- Processing\n", i)
28 }
29 }
30 }()
31 }
32}
Notify
call and the receiving of those events are handled by the same goroutine.Background Jobs
to launch, this comes into to place in a few lines below.Notify
channel are processed by Background Jobs
, so:select
and an infinite loop we can determine if the msgC
channel was closed and call Done
for the WaitGroup to indicate it’s time to exitNext the Exit
method:
1func (s *Scheduler) Exit() {
2 close(s.msgC)
3 s.waitGroup.Wait()
4}
Closes the msgC
channel and waits until all goroutines complete.
Background Job
it’s a pattern that allows processes to define concurrent goroutines in charge of processing events via a messages channel, it’s a way to define concrete rules for defining amount of workers as well as the capacity or buffer we can support.
If you’re looking to sink your teeth into more Go-related topics I recommend the following: