Producer-Consumer in Go

The Go programming language has become the most popular language in the modern cloud infrastructure stack. For Java developers, it is easy to get started with Go because both languages share a lot of similarities. However, sometimes it is challenging to find a good way to express something in Go.

The best way to learn a programming language is to read code. I recently came across golang.org/x/exp/winfsnotify and found an interesting piece of code that I would like to share in this Blog post.

Producer-Consumer in Go

The producer-consumer pattern with a single producer and a single consumer is one of the most simple patterns in parallel computing. In Go, you can implement it like this:

package main

import (
    "fmt"
    "time"
)

func calculateNextInt(prev int) int {
    time.Sleep(1 * time.Second) // pretend this is an expensive operation
    return prev + 1
}

func main() {
    data := make(chan int)

    // producer
    go func() {
        var i = 0
        for {
            i = calculateNextInt(i)
            data <- i
        }
    }()

    // consumer
    for i := range data {
        fmt.Printf("i=%v\n", i)
    }
}

Go’s channels and goroutines allow for a simple and straightforward implementation. In real world applications you might make an additional error channel to allow the producer send errors to the consumer, but we leave this aside for now.

What if we want to stop the producer loop?

The golden rule of Go channels is: Channels should be closed by the goroutine writing into the channel, and not by the goroutine reading from the channel. Go enforces this by making a program panic if a goroutine tries to write into a closed channel, and gracefully returning nil when a goroutine reads from a closed channel.

What we need is a way to signal to the consumer loop that it sould terminate and close the channel. A common way to do this is to create an additional channel for that signal. We call that channel quit. The modified main() function looks like this:

func main() {
    data := make(chan int)
    quit := make(chan interface{})

    // producer
    go func() {
        var i = 0
        for {
            i = calculateNextInt(i)
            select {
            case data <- i:
            case <-quit:
                close(data)
                return
            }
        }
    }()

    // consumer
    for i := range data {
        fmt.Printf("i=%v\n", i)
        if i >= 5 {
            close(quit)
        }
    }
}

After the consumer closed the quit channel, the producer will read nil from quit, close the data channel and terminate.

While this is a good solution for most scenarios, it has one drawback: Closing the producer is an asynchronous fire-and-forget operation. After the consumer closes the quit channel there is no way to know when the producer is actually stopped. This is a problem if the producer holds system resources and the consumer needs to wait until these resources are free.

Implementing a Synchronous Close() function

Our goal is to implement a Close() function for the producer as follows:

  • Synchronous operation: When Close() returns, the producer is actually terminated.
  • Error handling: When the producer fails to shut down cleanly, Close() returns an error.

The solution I came accross and that I want to share in this Blog post is to create a channel of channels:

type producer struct {
    data chan int
    quit chan chan error
}

func (p *producer) Close() error {
    ch := make(chan error)
    p.quit <- ch
    return <-ch
}

func main() {
    prod := &producer{
        data: make(chan int),
        quit: make(chan chan error),
    }

    // producer
    go func() {
        var i = 0
        for {
            i = calculateNextInt(i)
            select {
            case prod.data <- i:
            case ch := <-prod.quit:
                close(prod.data)
                // If the producer had an error while shutting down,
                // we could write the error to the ch channel here.
                close(ch)
                return
            }
        }
    }()

    // consumer
    for i := range prod.data {
        fmt.Printf("i=%v\n", i)
        if i >= 5 {
            err := prod.Close()
            if err != nil {
                // cannot happen in this example
                fmt.Printf("unexpected error: %v\n", err)
            }
        }
    }
}

The Close() function creates a temporary channel ch that is used by the producer to signal when shutdown is complete and if there was an error during shutdown.

Where to go from here

In this Blog post, we showed how to implement synchronous shutdown of a producer gorouting in Go. One thing we left out is how to interrupt the actual work of the producer, in our case simulated by the calculateNextInt() function. This is highly application specific. Some operations can be interrupted by closing a file handle, some by sending a signal. You need to know what your producer is doing to come up with a way to interrupt that operation.

Start your FREE TRIAL today!

Full Stack Visibility in 5 minutes! Instana makes it easy to
manage the performance of your applications.

Free Trial

See how Instana's technology works!

Maximum Understanding, Minimum Effort! Check out
how Instana can help you managing your applications.

How Instana Works