

Disclaimer: This post includes Amazon affiliate links. Clicking on them earns me a commission and does not affect the final price.
Welcome to another post covering Concurrency Patterns in Go in Go 👋. This time, I am talking about the Publisher Subscriber pattern.
Also known as Pub/Sub, this messaging pattern allows customers, known as subscribers, to asynchronously receive events generated by a producer, known as a publisher. Pub/Sub is a well-known pattern used in distributed systems to asynchronously communicate different services interested in knowing specific information about a particular service.
In Go, we can use the concurrency primitives of the language, such as channels and goroutines, to implement this pattern. Let us start.
The code used for this post is available on GitHub.
This pattern can use interface{}
to support any type, but considering generics are already supported in Go, it makes more sense to use them to implement a type-safe version of this pattern.
The generic type PubSub
will implement the pattern; it requires three fields to be defined:
subscribers
: identifies the subscribers listening for events and is used to send the message when a new event is published,mu
: identifies a mutex, used to protect a block of code to avoid race conditions when multiple goroutines try to access it, andclosed
: indicates whether the PubSub is closed or not to determine if we can add new subscribers or publish events. 8type PubSub[T any] struct {
9 subscribers []chan T
10 mu sync.RWMutex
11 closed bool
12}
Notice how the mu
field is using sync.RWMutex, this is to explicitly acquire a Read/Write lock, depending on the operation we are trying to do. It will be explained better in the following sections when implementing the required methods for PubSub.
To complete the Pub/Sub pattern, the PubSub
type requires the implementation of the following three methods:
Subscribe
,Publish
, andClose
Subscribe
is used to allow new subscribers to receive published messages. It will update the subscribers field and return a new read channel that will be closed when PubSub
stops sending messages.
20func (s *PubSub[T]) Subscribe() <-chan T {
21 s.mu.Lock()
22 defer s.mu.Unlock()
23
24 if s.closed {
25 return nil
26 }
27
28 r := make(chan T)
29
30 s.subscribers = append(s.subscribers, r)
31
32 return r
33}
Notice how, in lines 21-22, the mutex mu
is blocking the method because an internal field in our type is updated, in this case, the subscribers field.
Publish
is used to send events to all internal subscribers.
35func (s *PubSub[T]) Publish(value T) {
36 s.mu.RLock()
37 defer s.mu.RUnlock()
38
39 if s.closed {
40 return
41 }
42
43 for _, ch := range s.subscribers {
44 ch <- value
45 }
46}
Notice how, in lines 36-37, the mutex mu
uses RLock
, the read version of the locking mechanism, because no internal fields in our type are updated, only read.
Finally, the Close
method indicates to the subscribers that no more messages will be published.
48func (s *PubSub[T]) Close() {
49 s.mu.Lock()
50 defer s.mu.Unlock()
51
52 if s.closed {
53 return
54 }
55
56 for _, ch := range s.subscribers {
57 close(ch)
58 }
59
60 s.closed = true
61}
Notice how, in lines 49-50, the mutex mu
is blocking the method because internal fields in our type are updated to represent the finalization of the communication between the publisher and the subscribers, meaning closing all channels and updating the closed field.
63func main() {
64 ps := NewPubSub[string]()
65
66 wg := sync.WaitGroup{}
67
68 //-
69
70 s1 := ps.Subscribe()
71
72 go func() {
73 wg.Add(1)
74
75 for {
76 select {
77 case val, ok := <-s1:
78 if !ok {
79 fmt.Print("sub 1, exiting")
80 wg.Done()
81 return
82 }
83
84 fmt.Println("sub 1, value ", val)
85 }
86 }
87 }()
88
89 //-
90
91 s2 := ps.Subscribe()
92
93 go func() {
94 wg.Add(1)
95
96 for val := range s2 {
97 fmt.Println("sub 2, value ", val)
98 }
99
100 wg.Done()
101
102 fmt.Print("sub 2, exiting")
103
104 }()
105
106 //-
107
108 ps.Publish("one")
109 ps.Publish("two")
110 ps.Publish("three")
111
112 ps.Close()
113
114 wg.Wait()
115
116 fmt.Println("completed")
117}
Both subscribers use the select statement in the for loop to read the values from the channel. What is a more straightforward way to do the same to remove the select statement?
Publisher/Subscriber is a concurrency pattern that allows in-process communication using the concurrency primitives of the language. An actual use case of this pattern would be to communicate to all running goroutines that it is time to exit, to allow them to clean up existing resources they are using.
If you’re looking to sink your teeth into more Go-related topics I recommend the following: