Concurrency in Golang, zero to hero

Learn concurrency in Go with goroutines, WaitGroups, channels, context with a cancel function, and worker pools.

What is Concurrency?

Concurrency is the ability of a program to make progress on multiple tasks that run independently of each other while remaining part of the same program. It is useful when a program has work that can proceed without blocking its main flow, for example waiting on several network calls at once. Modern software often has many such tasks to do, and handling them concurrently instead of one after another can make the program much faster.

Concurrency in Golang

Go's concurrency model is based on CSP (communicating sequential processes): a problem is broken into smaller sequential processes, and instances of these processes, called goroutines, are scheduled to run concurrently. When we run a function as a goroutine, it is treated as an independent unit of work that the Go runtime schedules and then executes on an available logical processor. In CSP, these processes coordinate by passing messages (values) over channels rather than by sharing memory.

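Goroutines:

A goroutine is a function that runs concurrently with the rest of the program, including main. Putting the go keyword in front of a function call starts that call as a goroutine. Goroutines are not OS threads: the Go runtime multiplexes many goroutines onto a small number of threads, which makes them cheap to create.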

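Channels:

A channel is a typed conduit that goroutines use to send values to each other. By default a channel is unbuffered, so a send blocks until another goroutine receives the value, which also makes channels a way to synchronize goroutines.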
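A minimal sketch of both ideas together: a goroutine computes a value and hands it back to main over an unbuffered channel. The function name square is just for illustration and is not part of the programs below.

```go
package main

import "fmt"

// square sends n*n on out. It is meant to run as a goroutine.
func square(n int, out chan<- int) {
	out <- n * n // blocks until a receiver is ready (unbuffered channel)
}

func main() {
	out := make(chan int) // unbuffered channel of ints
	go square(7, out)     // the go keyword starts square concurrently
	result := <-out       // main blocks here until square sends
	fmt.Println(result)   // prints 49
}
```

The receive in main doubles as synchronization: main cannot continue until the goroutine has produced its value.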

How do you achieve it?

Let’s start with a normal app that processes several inputs one after another in a loop.

Normal App

package main

import (
    "fmt"
    "time"
)

func main() {
    fmt.Printf("%s Start Program\n", getCurrentTime())
    start := time.Now()
    var data = []int{2000, 1000, 5000, 4000}

    for _, d := range data {
        processData(d)
    }
    elap := time.Since(start)
    fmt.Printf("\n\n%s End Program, Took: %s \n", getCurrentTime(), elap)
}

func processData(inp int) {
    fmt.Printf("%s Start processing for input %d \n", getCurrentTime(), inp)
    start := time.Now()
    time.Sleep(time.Duration(inp) * time.Millisecond)
    elap := time.Since(start)
    fmt.Printf("%s End processing for input %d, Took: %s \n", getCurrentTime(), inp, elap)
}

func getCurrentTime() string {
    return time.Now().UTC().Format("2006-01-02T15:04:05Z")
}

Output

2021-03-16T21:04:04Z Start Program
2021-03-16T21:04:04Z Start processing for input 2000 
2021-03-16T21:04:06Z End processing for input 2000, Took: 2.000172078s 
2021-03-16T21:04:06Z Start processing for input 1000 
2021-03-16T21:04:07Z End processing for input 1000, Took: 1.001002948s 
2021-03-16T21:04:07Z Start processing for input 5000 
2021-03-16T21:04:12Z End processing for input 5000, Took: 5.004036716s 
2021-03-16T21:04:12Z Start processing for input 4000 
2021-03-16T21:04:16Z End processing for input 4000, Took: 4.002003921s 


2021-03-16T21:04:16Z End Program, Took: 12.007666943s

If the tasks ran concurrently instead of one after another, the whole program should take less time. Let’s use goroutines and check.

App with Goroutines

package main

import (
    "fmt"
    "time"
)

func main() {
    fmt.Printf("%s Start Program\n", getCurrentTime())
    start := time.Now()
    var data = []int{2000, 1000, 5000, 4000}

    for _, d := range data {
        go processData(d)
    }
    elap := time.Since(start)
    fmt.Printf("\n\n%s End Program, Took: %s \n", getCurrentTime(), elap)
}

func processData(inp int) {
    fmt.Printf("%s Start processing for input %d \n", getCurrentTime(), inp)
    start := time.Now()
    time.Sleep(time.Duration(inp) * time.Millisecond)
    elap := time.Since(start)
    fmt.Printf("%s End processing for input %d, Took: %s \n", getCurrentTime(), inp, elap)
}

func getCurrentTime() string {
    return time.Now().UTC().Format("2006-01-02T15:04:05Z")
}

Output

2021-03-16T21:07:03Z Start Program


2021-03-16T21:07:03Z End Program, Took: 7.183µs

The problem is that I started several goroutines but never waited for them. When main returns, the whole program exits, so the goroutines were stopped before they could do their work; none of them even printed its start message.

To solve this, let’s use a sync.WaitGroup to wait for all the goroutines to complete.

Goroutines and WaitGroup

package main

import (
   "fmt"
   "sync"
   "time"
)

func main() {
   fmt.Printf("%s Start Program\n", getCurrentTime())
   start := time.Now()
   var data = []int{2000, 1000, 5000, 4000}
   var wg sync.WaitGroup

   for _, d := range data {
       wg.Add(1)
       go processData(d, &wg)
   }
   wg.Wait()
   elap := time.Since(start)
   fmt.Printf("\n\n%s End Program, Took: %s \n", getCurrentTime(), elap)
}

func processData(inp int, wg *sync.WaitGroup) {
   defer wg.Done()
   fmt.Printf("%s Start processing for input %d \n", getCurrentTime(), inp)
   start := time.Now()
   time.Sleep(time.Duration(inp) * time.Millisecond)
   elap := time.Since(start)
   fmt.Printf("%s End processing for input %d, Took: %s \n", getCurrentTime(), inp, elap)
}

func getCurrentTime() string {
   return time.Now().UTC().Format("2006-01-02T15:04:05Z")
}

Output

2021-03-16T15:29:08Z Start Program
2021-03-16T15:29:08Z Start processing for input 4000 
2021-03-16T15:29:08Z Start processing for input 2000 
2021-03-16T15:29:08Z Start processing for input 1000 
2021-03-16T15:29:08Z Start processing for input 5000 
2021-03-16T15:29:09Z End processing for input 1000, Took: 1.00473525s 
2021-03-16T15:29:10Z End processing for input 2000, Took: 2.000104734s 
2021-03-16T15:29:12Z End processing for input 4000, Took: 4.005124132s 
2021-03-16T15:29:13Z End processing for input 5000, Took: 5.000640974s 


2021-03-16T15:29:13Z End Program, Took: 5.000758315s

Here the processData function takes a pointer to the WaitGroup and calls wg.Done() (deferred) when it finishes. In main, I call wg.Add(1) for each item before starting its goroutine. After the loop, wg.Wait() blocks until every goroutine has called Done and the counter drops to zero. The WaitGroup must be passed by pointer: with a copy, each goroutine would call Done on its own copy, and wg.Wait() in main would block forever.

This reduced the total running time from about 12 seconds to about 5 seconds, which is the duration of the longest task.

Starting one goroutine per task is fine when there are only a few tasks. With hundreds or thousands of records, though, each task typically uses some limited resource (CPU, memory, database connections, a rate-limited API), and starting all of them at once can overload the system or the services it depends on. To handle that, we can implement a worker pool, so that at any point in time only a fixed number X of goroutines are doing the work.

Implementing Worker Pool

package main

import (
   "fmt"
   "sync"
   "time"
)

func main() {
   fmt.Printf("%s Start Program\n", getCurrentTime())
   start := time.Now()
   var data = []int{2000, 1000, 5000, 4000}

   numberOfThreads := 2
   var wg sync.WaitGroup
   wg.Add(numberOfThreads)

   var ch = make(chan int, len(data))
   for i := 0; i < numberOfThreads; i++ {
       go func(waitG *sync.WaitGroup, chn chan int, thread int) {
           for {

               d, ok := <-chn
               if !ok {
                   fmt.Printf("Thread %d stopped \n", thread)
                   waitG.Done()
                   return
               }
               fmt.Printf("Thread %d \n", thread)
               processData(d)
           }
       }(&wg, ch, i)
   }

   for _, d := range data {
       ch <- d
   }
   close(ch)
   wg.Wait()

   elap := time.Since(start)
   fmt.Printf("\n\n%s End Program, Took: %s \n", getCurrentTime(), elap)
}

func processData(inp int) {
   fmt.Printf("%s Start processing for input %d \n", getCurrentTime(), inp)
   start := time.Now()
   time.Sleep(time.Duration(inp) * time.Millisecond)
   elap := time.Since(start)
   fmt.Printf("%s End processing for input %d, Took: %s \n", getCurrentTime(), inp, elap)
}

func getCurrentTime() string {
   return time.Now().UTC().Format("2006-01-02T15:04:05Z")
}

Output

2021-03-16T16:14:00Z Start Program
Thread 1 
2021-03-16T16:14:00Z Start processing for input 2000 
Thread 0 
2021-03-16T16:14:00Z Start processing for input 1000 
2021-03-16T16:14:01Z End processing for input 1000, Took: 1.001528485s 
Thread 0 
2021-03-16T16:14:01Z Start processing for input 5000 
2021-03-16T16:14:02Z End processing for input 2000, Took: 2.00099327s 
Thread 1 
2021-03-16T16:14:02Z Start processing for input 4000 
2021-03-16T16:14:06Z End processing for input 4000, Took: 4.000337751s 
Thread 1 stopped 
2021-03-16T16:14:06Z End processing for input 5000, Took: 5.000158392s 
Thread 0 stopped 


2021-03-16T16:14:06Z End Program, Took: 6.002151567s

Here I create a pool of 2 worker goroutines and a buffered channel whose capacity equals the number of items to process. Each worker keeps receiving items from the channel and processing them. Once the channel has been closed and drained, the receive returns ok == false, and the worker marks itself done and stops.

Main then sends every item to the channel in a loop and closes the channel after the last send, which tells the workers that no more work is coming. The WaitGroup, with wg.Add(numberOfThreads) up front and one Done per worker, lets main wait until all the workers have stopped. With only 2 workers the run took about 6 seconds instead of 5, the cost of capping concurrency, but at no point were more than 2 items being processed at once.
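The receive-and-check-ok loop in the workers can also be written with for range, which exits on its own once the channel is closed and drained. A minimal sketch of the same pool shape in that style, assuming a trivial squaring job in place of the sleep and a hypothetical runPool helper; it also collects results on a second channel that is closed only after every worker has returned.

```go
package main

import (
	"fmt"
	"sync"
)

// runPool starts numWorkers goroutines that square every value in data.
// The order of the returned results depends on scheduling.
func runPool(data []int, numWorkers int) []int {
	jobs := make(chan int, len(data))
	results := make(chan int, len(data))
	var wg sync.WaitGroup

	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for d := range jobs { // loop ends when jobs is closed and empty
				results <- d * d
			}
		}()
	}

	for _, d := range data {
		jobs <- d
	}
	close(jobs) // no more work: lets the range loops finish

	wg.Wait()      // all workers have returned
	close(results) // safe: nothing sends on results any more
	out := make([]int, 0, len(data))
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	fmt.Println(len(runPool([]int{1, 2, 3, 4, 5}, 2))) // prints 5
}
```

Closing results only after wg.Wait() matters: closing it while a worker might still send would panic.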

Now suppose we need to stop all the workers if processing any item fails, and we also need to collect the result of each item we process.

Worker Pool with Results Channel and Context

package main

import (
   "context"
   "errors"
   "fmt"
   "time"
)

func worker(ctx context.Context, cancel context.CancelFunc, thread int, jobs <-chan int, res chan<- map[int]string, errs chan<- error) {
   for d := range jobs {

       select {
       case <-ctx.Done():
           fmt.Printf("Context cancel called thread: %d stopped \n", thread)
           return
       default:
           fmt.Printf("Thread %d \n", thread)
           rs, err := processData(d)
           if err != nil {
               fmt.Printf("\n Error received thread: %d data %d stopped \n", thread, d)
               errs <- err
               cancel()
               return
           }
           res <- rs
       }
   }
}

func main() {
   fmt.Printf("%s Start Program\n", getCurrentTime())
   start := time.Now()
   var data = []int{2000, 1000, 5000, 4001, 200, 201, 300, 400}

   numberOfThreads := 2

   ctx, cancel := context.WithCancel(context.Background())
   defer cancel()

   var jobs = make(chan int, len(data))
   var errs = make(chan error)

   var res = make(chan map[int]string, len(data))

   for i := 0; i < numberOfThreads; i++ {
       go worker(ctx, cancel, i, jobs, res, errs)
   }

   for _, d := range data {
       jobs <- d
   }
   close(jobs)
loop:
   for j := 0; j < len(data); j++ {

       select {
       case err := <-errs:
           fmt.Printf("error received %v", err)
           break loop
       case d := <-res:
           fmt.Printf("Response received %+v", d)
       }
   }

   elap := time.Since(start)
   fmt.Printf("\n\n%s End Program, Took: %s \n", getCurrentTime(), elap)
}

func processData(inp int) (res map[int]string, err error) {
   fmt.Printf("\n %s Start processing for input %d \n", getCurrentTime(), inp)
   start := time.Now()
   time.Sleep(time.Duration(inp) * time.Millisecond)
   elap := time.Since(start)

   fmt.Printf("\n %s End processing for input %d, Took: %s \n", getCurrentTime(), inp, elap)
   res = map[int]string{inp: getCurrentTime()}
   return
}

func processEvenData(inp int) (res map[int]string, err error) {
   fmt.Printf("%s Start processing for input %d \n", getCurrentTime(), inp)
   start := time.Now()
   time.Sleep(time.Duration(inp) * time.Millisecond)
   elap := time.Since(start)

   if inp%2 == 0 {
       res = map[int]string{inp: getCurrentTime()}
   } else {
       err = errors.New("failed to process data")
   }
   fmt.Printf("%s End processing for input %d, Took: %s \n", getCurrentTime(), inp, elap)
   return
}

func getCurrentTime() string {
   return time.Now().UTC().Format("2006-01-02T15:04:05Z")
}

Let’s break it down.

var res = make(chan map[int]string, len(data))

This buffered channel carries each worker’s result back to main. Its capacity is len(data), so a worker never blocks when sending a result.

ctx, cancel := context.WithCancel(context.Background())

context.WithCancel returns a copy of the parent context together with a cancel function. Calling cancel closes the context’s Done channel, which every goroutine holding ctx can observe. Each worker checks it before taking the next job, so if any worker calls cancel, all the workers stop processing further jobs.

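var errs = make(chan error)

Workers send any processing error on this channel. Note that it is unbuffered: if two workers failed at about the same time, main would receive only the first error and the second worker would block forever on its send. In this short program that’s harmless because the process exits right after, but in a long-running program you would give the channel a buffer, for example make(chan error, numberOfThreads), so that every worker can send its error and return.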

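select {
case <-ctx.Done():
   return
default:
   // process the job
}

Before each job, the worker checks whether the context has been canceled. If it has, the worker returns; otherwise the default case runs and the job is processed. Because of the default case, this check never blocks.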

if err != nil {
  cancel()
}

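If processing fails, the worker sends the error on errs and calls cancel(), so the other workers stop before picking up their next job. Note that cancel() does not interrupt a job that is already running; processData would have to watch ctx itself for that.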

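The program contains two processing functions: processData(inp int), which never returns an error, and processEvenData(inp int), which returns an error when the input is odd. The worker calls processData, so let’s check the output for it first.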

2021-07-16T05:50:13Z Start Program
Thread 1 

 2021-07-16T05:50:13Z Start processing for input 2000 
Thread 0 

 2021-07-16T05:50:13Z Start processing for input 1000 

 2021-07-16T05:50:14Z End processing for input 1000, Took: 1.003946012s 
Thread 0 

 2021-07-16T05:50:14Z Start processing for input 5000 
Response received map[1000:2021-07-16T05:50:14Z]
 2021-07-16T05:50:15Z End processing for input 2000, Took: 2.003974342s 
Thread 1 

 2021-07-16T05:50:15Z Start processing for input 4001 
Response received map[2000:2021-07-16T05:50:15Z]
 2021-07-16T05:50:19Z End processing for input 5000, Took: 5.003718612s 
Thread 0 

 2021-07-16T05:50:19Z Start processing for input 200 

 2021-07-16T05:50:19Z End processing for input 4001, Took: 4.003734684s 
Response received map[5000:2021-07-16T05:50:19Z]Thread 1 
Response received map[4001:2021-07-16T05:50:19Z]
 2021-07-16T05:50:19Z Start processing for input 201 

 2021-07-16T05:50:20Z End processing for input 200, Took: 200.475834ms 
Thread 0 

 2021-07-16T05:50:20Z Start processing for input 300 
Response received map[200:2021-07-16T05:50:20Z]
 2021-07-16T05:50:20Z End processing for input 201, Took: 201.073946ms 
Thread 1 

 2021-07-16T05:50:20Z Start processing for input 400 
Response received map[201:2021-07-16T05:50:20Z]
 2021-07-16T05:50:20Z End processing for input 300, Took: 300.611346ms 
Response received map[300:2021-07-16T05:50:20Z]
 2021-07-16T05:50:20Z End processing for input 400, Took: 401.238171ms 
Response received map[400:2021-07-16T05:50:20Z]

2021-07-16T05:50:20Z End Program, Took: 6.610424068s

Here all the records are processed without error. Now let’s change the worker to call processEvenData instead, which returns an error if the input is not an even number.

2021-07-16T05:56:39Z Start Program
Thread 1 
2021-07-16T05:56:39Z Start processing for input 2000 
Thread 0 
2021-07-16T05:56:39Z Start processing for input 1000 
2021-07-16T05:56:40Z End processing for input 1000, Took: 1.004341994s 
Thread 0 
2021-07-16T05:56:40Z Start processing for input 5000 
Response received map[1000:2021-07-16T05:56:40Z]2021-07-16T05:56:41Z End processing for input 2000, Took: 2.002803714s 
Thread 1 
2021-07-16T05:56:41Z Start processing for input 4001 
Response received map[2000:2021-07-16T05:56:41Z]2021-07-16T05:56:45Z End processing for input 5000, Took: 5.000998866s 
Thread 0 
2021-07-16T05:56:45Z Start processing for input 200 
Response received map[5000:2021-07-16T05:56:45Z]2021-07-16T05:56:45Z End processing for input 4001, Took: 4.002608371s 

 Error received thread: 1 data 4001 stopped 
error received failed to process data

2021-07-16T05:56:45Z End Program, Took: 6.005715129s

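Here, after the odd number 4001 fails, the remaining jobs are not processed: the failing worker calls cancel(), so the other worker stops before picking up another job, and main stops waiting as soon as it receives the error. The job for 200, which had already started, never finishes, because main returns and the program exits.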

Hope this makes things clear.