Introduction to Go concurrency with goroutines and channels

✍️ → Written on 2020-12-13 in 2719 words. Part of programming-languages golang

Motivation

Go (also called Golang) is a pragmatic programming language. Go is praised not for sophisticated design, but for its simplicity. Many aspects of the language simply sum up the good parts of programming language design up to 2009. However, two of its distinctive features are the go keyword, giving rise to the concept of goroutines, and chan, introducing channels. These two primitives can be used to build a concurrency framework for your application.

  • goroutines are considered lightweight threads. A process can run multiple goroutines. Goroutines share their heap, but each has its own variable-sized stack. The goroutine scheduler is part of the Go runtime.

  • channels are communication primitives to share values across multiple concurrent units within one Go runtime. They are implemented with locks and a thread-safe queue. This enables Go’s guiding idea: Don’t communicate by sharing memory; share memory by communicating.

Goroutines by example

Consider the following program which prints ‘running sub’:

package main

import (
	"fmt"
)

func sub() {
	fmt.Println("running sub")
}

func main() {
	sub()
}

If you use the go keyword before sub, the program becomes non-deterministic (→ broken example!):

package main

import (
	"fmt"
)

func sub() {
	fmt.Println("running sub")
}

func main() {
	go sub()
}

Why?

  1. main runs in the main-goroutine.

  2. If you call sub with go, another goroutine is created to run the sub function. The go statement does not block, so main does not wait for sub to return.

  3. Right after initiating the creation of the goroutine, the main goroutine continues executing main, and the next step is the end of the function. As a result, the main function returns. Since this is the main goroutine, the entire process exits, terminating all other goroutines as well.

  4. If sub has already executed its fmt.Println call at this point, the output will be shown. If not, nothing is printed.

In essence, this program is broken (it yields non-deterministic output) because we do not synchronize the two goroutines. We need to communicate when a goroutine terminates. In particular, the main goroutine needs to wait until the sub goroutine has terminated.

WaitGroup

The convenient building block for this use case is sync.WaitGroup. In the following source code, we properly utilize a WaitGroup for synchronization:

package main

import (
	"fmt"
	"sync"
)

func sub(wg *sync.WaitGroup) {
	defer wg.Done()
	fmt.Println("running sub")
}

func main() {
	var wg sync.WaitGroup

	wg.Add(1)
	go sub(&wg)

	wg.Wait()
}

  1. We add one concurrent unit (i.e. increment the internal counter) to the wait group.

  2. We call sub asynchronously. Execution of the main-goroutine continues immediately.

  3. Wait() blocks until the WaitGroup’s internal counter reaches zero again.

  4. Meanwhile, sub starts and prints the string running sub to stdout.

  5. At exit (defer), wg.Done() is executed, decrementing the counter of the wait group.

In conclusion, main terminates only after sub has returned. This program is deterministic.
We observe that the function signature changed to introduce synchronization. Due to the need for synchronization, you cannot, in general, turn a synchronous function into an asynchronous one without modifications.

Extensible WaitGroup

Go does not always make it easy to write extensible code, but the most important mechanism is the struct. We define a struct that can be extended with more fields in later versions of the program. Thus, we put our WaitGroup into a struct, which can be extended with any future attributes without changing the function signature:

package main

import (
	"fmt"
	"sync"
)

type ConcurrencyOptions struct {
	wg *sync.WaitGroup
}

func NewConcurrencyOptions() *ConcurrencyOptions {
	c := new(ConcurrencyOptions)
	c.wg = new(sync.WaitGroup)
	return c
}

func (c *ConcurrencyOptions) Inc() {
	c.wg.Add(1)
}

func (c *ConcurrencyOptions) Dec() {
	c.wg.Done()
}

func (c *ConcurrencyOptions) WaitForAll() {
	c.wg.Wait()
}

func sub(cOpts *ConcurrencyOptions) {
	defer cOpts.Dec()
	fmt.Println("running sub")
}

func main() {
	cOpts := NewConcurrencyOptions()

	cOpts.Inc()
	go sub(cOpts)

	cOpts.WaitForAll()
}

Scaled, extensible WaitGroup

The following example simply shows how to scale to multiple goroutines. This is trivial because WaitGroup’s API is designed as a counter:

package main

import (
	"fmt"
	"sync"
)

type ConcurrencyOptions struct {
	wg    *sync.WaitGroup
	Units int
}

func NewConcurrencyOptions(numOfUnits int) *ConcurrencyOptions {
	c := new(ConcurrencyOptions)
	c.wg = new(sync.WaitGroup)
	c.Units = numOfUnits
	return c
}

