Concurrency in Golang, zero to hero
Learn concurrency with goroutines, WaitGroups, channels, context with a cancel function, and worker pools.
What is Concurrency?
Concurrency is the ability of a program to run multiple tasks that make progress independently while remaining part of the same program. It matters when a task needs to run without blocking the main flow. Modern software relies on it because programs are expected to be fast and often have several tasks to handle at once.
Concurrency in GoLang
Go has a powerful concurrency model based on CSP (communicating sequential processes): a problem is broken into smaller sequential processes, and instances of these processes, called goroutines, are scheduled to run concurrently. When we start a function as a goroutine, it is treated as an independent unit of work that gets scheduled and then executed on an available logical processor. CSP relies on channels to pass messages between two or more concurrent processes.
GoRoutines:
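A goroutine is a function that runs independently of the function that started it. Putting the go keyword in front of a function call starts it as a goroutine. Goroutines are lightweight and are scheduled by the Go runtime onto operating-system threads, so a goroutine is not the same thing as a thread.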
Channels:
A channel is a medium for sending messages between goroutines.
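As a minimal sketch of these two building blocks working together, a goroutine can hand a value back to its caller through a channel. The function name greet and the message text are made up for illustration.

```go
package main

import "fmt"

// greet is a hypothetical helper: it starts a goroutine that sends one
// message on a channel, then waits for that message and returns it.
func greet(name string) string {
	msgs := make(chan string) // unbuffered: a send blocks until someone receives

	go func() {
		msgs <- "hello, " + name
	}()

	return <-msgs // blocks until the goroutine has sent its message
}

func main() {
	fmt.Println(greet("gopher"))
}
```

The receive on the last line of greet is what keeps the caller from returning before the goroutine has done its work.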
How do you achieve it?
Let’s take a look at a normal app that makes multiple calls in a loop.
Normal App
package main

import (
    "fmt"
    "time"
)

func main() {
    fmt.Printf("%s Start Program\n", getCurrentTime())
    start := time.Now()
    var data = []int{2000, 1000, 5000, 4000}
    // Process the inputs one after another.
    for _, d := range data {
        processData(d)
    }
    elap := time.Since(start)
    fmt.Printf("\n\n%s End Program, Took: %s \n", getCurrentTime(), elap)
}

// processData simulates work by sleeping for inp milliseconds.
func processData(inp int) {
    fmt.Printf("%s Start processing for input %d \n", getCurrentTime(), inp)
    start := time.Now()
    time.Sleep(time.Duration(inp) * time.Millisecond)
    elap := time.Since(start)
    fmt.Printf("%s End processing for input %d, Took: %s \n", getCurrentTime(), inp, elap)
}

// getCurrentTime returns the current UTC time as a formatted string.
func getCurrentTime() string {
    return time.Now().UTC().Format("2006-01-02T15:04:05Z")
}
Output
2021-03-16T21:04:04Z Start Program
2021-03-16T21:04:04Z Start processing for input 2000
2021-03-16T21:04:06Z End processing for input 2000, Took: 2.000172078s
2021-03-16T21:04:06Z Start processing for input 1000
2021-03-16T21:04:07Z End processing for input 1000, Took: 1.001002948s
2021-03-16T21:04:07Z Start processing for input 5000
2021-03-16T21:04:12Z End processing for input 5000, Took: 5.004036716s
2021-03-16T21:04:12Z Start processing for input 4000
2021-03-16T21:04:16Z End processing for input 4000, Took: 4.002003921s
2021-03-16T21:04:16Z End Program, Took: 12.007666943s
If the tasks ran concurrently instead of one after another, the program would obviously take less time. Let’s use goroutines and check.
App with GoRoutines
package main

import (
    "fmt"
    "time"
)

func main() {
    fmt.Printf("%s Start Program\n", getCurrentTime())
    start := time.Now()
    var data = []int{2000, 1000, 5000, 4000}
    // Start one goroutine per input; main does not wait for them.
    for _, d := range data {
        go processData(d)
    }
    elap := time.Since(start)
    fmt.Printf("\n\n%s End Program, Took: %s \n", getCurrentTime(), elap)
}

// processData simulates work by sleeping for inp milliseconds.
func processData(inp int) {
    fmt.Printf("%s Start processing for input %d \n", getCurrentTime(), inp)
    start := time.Now()
    time.Sleep(time.Duration(inp) * time.Millisecond)
    elap := time.Since(start)
    fmt.Printf("%s End processing for input %d, Took: %s \n", getCurrentTime(), inp, elap)
}

// getCurrentTime returns the current UTC time as a formatted string.
func getCurrentTime() string {
    return time.Now().UTC().Format("2006-01-02T15:04:05Z")
}
Output
2021-03-16T21:07:03Z Start Program
2021-03-16T21:07:03Z End Program, Took: 7.183µs
The problem here is that I started multiple goroutines but didn’t wait for their results, so the main function ended the program before the other goroutines could complete their job.
To solve this, let’s use a WaitGroup to wait for all the goroutines to complete.
GoRoutines and WaitGroup
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    fmt.Printf("%s Start Program\n", getCurrentTime())
    start := time.Now()
    var data = []int{2000, 1000, 5000, 4000}
    var wg sync.WaitGroup
    for _, d := range data {
        wg.Add(1) // one more goroutine to wait for
        go processData(d, &wg)
    }
    wg.Wait() // block until every goroutine has called Done
    elap := time.Since(start)
    fmt.Printf("\n\n%s End Program, Took: %s \n", getCurrentTime(), elap)
}

// processData simulates work by sleeping for inp milliseconds and
// signals the WaitGroup when it returns.
func processData(inp int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("%s Start processing for input %d \n", getCurrentTime(), inp)
    start := time.Now()
    time.Sleep(time.Duration(inp) * time.Millisecond)
    elap := time.Since(start)
    fmt.Printf("%s End processing for input %d, Took: %s \n", getCurrentTime(), inp, elap)
}

// getCurrentTime returns the current UTC time as a formatted string.
func getCurrentTime() string {
    return time.Now().UTC().Format("2006-01-02T15:04:05Z")
}
Output
2021-03-16T15:29:08Z Start Program
2021-03-16T15:29:08Z Start processing for input 4000
2021-03-16T15:29:08Z Start processing for input 2000
2021-03-16T15:29:08Z Start processing for input 1000
2021-03-16T15:29:08Z Start processing for input 5000
2021-03-16T15:29:09Z End processing for input 1000, Took: 1.00473525s
2021-03-16T15:29:10Z End processing for input 2000, Took: 2.000104734s
2021-03-16T15:29:12Z End processing for input 4000, Took: 4.005124132s
2021-03-16T15:29:13Z End processing for input 5000, Took: 5.000640974s
2021-03-16T15:29:13Z End Program, Took: 5.000758315s
Here the processData function takes a pointer to the WaitGroup and calls Done to signal that its task is complete. In main, I added a WaitGroup and call Add(1) for each item in the loop. After the loop, Wait blocks until every goroutine has finished and the counter has dropped to zero.
This reduced the total processing time from about 12 seconds to about 5 seconds, which is the duration of the longest task.
This method is fine when there are only a few tasks, but with hundreds of records to process we can’t simply start a goroutine for each one, because that can overload the system. To handle that, we implement a worker pool so that at any point in time only X goroutines are running.
Implementing Worker Pool
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    fmt.Printf("%s Start Program\n", getCurrentTime())
    start := time.Now()
    var data = []int{2000, 1000, 5000, 4000}
    numberOfThreads := 2
    var wg sync.WaitGroup
    wg.Add(numberOfThreads)
    // Buffered channel large enough to hold every job.
    var ch = make(chan int, len(data))
    for i := 0; i < numberOfThreads; i++ {
        go func(waitG *sync.WaitGroup, chn chan int, thread int) {
            for {
                d, ok := <-chn
                if !ok {
                    // Channel closed and drained: no more work.
                    fmt.Printf("Thread %d stopped \n", thread)
                    waitG.Done()
                    return
                }
                fmt.Printf("Thread %d \n", thread)
                processData(d)
            }
        }(&wg, ch, i)
    }
    for _, d := range data {
        ch <- d
    }
    close(ch) // lets the workers detect that all jobs have been sent
    wg.Wait()
    elap := time.Since(start)
    fmt.Printf("\n\n%s End Program, Took: %s \n", getCurrentTime(), elap)
}

// processData simulates work by sleeping for inp milliseconds.
func processData(inp int) {
    fmt.Printf("%s Start processing for input %d \n", getCurrentTime(), inp)
    start := time.Now()
    time.Sleep(time.Duration(inp) * time.Millisecond)
    elap := time.Since(start)
    fmt.Printf("%s End processing for input %d, Took: %s \n", getCurrentTime(), inp, elap)
}

// getCurrentTime returns the current UTC time as a formatted string.
func getCurrentTime() string {
    return time.Now().UTC().Format("2006-01-02T15:04:05Z")
}
Output
2021-03-16T16:14:00Z Start Program
Thread 1
2021-03-16T16:14:00Z Start processing for input 2000
Thread 0
2021-03-16T16:14:00Z Start processing for input 1000
2021-03-16T16:14:01Z End processing for input 1000, Took: 1.001528485s
Thread 0
2021-03-16T16:14:01Z Start processing for input 5000
2021-03-16T16:14:02Z End processing for input 2000, Took: 2.00099327s
Thread 1
2021-03-16T16:14:02Z Start processing for input 4000
2021-03-16T16:14:06Z End processing for input 4000, Took: 4.000337751s
Thread 1 stopped
2021-03-16T16:14:06Z End processing for input 5000, Took: 5.000158392s
Thread 0 stopped
2021-03-16T16:14:06Z End Program, Took: 6.002151567s
Here I created a pool of 2 workers to do the job, and a buffered channel with a capacity equal to the number of items to process. Each worker keeps receiving messages from the channel and processing them. Once the channel is closed and empty, the receive reports ok as false and the worker stops.
I then send the data to the channel in a loop. After all the messages are sent I close the channel so that the workers can detect that there is no more work. A WaitGroup makes sure that all the workers have completed their job before the program ends. This way we can achieve concurrency with worker pools.
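The timings seen so far (about 12 seconds sequentially, 5 seconds with one goroutine per input, 6 seconds with two workers) can be reproduced with a little arithmetic. The sketch below is a simplified model and not part of the original program: makespan is a hypothetical helper that assumes jobs are handed out in order to whichever worker becomes free first, and it ignores scheduling and channel overhead.

```go
package main

import "fmt"

// makespan estimates the total wall-clock time (in ms) when jobs are handed
// out in order and each job goes to the worker that becomes free first.
// Simplified model: runtime and channel overhead are ignored.
func makespan(jobs []int, workers int) int {
	if workers < 1 {
		workers = 1
	}
	busyUntil := make([]int, workers) // time at which each worker is free again

	for _, d := range jobs {
		// Pick the worker that becomes free earliest.
		next := 0
		for w := 1; w < workers; w++ {
			if busyUntil[w] < busyUntil[next] {
				next = w
			}
		}
		busyUntil[next] += d
	}

	// The program ends when the last worker finishes.
	total := 0
	for _, t := range busyUntil {
		if t > total {
			total = t
		}
	}
	return total
}

func main() {
	data := []int{2000, 1000, 5000, 4000}
	for _, n := range []int{1, 2, 4} {
		fmt.Printf("%d worker(s): about %d ms\n", n, makespan(data, n))
	}
}
```

For the four inputs used above this gives 12000 ms with one worker, 6000 ms with two, and 5000 ms with four, which lines up with the measured runs. Adding workers beyond the number of jobs cannot push the total below the longest single job.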
Now let’s say we need to stop the workers if there’s an error while processing any request, and we also need to capture the response from each request.
Worker pool with receiver channel and context
package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// worker takes jobs from the jobs channel until it is closed, the context
// is cancelled, or a job fails.
func worker(ctx context.Context, cancel context.CancelFunc, thread int, jobs <-chan int, res chan<- map[int]string, errs chan<- error) {
    for d := range jobs {
        select {
        case <-ctx.Done():
            // Another worker reported an error: stop taking jobs.
            fmt.Printf("Context cancel called thread: %d stopped \n", thread)
            return
        default:
            fmt.Printf("Thread %d \n", thread)
            // Call processEvenData(d) here instead to see the error path.
            rs, err := processData(d)
            if err != nil {
                fmt.Printf("\n Error received thread: %d data %d stopped \n", thread, d)
                errs <- err
                cancel() // tell the other workers to stop
                return
            }
            res <- rs
        }
    }
}

func main() {
    fmt.Printf("%s Start Program\n", getCurrentTime())
    start := time.Now()
    var data = []int{2000, 1000, 5000, 4000, 200, 800, 300, 400, 601, 606, 666}
    numberOfThreads := 4
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    var jobs = make(chan int, len(data))
    var errs = make(chan error)
    var res = make(chan map[int]string, len(data))
    for i := 0; i < numberOfThreads; i++ {
        go worker(ctx, cancel, i, jobs, res, errs)
    }
    for _, d := range data {
        jobs <- d
    }
    close(jobs)
    // Collect one result per job, or stop at the first error.
loop:
    for j := 0; j < len(data); j++ {
        select {
        case err := <-errs:
            fmt.Printf("error received %v", err)
            break loop
        case d := <-res:
            fmt.Printf("Response received %+v", d)
        }
    }
    elap := time.Since(start)
    fmt.Printf("\n\n%s End Program, Took: %s \n", getCurrentTime(), elap)
}

// processData simulates work by sleeping for inp milliseconds. It never fails.
func processData(inp int) (res map[int]string, err error) {
    fmt.Printf("\n %s Start processing for input %d \n", getCurrentTime(), inp)
    start := time.Now()
    time.Sleep(time.Duration(inp) * time.Millisecond)
    elap := time.Since(start)
    fmt.Printf("\n %s End processing for input %d, Took: %s \n", getCurrentTime(), inp, elap)
    res = map[int]string{inp: getCurrentTime()}
    return
}

// processEvenData works like processData but returns an error for odd inputs.
func processEvenData(inp int) (res map[int]string, err error) {
    fmt.Printf("%s Start processing for input %d \n", getCurrentTime(), inp)
    start := time.Now()
    time.Sleep(time.Duration(inp) * time.Millisecond)
    elap := time.Since(start)
    if inp%2 == 0 {
        res = map[int]string{inp: getCurrentTime()}
    } else {
        err = errors.New("failed to process data")
    }
    fmt.Printf("%s End processing for input %d, Took: %s \n", getCurrentTime(), inp, elap)
    return
}

// getCurrentTime returns the current UTC time as a formatted string.
func getCurrentTime() string {
    return time.Now().UTC().Format("2006-01-02T15:04:05Z")
}
Let’s look at the breakdown
var res = make(chan map[int]string, len(data))
This channel will be used to write the response.
ctx, cancel := context.WithCancel(context.Background())
A context with a cancel function lets us signal that work should stop. Each worker checks whether cancel has been called by any worker, and if it has, that worker stops processing further requests.
var errs = make(chan error)
This is the error channel. A worker writes to it if an error occurs.
select {
case <-ctx.Done():
}
This checks whether a cancel signal has been sent on the context. If it has, the worker stops.
if err != nil {
cancel()
}
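If there’s an error we call cancel so that the other workers stop picking up new requests. A worker that is already in the middle of a job only notices the cancellation before it starts its next job.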
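To see the cancel signal in isolation, here is a small sketch, separate from the program above. The helper names stopAll and cancelled are made up for illustration. stopAll shows that a single cancel call reaches every goroutine waiting on ctx.Done(), and cancelled is the same non-blocking select check that the worker uses.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// cancelled reports, without blocking, whether ctx has been cancelled.
func cancelled(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

// stopAll is a hypothetical helper: it starts n workers that wait on
// ctx.Done(), cancels the context once, and reports how many workers
// noticed the cancellation and returned.
func stopAll(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // calling cancel more than once is safe

	var wg sync.WaitGroup
	var mu sync.Mutex
	stopped := 0

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-ctx.Done() // unblocks in every worker once cancel is called
			mu.Lock()
			stopped++
			mu.Unlock()
		}()
	}

	cancel() // one call signals all workers
	wg.Wait()
	return stopped
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	fmt.Println("before cancel:", cancelled(ctx))
	cancel()
	fmt.Println("after cancel:", cancelled(ctx))
	fmt.Println("workers stopped:", stopAll(3))
}
```

Because Done returns a channel that is closed on cancellation, every receiver is released at once, so there is no need to send one stop message per worker.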
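There are two different processing functions. The first, processData(inp int), never returns an error. Let’s check the output for it. The run below used two workers and the inputs 2000, 1000, 5000, 4001, 200, 201, 300, 400.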
2021-07-16T05:50:13Z Start Program
Thread 1
2021-07-16T05:50:13Z Start processing for input 2000
Thread 0
2021-07-16T05:50:13Z Start processing for input 1000
2021-07-16T05:50:14Z End processing for input 1000, Took: 1.003946012s
Thread 0
2021-07-16T05:50:14Z Start processing for input 5000
Response received map[1000:2021-07-16T05:50:14Z]
2021-07-16T05:50:15Z End processing for input 2000, Took: 2.003974342s
Thread 1
2021-07-16T05:50:15Z Start processing for input 4001
Response received map[2000:2021-07-16T05:50:15Z]
2021-07-16T05:50:19Z End processing for input 5000, Took: 5.003718612s
Thread 0
2021-07-16T05:50:19Z Start processing for input 200
2021-07-16T05:50:19Z End processing for input 4001, Took: 4.003734684s
Response received map[5000:2021-07-16T05:50:19Z]Thread 1
Response received map[4001:2021-07-16T05:50:19Z]
2021-07-16T05:50:19Z Start processing for input 201
2021-07-16T05:50:20Z End processing for input 200, Took: 200.475834ms
Thread 0
2021-07-16T05:50:20Z Start processing for input 300
Response received map[200:2021-07-16T05:50:20Z]
2021-07-16T05:50:20Z End processing for input 201, Took: 201.073946ms
Thread 1
2021-07-16T05:50:20Z Start processing for input 400
Response received map[201:2021-07-16T05:50:20Z]
2021-07-16T05:50:20Z End processing for input 300, Took: 300.611346ms
Response received map[300:2021-07-16T05:50:20Z]
2021-07-16T05:50:20Z End processing for input 400, Took: 401.238171ms
Response received map[400:2021-07-16T05:50:20Z]
2021-07-16T05:50:20Z End Program, Took: 6.610424068s
Here all the records are processed without error.
Now let’s use the other function, processEvenData, which returns an error if the input is not an even number. To try it, call it in the worker in place of processData.
2021-07-16T05:56:39Z Start Program
Thread 1
2021-07-16T05:56:39Z Start processing for input 2000
Thread 0
2021-07-16T05:56:39Z Start processing for input 1000
2021-07-16T05:56:40Z End processing for input 1000, Took: 1.004341994s
Thread 0
2021-07-16T05:56:40Z Start processing for input 5000
Response received map[1000:2021-07-16T05:56:40Z]2021-07-16T05:56:41Z End processing for input 2000, Took: 2.002803714s
Thread 1
2021-07-16T05:56:41Z Start processing for input 4001
Response received map[2000:2021-07-16T05:56:41Z]2021-07-16T05:56:45Z End processing for input 5000, Took: 5.000998866s
Thread 0
2021-07-16T05:56:45Z Start processing for input 200
Response received map[5000:2021-07-16T05:56:45Z]2021-07-16T05:56:45Z End processing for input 4001, Took: 4.002608371s
Error received thread: 1 data 4001 stopped
error received failed to process data
2021-07-16T05:56:45Z End Program, Took: 6.005715129s
Here, after the odd number 4001 fails, the remaining tasks are not processed, because the error also stops the other workers.
Hope this makes things clear.