rss twitter gitlab github linkedin linkedin instagram
Learning Go: Background Job Concurrency Pattern
Oct 01, 2021

Disclaimer: This post includes Amazon affiliate links. If you click on one of them and you make a purchase I’ll earn a commission. Please notice your final price is not affected at all by using those links.

Welcome to another post part of the series covering Concurrency Patterns in Go, this time I’m talking about the Background Job pattern.



What is the Background Job pattern?

Because we are using concurrency and the primitives defined in the Go language, like Goroutines and Channels, I’m only focusing on things that occur in process and in memory so a Background Job in this context means a process charge of doing some work behind the scenes, this process is initialized by another parent process, in practice it means a Goroutine launching Goroutines.

The code used for this post is available on Github.

Background Job Pattern Basic Example

This example covers a basic implementation of the Background Job pattern using an os.Signal channel to receive events, the important bit in this example would be the listenForWork function, let’s look at it:

 1func listenForWork() {
 2	const workersN int = 5
 3
 4	sc := make(chan os.Signal, 1)
 5	signal.Notify(sc, syscall.SIGTERM)
 6
 7	//-
 8
 9	workersC := make(chan struct{}, workersN)
10
11	// 1) Listen for messages to process
12	go func() {
13		for {
14			<-sc
15
16			workersC <- struct{}{} // 2) Send to processing channel
17		}
18	}()
19
20	go func() {
21		var workers int
22
23		for range workersC { // 3) Wait for messages to process
24			workerID := (workers % workersN) + 1
25			workers++
26
27			fmt.Printf("%d<-\n", workerID)
28
29			go func() { // 4) Process messages
30				doWork(workerID)
31			}()
32		}
33	}()
34}
  • L4-5: Creates a channel for listening to signal events, specifically TERM signals, this will come into place in a few lines below.
  • L9: Creates a new buffered channel, this would be the actual channel used for triggering Background Jobs.

Next we have the two goroutines doing the work of listening for TERM signals and processing those received events:

  • L11-18: Goroutine listening for messages from the sc channel, the signal events; this will send a new message to the workersC channel
  • L20-33: Goroutine listening for messages coming from the workersC channel, using a for, which in practice triggers the actual background job defined in the doWork function.

Background Job Pattern with Cancellation Example

Next example is a bit more complex, it includes a way to stop receiving events, it defines a buffer of received messages to process and a way to define a concrete number of Background Jobs.

All of this is implemented by the Scheduler type:

 1type Scheduler struct {
 2	workers   int
 3	msgC      chan struct{}
 4	signalC   chan os.Signal
 5	waitGroup sync.WaitGroup
 6}
 7
 8func NewScheduler(workers, buffer int) *Scheduler {
 9	return &Scheduler{
10		workers: workers,
11		msgC:    make(chan struct{}, buffer),
12		signalC: make(chan os.Signal, 1),
13	}
14}

Where the fields indicate the following:

  • workers: defines the number of Background Jobs to launch, in practice the amount of goroutines to execute.
  • msgC: defines the channel used for processing events, this channel will be used by the Background Jobs to do some work with the received message,
  • signalC: defines the signal channel used for graceful shutdown, and
  • waitGroup: defines the WaitGroup meant to represent the total amount of workers, it’s used for making sure all goroutines exit after the signalC channel receives a message indicating the main process should exit.

This Scheduler types defines two methods, first ListenForWork:

 1func (s *Scheduler) ListenForWork() {
 2	go func() { // 1) Listen for messages to process
 3		signal.Notify(s.signalC, syscall.SIGTERM)
 4
 5		for {
 6			<-s.signalC
 7
 8			s.msgC <- struct{}{} // 2) Send to processing channel
 9		}
10	}()
11
12	s.waitGroup.Add(s.workers)
13
14	for i := 0; i < s.workers; i++ {
15		i := i
16		go func() {
17			for {
18				select {
19				case _, open := <-s.msgC: // 3) Wait for messages to process
20					if !open { // closed, exiting
21						fmt.Printf("%d closing\n", i+1)
22						s.waitGroup.Done()
23
24						return
25					}
26
27					fmt.Printf("%d<- Processing\n", i)
28				}
29			}
30		}()
31	}
32}
  • L2-10: Similar to the Basic example it’s used to listen for signal events, however in this case both the Notify call and the receiving of those events are handled by the same goroutine.
  • L12: Sets the number of Background Jobs to launch, this comes into to place in a few lines below.
  • L14-31: This is the part where the events received via the Notify channel are processed by Background Jobs, so:
    • L16: For each worker one goroutine is launched,
    • L17-L19: Internally using select and an infinite loop we can determine if the msgC channel was closed and call Done for the WaitGroup to indicate it’s time to exit
    • L27: Otherwise we process the event and do something with it, in this case we print it out.

Next the Exit method:

1func (s *Scheduler) Exit() {
2	close(s.msgC)
3	s.waitGroup.Wait()
4}

Closes the msgC channel and waits until all goroutines complete.

Conclusion

Background Job it’s a pattern that allows processes to define concurrent goroutines in charge of processing events via a messages channel, it’s a way to define concrete rules for defining amount of workers as well as the capacity or buffer we can support.

If you’re looking to sink your teeth into more Go-related topics I recommend the following:


Back to posts