PUBLISHED ON: FEBRUARY 17, 2023

Concurrency in Go

Go's concurrency model is built on goroutines, which are lightweight threads, and channels, which are used to communicate between goroutines. Together, goroutines and channels provide a powerful tool for building concurrent and parallel systems in Go.

One way to make the most of Go's concurrency model is to understand and use common concurrency patterns. These patterns are often used to solve specific problems in concurrent programming, such as how to coordinate and synchronize goroutines, how to process data in parallel, and how to manage shared state.

In this article, we'll explore several common concurrency patterns in Go, including:

  • Pipelines
  • Fan-out
  • Fan-in
  • Select

Concurrency Patterns

Pipelines

A pipeline is a pattern in which a series of goroutines are connected by channels and each goroutine performs a specific task. Data flows through the pipeline, with each goroutine reading from one channel and writing to another.

A simple example of a pipeline pattern is a pipeline that reads from a source, does some processing, and writes to a sink.

package main

import "fmt"

func main() {
	// Set up the pipeline.
	c := gen(2, 3)
	out := sq(c)

	// Consume the output.
	fmt.Println(<-out) // 4
	fmt.Println(<-out) // 9
}

// Generate integers.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

// Square integers.
func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}

This example creates a pipeline with two simple stages: a "gen" stage that generates integers, and a "sq" stage that squares them. The pipeline is set up in the main function, where a channel "c" is created by calling the "gen" function and passing it some integers. This channel is then passed to the "sq" function, which returns a new "out" channel carrying the squared values.

Calling gen() starts a goroutine that sends the integers down channel c and closes the channel when it is done. The goroutine started by sq() receives those integers, squares each one, and sends the result down its output channel.

The main function consumes the pipeline output by reading from the "out" channel and printing the result.

Fan-out

Fan-out is a pattern where a single goroutine sends data to several other goroutines. This pattern can be used to perform the same operation on multiple pieces of data in parallel.

An example of the fan-out pattern is a program that reads a large number of files and counts the occurrences of a word in each file. The program creates a goroutine for each file that reads the file and sends the word count to a shared channel:

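package main

import (
	"fmt"
	"os"
	"strings"
	"sync"
)

// result carries the word count for one file.
type result struct {
	file  string
	count int
}

func main() {
	var wg sync.WaitGroup

	files := []string{"file1.txt", "file2.txt", "file3.txt"}
	word := "example"
	counts := make(chan result)

	// Start one goroutine per file.
	for _, file := range files {
		wg.Add(1)
		go func(file string) {
			defer wg.Done()
			data, err := os.ReadFile(file)
			if err != nil {
				fmt.Println(err)
				return
			}
			counts <- result{file: file, count: strings.Count(string(data), word)}
		}(file)
	}

	// Close the channel once every worker has finished,
	// so the range loop below can terminate.
	go func() {
		wg.Wait()
		close(counts)
	}()

	for r := range counts {
		fmt.Printf("%d occurrences of %s in %s\n", r.count, word, r.file)
	}
}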

In this example of the fan-out pattern, the program takes a list of file names, starts a goroutine for each file, and counts the occurrences of the word "example" in each file. Each goroutine sends its word count to the "counts" channel.

A WaitGroup is used to ensure that the program waits for all goroutines to complete before exiting. A separate goroutine calls WaitGroup.Wait() and then closes the "counts" channel, so the channel is closed only after all other goroutines have finished.

Finally, the program reads from the "counts" channel and prints the results.

Fan-in

Fan-in is the opposite of fan-out: it is a pattern where multiple goroutines send data to a single goroutine. This pattern can be used to merge multiple data streams into a single stream.

An example of the fan-in pattern is a program that reads data from multiple sources and sends it to a single goroutine that writes it to a file:

package main

import (
	"fmt"
	"os"
	"sync"
)

func main() {
	var wg sync.WaitGroup

	sources := []string{"source1.txt", "source2.txt", "source3.txt"}
	dest := "dest.txt"

	file, err := os.Create(dest)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer file.Close()

	// All readers send into this one channel.
	merged := make(chan []byte)

	wg.Add(len(sources))
	for _, source := range sources {
		go func(source string) {
			// Deferred so the counter drops even when the read fails.
			defer wg.Done()
			data, err := os.ReadFile(source)
			if err != nil {
				fmt.Println(err)
				return
			}
			merged <- data
		}(source)
	}

	// Close the channel once every reader has finished.
	go func() {
		wg.Wait()
		close(merged)
	}()

	// Single consumer: main writes each source's data to dest.
	for data := range merged {
		if _, err := file.Write(data); err != nil {
			fmt.Println(err)
			return
		}
	}
}

This example shows the fan-in pattern in action. The program takes a list of file names, starts a goroutine for each file, reads the data from each file, and sends that data over the "merged" channel to a single goroutine (here, main), which writes it to a single "dest.txt" file. A WaitGroup is used to track the reader goroutines: a helper goroutine calls Wait() and closes the channel once every reader has finished, which ends the range loop in main and lets the program exit. Because the readers run concurrently, the order in which the sources appear in "dest.txt" is not deterministic.

Select

The select statement is a powerful tool in the Go concurrency model that allows a goroutine to wait on multiple channel operations and proceed with the first one that is ready. It can be used to implement different concurrency patterns, such as fan-out, fan-in, and even pipelines.

The following example uses the select statement to implement a fan-out pattern, where a single goroutine sends data to multiple goroutines:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	c1 := make(chan int)
	c2 := make(chan int)
	c3 := make(chan int)

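	wg.Add(4) // one producer plus three consumers
	go func() {
		defer wg.Done()
		// Closing the channels ends the consumers' range loops;
		// without this the program would deadlock in wg.Wait().
		defer close(c1)
		defer close(c2)
		defer close(c3)
		for i := 0; i < 10; i++ {
			select {
			case c1 <- i:
			case c2 <- i:
			case c3 <- i:
			}
		}
	}()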

	go func() {
		defer wg.Done()
		for v := range c1 {
			fmt.Println("c1:", v)
		}
	}()

	go func() {
		defer wg.Done()
		for v := range c2 {
			fmt.Println("c2:", v)
		}
	}()

	go func() {
		defer wg.Done()
		for v := range c3 {
			fmt.Println("c3:", v)
		}
	}()
	wg.Wait()
}

The program creates three channels and starts a single goroutine that sends integers to one of them using the select statement. This goroutine uses a for loop to send each integer to one of the channels c1, c2, or c3. The select statement blocks until one of its cases is ready, that is, until a goroutine is waiting to receive on one of the channels. When several cases are ready at once, select picks one of them at random, so any channel may receive a given value.

Three more goroutines are started to consume the data from the channels and print the values they receive. Using a WaitGroup, the program ensures that all goroutines have finished before exiting.

Conclusion

Concurrency patterns are a powerful tool for building concurrent and parallel systems in Go. They provide a way to solve specific problems in concurrent programming, such as how to coordinate and synchronize goroutines, how to process data in parallel, and how to handle shared state. This article introduced you to some common concurrency patterns in Go, including pipelines, fan-out, fan-in, and the select statement. By understanding and using these patterns, you can write more efficient and maintainable code.



About the author:
Pradeep has expertise in Linux, Go, Nginx, Apache, CyberSecurity, AppSec and various other technical areas. He has contributed to numerous publications and websites, providing his readers with insightful and informative content.