rss resume / curriculum vitae linkedin linkedin gitlab github twitter mastodon instagram
Learning Go: Publisher/Subscriber Concurrency Pattern
Nov 06, 2023

Disclaimer: This post includes Amazon affiliate links. Clicking on them earns me a commission and does not affect the final price.

Welcome to another post covering Concurrency Patterns in Go in Go 👋. This time, I am talking about the Publisher Subscriber pattern.



What is the Publisher/Subscriber pattern?

Also known as Pub/Sub, this messaging pattern allows customers, known as subscribers, to asynchronously receive events generated by a producer, known as a publisher. Pub/Sub is a well-known pattern used in distributed systems to asynchronously communicate different services interested in knowing specific information about a particular service.

In Go, we can use the concurrency primitives of the language, such as channels and goroutines, to implement this pattern. Let us start.

The code used for this post is available on GitHub.

Implementing the Publisher/Subscriber pattern

This pattern can use interface{} to support any type, but considering generics are already supported in Go, it makes more sense to use them to implement a type-safe version of this pattern.

The generic type PubSub will implement the pattern; it requires three fields to be defined:

  • subscribers: identifies the subscribers listening for events and is used to send the message when a new event is published,
  • mu: identifies a mutex, used to protect a block of code to avoid race conditions when multiple goroutines try to access it, and
  • closed: indicates whether the PubSub is closed or not to determine if we can add new subscribers or publish events.
 8type PubSub[T any] struct {
 9	subscribers []chan T
10	mu          sync.RWMutex
11	closed      bool
12}

Notice how the mu field is using sync.RWMutex, this is to explicitly acquire a Read/Write lock, depending on the operation we are trying to do. It will be explained better in the following sections when implementing the required methods for PubSub.

To complete the Pub/Sub pattern, the PubSub type requires the implementation of the following three methods:

  • Subscribe,
  • Publish, and
  • Close

Implementing Subscribe

Subscribe is used to allow new subscribers to receive published messages. It will update the subscribers field and return a new read channel that will be closed when PubSub stops sending messages.

20func (s *PubSub[T]) Subscribe() <-chan T {
21	s.mu.Lock()
22	defer s.mu.Unlock()
23
24	if s.closed {
25		return nil
26	}
27
28	r := make(chan T)
29
30	s.subscribers = append(s.subscribers, r)
31
32	return r
33}

Notice how, in lines 21-22, the mutex mu is blocking the method because an internal field in our type is updated, in this case, the subscribers field.

Implementing Publish

Publish is used to send events to all internal subscribers.

35func (s *PubSub[T]) Publish(value T) {
36	s.mu.RLock()
37	defer s.mu.RUnlock()
38
39	if s.closed {
40		return
41	}
42
43	for _, ch := range s.subscribers {
44		ch <- value
45	}
46}

Notice how, in lines 36-37, the mutex mu uses RLock, the read version of the locking mechanism, because no internal fields in our type are updated, only read.

Implementing Close

Finally, the Close method indicates to the subscribers that no more messages will be published.

48func (s *PubSub[T]) Close() {
49	s.mu.Lock()
50	defer s.mu.Unlock()
51
52	if s.closed {
53		return
54	}
55
56	for _, ch := range s.subscribers {
57		close(ch)
58	}
59
60	s.closed = true
61}

Notice how, in lines 49-50, the mutex mu is blocking the method because internal fields in our type are updated to represent the finalization of the communication between the publisher and the subscribers, meaning closing all channels and updating the closed field.

Example

 63func main() {
 64	ps := NewPubSub[string]()
 65
 66	wg := sync.WaitGroup{}
 67
 68	//-
 69
 70	s1 := ps.Subscribe()
 71
 72	go func() {
 73		wg.Add(1)
 74
 75		for {
 76			select {
 77			case val, ok := <-s1:
 78				if !ok {
 79					fmt.Print("sub 1, exiting")
 80					wg.Done()
 81					return
 82				}
 83
 84				fmt.Println("sub 1, value ", val)
 85			}
 86		}
 87	}()
 88
 89	//-
 90
 91	s2 := ps.Subscribe()
 92
 93	go func() {
 94		wg.Add(1)
 95
 96		for val := range s2 {
 97			fmt.Println("sub 2, value ", val)
 98		}
 99
100		wg.Done()
101
102		fmt.Print("sub 2, exiting")
103
104	}()
105
106	//-
107
108	ps.Publish("one")
109	ps.Publish("two")
110	ps.Publish("three")
111
112	ps.Close()
113
114	wg.Wait()
115
116	fmt.Println("completed")
117}
  • L66: Uses a WaitGroup to make sure all goroutines complete before main exits.
  • L70-87: Creates the first subscriber s1 and uses a goroutine to receive events.
  • L91-104: Creates the second subscriber, s2. In practice, the code is similar to the code used to create s1.
  • L108-110: Three messages are published
  • L112: Publisher is closed, completing the wait group, exiting the program.

Both subscribers use the select statement in the for loop to read the values from the channel. What is a more straightforward way to do the same to remove the select statement?

Conclusion

Publisher/Subscriber is a concurrency pattern that allows in-process communication using the concurrency primitives of the language. An actual use case of this pattern would be to communicate to all running goroutines that it is time to exit, to allow them to clean up existing resources they are using.

If you’re looking to sink your teeth into more Go-related topics I recommend the following:


Back to posts