func (c *ConcurrencyOptions) Inc() {
	c.wg.Add(1)
}

func (c *ConcurrencyOptions) Dec() {
	c.wg.Done()
}

func (c *ConcurrencyOptions) WaitForAll() {
	c.wg.Wait()
}

func sub(cOpts *ConcurrencyOptions) {
	defer cOpts.Dec()
	fmt.Println("running sub")
}

func main() {
	cOpts := NewConcurrencyOptions(5)

	for i := 0; i < cOpts.Units; i++ {
		cOpts.Inc()
		go sub(cOpts)
	}

	cOpts.WaitForAll()
}

It does not matter whether you increment WaitGroup’s internal counter ahead of the loop by n=5 or increment it by 1 within the loop. But you must not put cOpts.Inc() inside the asynchronous function. Why? Because if we initiate the creation of n goroutines but none has executed yet, WaitGroup’s Wait() will return right away, since its internal counter is still zero.

Channels by example

chan keyword

WaitGroups are a useful mechanism for synchronization, but Go channels are another one. WaitGroups can only convey that a counter (usually, of concurrent units) reached zero, whereas channels allow you to convey more information. This synchronized example uses channels instead of a WaitGroup. The core of this example is that finishChan <- true sends the value true into the channel, whereas <-finishChan ensures that execution does not continue until some value is received from the channel.

package main

import (
	"fmt"
)

func sub(finishChan chan<- bool) {
	defer func() {
		finishChan <- true
	}()
	fmt.Println("running sub")
}

func main() {
	finishChan := make(chan bool)
	go sub(finishChan)

	<-finishChan
}

The use of true as value is completely arbitrary.

Extensible example with channels

Now we implement the extensible example from above with channels:

package main

import (
	"fmt"
)

type ConcurrencyOptions struct {
	finishChan chan bool
	Units      int
}

func NewConcurrencyOptions(numOfUnits int) *ConcurrencyOptions {
	c := new(ConcurrencyOptions)
	c.finishChan = make(chan bool)
	c.Units = numOfUnits
	return c
}

func (c *ConcurrencyOptions) Inc() {
}

func (c *ConcurrencyOptions) Dec() {
	c.finishChan <- true
}

func (c *ConcurrencyOptions) WaitForAll() {
	for i := 0; i < c.Units; i++ {
		<-c.finishChan
	}
}

func sub(cOpts *ConcurrencyOptions) {
	defer cOpts.Dec()
	fmt.Println("running sub")
}

func main() {
	cOpts := NewConcurrencyOptions(5)

	for i := 0; i < cOpts.Units; i++ {
		cOpts.Inc()
		go sub(cOpts)
	}

	cOpts.WaitForAll()
}

Buffered channels

Channels can also be buffered. This means a channel is created with a finite number l of elements that can be queued inside it. After l unread sends of c <- true, the next such instruction blocks until at least one element is read from the channel’s queue. The following example illustrates this:

package main

import (
	"fmt"
	"sync"
	"time"
)

func produce(wg *sync.WaitGroup, c chan<- bool) {
	defer wg.Done()
	for i := 0; i < 10; i++ {
		c <- true
		fmt.Println("produced")
	}
}

func consume(wg *sync.WaitGroup, c <-chan bool) {
	defer wg.Done()
	for i := 0; i < 10; i++ {
		<-c
		fmt.Println("consumed")
		time.Sleep(30 * time.Millisecond)
	}
}

func main() {
	var wg sync.WaitGroup
	wg.Add(2)

	c := make(chan bool, 2)
	go produce(&wg, c)
	go consume(&wg, c)

	wg.Wait()
}

  • The function arguments use chan<- TYPE to ensure that a channel is only used for sending values, and <-chan TYPE to ensure that a channel is only used for receiving values.

  • The output will show produced twice first, followed by alternation between consumed and produced. Why? Because two values can be put into the channel before a send at c <- true blocks (→ twice).

  • The consume function is slower by design: it consumes one element per 30 ms. Once an element is consumed, the queue is immediately filled up again by produce (→ alternation).

  • Since both functions run asynchronously, we need some synchronization primitive to wait until both of them terminate. A WaitGroup is used to achieve this.

Channels as iterators

The following example illustrates how we can implement iterators in Go. Using for result := range c, we iterate over the channel’s values. The open question is: how does the loop know whether to expect one more element or whether we are finished? Channels can be closed. close(c) closes the channel, which means that no more values can be sent through it. If you try to send anyway, you get the inevitable panic: send on closed channel. It is also not possible to test whether a channel is already closed or not.

package main

import (
	"fmt"
)

func produce(c chan<- int) {
	for i := 0; i < 10; i++ {
		c <- i
	}
	close(c)
}

func main() {
	c := make(chan int)
	go produce(c)

	for result := range c {
		fmt.Println("result =", result)
	}
}

Depending on your definition, this example does not show a true generator: the caller has to create the channel and start the producing goroutine itself. In the following improved version, we move the responsibility to create an appropriate channel into the function produce itself. The channel also gets a small buffer, so the producer can run one element ahead of the consumer instead of blocking on every single send:

package main

import (
	"fmt"
)

func produce() <-chan int {
	c := make(chan int, 1)
	go func(c chan<- int) {
		defer close(c)
		for i := 0; i < 10; i++ {
			c <- i
		}
	}(c)
	return c
}

func main() {
	for result := range produce() {
		fmt.Println("result =", result)
	}
}

But what is the issue with iterators in Go in general? Because you return the values through a channel, your function signature looks different from simply returning the elements in a slice. Furthermore, you must remember to close the channel to indicate finalization. Since this is not checked by the type system, runtime errors can happen easily.

So should you use a channel as iterator to return a large sequence of elements or just return the slice with all elements? Few APIs use the former and most APIs use the latter.

Advanced remarks

At this point, I don’t want to extend this post any further. I will keep remaining remarks brief:

  • Design inspiration

  • Typed channels

    • Here we only send bools through the channels. Channels can transport any type, but they are typed. So be sure to hide the actual channel type from your public API to keep it extensible.

    • chan<- TYPE as a function argument means the channel is send-only (you can only send values into it). <-chan TYPE means the channel is receive-only (you can only receive values from it).

  • Error handling

    • What happens if a goroutine encounters an error? How do you report it? Typically, error handling means you want to communicate an error object to the main goroutine. Indeed, for every channel you can create an accompanying error channel of type chan error.

    • This leads one to build an abstraction which always maintains an error channel alongside a success channel. I recognized that such an abstraction cannot be built generically in Go and will always be specific to your application. So you end up reinventing the abstraction for every new application.

  • Preemption

    • One fundamental question is whether you can terminate goroutines preemptively. The answer is no.

    • You can only repeatedly select on a termination channel and, if a termination value is received, return prematurely.

  • Admissible crashes

    • Let us assume your application uses many goroutines, and some are simply allowed to crash once in a while. Usually, a panic in one goroutine terminates the entire process, taking all other goroutines with it. But using recover, you can prevent this behavior.

  • select

    • There might be several goroutines running concurrently, producing results of various types. How can you receive the first result from any of several channels? Use the select keyword. A default case always matches; thus a select with a default case does not wait for the first received message, but executes the default case instead when no channel is ready.

  • Number of values sent through the channel

    • Because closed channels cannot be sent to anymore and channels cannot be tested for closedness, you need to point out in your documentation whether (and when) channels are closed. Your API differs vastly depending on whether you close a channel or keep it open. The compiler cannot check compliance with these conventions.

    • Often you don’t know how many values will be sent through the channel. So you just create an infinite loop and fetch values from channels repeatedly.

    • Closing channels can come in handy. But be aware: receiving from a closed channel immediately returns the zero value of the channel’s element type.

    • Since closing channels is cumbersome, you might be better off treating the zero value as a request for termination, or creating a second channel that receives success/failure messages from the previous step.

  • Concurrency patterns

    • Because the results (or errors) of the first step can be sent via channels to the second step, you can actually build pipelines. Execution of one step continues whenever the result of the previous step is ready. Since you can also forward results multiple times or filter them, the primitives become really powerful.

Conclusion

Concurrency in Go is nice because it is built from a few simple primitives. Furthermore, I am glad that Go decided to push concurrency support forward. However, the details show that no consistently nice design was achieved which would cover popular use cases (e.g. a pipeline with proper error handling) easily.

  • Goroutines require a heavy runtime as part of every Go process.

  • The go keyword makes sense for starting functions asynchronously. A separate, difficult-to-remember syntax for channels seems like a non-intuitive choice to me.

  • Channels can be found in other programming languages as well. However, the details still make it a distinctive feature.

  • Channels IMHO integrate badly with error handling in Go.

  • The relationship of channels to iterators is left indecisive in the language. They are possible, but few APIs are designed around them.

In the end, concurrency has better support than in many earlier languages, but it is still very easy to shoot yourself in the foot. A more exhaustive article discussing Go channels is “Go channels are bad and you should feel bad” by JT Olio.