left-icon

Go Succinctly®
by Mark Lewin

Previous
Chapter

of
A
A
A

CHAPTER 9

Concurrency

Concurrency


One of Go’s most exciting features is its fantastic support for concurrency, which allows programs written in the language to scale massively. In this chapter, we’ll define concurrency and look at the two main features of the language that make concurrent programming easy in Go: goroutines and gochannels.

Processes, threads, and concurrency

When a program executes, the operating system creates a process for it to run in. This process acts as a container for the program and all the resources it uses while it executes. These resources include things like the memory the program accesses, any devices it uses, the file handles it uses for I/O, and threads.

A thread is the smallest unit of processing that can be performed in an operating system. A single process can contain multiple threads. Whereas a program that is single-threaded executes each of its instructions sequentially until the last instruction is reached, one with multiple threads allows multiple operations to run seemingly at the same time. For example, consider a program that allows you to watch a video while downloading it.

This delegation of operations to multiple threads of execution is known as concurrency. The dictionary definition of concurrent is “occurring or existing simultaneously,” and concurrency in computer programming means exactly the same thing: the ability of a program to use multiple threads in order to get more done in less time.

Goroutines

A goroutine is a method or function that can run concurrently with other functions. Note that any Go program automatically consists of one goroutine: the program itself.

To create a new goroutine, we need only to prefix the call to a method or function with the go keyword:

go myFunction(parameter)

Because goroutines are extremely lightweight, there is nothing to stop us from creating hundreds or even thousands of them. Each executes and immediately returns control to the next line in the program without the program waiting for whatever function was called in the goroutine to complete.

Consider the following example:

Code Listing 39

package main

import (

     "fmt"

     "time"

)

func message(s string) {

     for i := 0; i < 5; i++ {

          time.Sleep(100 * time.Millisecond)

          fmt.Println(s)

     }

}

func main() {

     go message("goroutine")

     message("normal")

}

This program has a function called message() that accepts a string value and prints it out five times. The message() function is called once as a goroutine running in its own thread, and once as normal within the main program thread.

When you run the program, you can see the two different threads competing to display their results:

goroutine

normal

goroutine

normal

goroutine

normal

normal

goroutine

goroutine

normal


The order in which the threads are processed is indeterminate, so when you run the program again, you might get a completely different result:

normal

goroutine

goroutine

normal

normal

goroutine

goroutine

normal

normal

goroutine

Note the use of the Go standard library time package to introduce a delay in the message() function. Without this delay, the program would call the goroutine and immediately drop down into the standard function call, and the program would terminate before the goroutine had a chance to finish.

Code Listing 40

package main

import (

     "fmt"

)

func message(s string) {

     for i := 0; i < 5; i++ {

          fmt.Println(s)

     }

}

func main() {

     go message("goroutine")

     message("normal")

}

normal

normal

normal

normal

normal

Gochannels

Gochannels are the plumbing that allow different goroutines to communicate with each other. You can use a channel to send a value from one goroutine and receive it in another.

You must create a channel before you attempt to use it. You create a channel with make() like you do with slices and maps. The syntax to create a channel is as follows:

myChannel := make(chan type)

A channel can only accept values of any one type, so you need to specify the type when creating it.

You can put a value into the channel with the following syntax:

myChannel <- value

You retrieve a value from the channel like so:

myVariable: <- myChannel

By default, sends and receives are blocked until the sender and receiver are both ready. This allows goroutines to synchronize without explicit locks or condition variables.

Let’s illustrate the use of channels with a programmatic implementation of a “numbers station.”

A numbers station is a peculiar artifact of the Cold War. During this period, shortwave radio enthusiasts started to notice bizarre radio broadcasts. They would start with a weird melody or a series of beeps, and then a strange woman’s or a child’s voice would start announcing a series of seemingly random numbers, presumably representing some unknown code to communicate with agents in the field.

Weird, and not a little creepy.

Anyway, we’ll create a function that generates a series of these numbers and drops them into a channel, which our program can then access.

Code Listing 41

package main

import (

     "fmt"

     "math/rand"

     "time"

)

func broadcast(c chan int) {

     // infinite loop to create random numbers

     for {

          /* generate a random number 0-999

          and put it into the channel */

          c <- rand.Intn(999)

     }

}

func main() {

     numbersStation := make(chan int)

     // execute broadcast in a separate thread

     go broadcast(numbersStation)

     // retrieve values from the channel

     for num := range numbersStation {

          // delay for artistic effect only

          time.Sleep(1000 * time.Millisecond)

          fmt.Printf("%d ", num)

     }

}

878 636 407 983 895 735 520 998 904 150 212 538 750 362 436 215 630 506 20 914 272 207 266 298 135 565 43 964 942 705 562 249 734 203 840 152 357 718 84 189 871 256 ...

Let’s break that down.

The broadcast() function accepts an integer channel c as a parameter. It then creates an infinite number of random integers between zero and 999 by using another Go package we haven’t seen yet: math/rnd, and drops those into the channel.

In main(), we’re creating a new integer channel called numbersStation. We then call broadcast() as a goroutine in a separate thread, which then continually generates random numbers and drops them into the channel.

Our for loop accesses the channel and displays each of the random numbers generated by the broadcast one at a time, with a one-second delay between each. This all happens so quickly that if we didn’t have a delay, the output would just be a blur of numbers—far too many for our number station operator to read out.

Let’s create a slightly more useful example—one that creates new account numbers for a fictitious banking application. The account number will be eight digits, starting from 30000001.

Code Listing 42

package main

import (

     "fmt"

)

func generateAccountNumber(accountNumberChannel chan int) {

     // internal variable to store last generated account number

     var accountNumber int

     accountNumber = 30000001

     for {

          accountNumberChannel <- accountNumber

          // increment the account number by 1

          accountNumber += 1

     }

}

func main() {

     accountNumberChannel := make(chan int)

     // start the goroutine that generates account numbers

     go generateAccountNumber(accountNumberChannel)

     fmt.Printf("SMITH: %d\n", <-accountNumberChannel)

     fmt.Printf("SINGH: %d\n", <-accountNumberChannel)

     fmt.Printf("JONES: %d\n", <-accountNumberChannel)

     fmt.Printf("LOPEZ: %d\n", <-accountNumberChannel)

     fmt.Printf("CLARK: %d\n", <-accountNumberChannel)

}

SMITH: 30000001

SINGH: 30000002

JONES: 30000003

LOPEZ: 30000004

CLARK: 30000005

If we modify the code in the previous listing so that we close the channel after five account numbers have been requested, then any subsequent read of the channel will return nil. We can check for this eventuality using the “comma OK” syntax.

Code Listing 43

package main

import (

     "fmt"

)

func generateAccountNumber(accountNumberChannel chan int) {

     // internal variable to store last generated account number

     var accountNumber int

     accountNumber = 30000001

     for {

          // close the channel after 5 numbers are requested

          if accountNumber > 30000005 {

               close(accountNumberChannel)

               return

          }

          accountNumberChannel <- accountNumber

          // increment the account number by 1

          accountNumber += 1

     }

}

func main() {

     accountNumberChannel := make(chan int)

     // start the goroutine that generates account numbers

     go generateAccountNumber(accountNumberChannel)

     // slice containing new customer names

     newCustomers := []string{"SMITH", "SINGH", "JONES", "LOPEZ",

                                                 "CLARK", "ALLEN"}

     // get a new account number for each customer

     for _, newCustomer := range newCustomers {

          // is there anything to retrieve from the channel?

          accnum, ok := <-accountNumberChannel

          if !ok {

               fmt.Printf("%s: No number available\n",

                                                      newCustomer)

          } else {

               fmt.Printf("%s: %d\n", newCustomer, accnum)

          }

     }

}

SMITH: 30000001

SINGH: 30000002

JONES: 30000003

LOPEZ: 30000004

CLARK: 30000005

ALLEN: No number available

Buffered channels

In the examples in Code Listings 42 and 43, the accountNumberChannel can be accessed anywhere in the program to generate the next account number in the sequence. The generateAccountNumber() function will not write a new account number to the channel until the existing value has been read by the program. This is because, by default, channels are un-buffered and are therefore synchronous in execution.

Compare the examples in Code Listings 42 and 43 to that in Code Listing 41. In Code Listing 41, there was no attempt to read a value from the channel (using the myVariable := <- myChannel syntax). No blocking occurs, and therefore the program continues to generate random numbers and add them to the channel.

If we want, we can buffer the channel, allowing for asynchronous execution. We do this by specifying a capacity for the channel as a second parameter:

myChannel := make(chan type, capacity)

Let’s compare the operation of unbuffered and buffered channels. First, the unbuffered channel:

Code Listing 44

package main

import (

     "fmt"

)

func main() {

     // un-buffered channel

     c := make(chan int)

     c <- 3

     fmt.Println("OK.")

}

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:

main.main()

     .../src/hello/main.go:10 +0x60

exit status 2

The program terminates with an error because the channel is unbuffered, and we’re attempting to put a value into the channel when there is nothing in the program to read from it.

If we create a buffered channel with a capacity of 1, we can add a single value to it without any issues:

Code Listing 45

package main

import (

     "fmt"

)

func main() {

     // buffered channel, capacity 1

     c := make(chan int, 1)

     c <- 3

     fmt.Println("OK.")

}

OK.

Writing to a buffered channel does not block if there is sufficient capacity in the channel. If we attempt to put more values in the channel than we have capacity for, we will get the same error as we saw previously:

Code Listing 46

package main

import (

     "fmt"

)

func main() {

     // buffered channel, capacity 3

     c := make(chan int, 3)

     c <- 3

     c <- 4

     c <- 5

     c <- 6

     fmt.Println("OK.")

}

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:

main.main()

     .../src/hello/main.go:13 +0xf1

exit status 2

Communicating on multiple channels

It is unlikely that a real-life Go program will rely upon a single channel. More typically, there will be multiple channels, and your program will try to coordinate reads and writes across all those channels.

Consider this alternative implementation of our numbers station program. The program listens on two channels: one that generates a sequence of numbers, and the other that waits until it receives a message telling it to stop transmitting. The broadcast() function uses a select statement to determine if anything is listening on the nsChannel (in which case it generates the next number in the sequence) or if it has received a message on the cChannel telling it to terminate (in which case it terminates the transmission).

select {

case nsChannel <- numbers[i]:

     i += 1

     if i == len(numbers) {

          i = 0

     }

case <-cChannel:

     cChannel <- true

     return

}

I have commented liberally throughout this program. See if you can work out what’s going on.

Code Listing 47

package main

import (

     "fmt"

     "time"

)

func broadcast(nsChannel chan int, cChannel chan bool) {

     numbers := []int{

          101,

          102,

          103,

          104,

          105,

          106,

          107,

          108,

          109,

          110,

     }

     i := 0

     for {

          // see which channel has items

          select {

          /* if the numbersChannel is being listened to,

           take each number sequentially from the

          slice and put it into the channel */

          case nsChannel <- numbers[i]:

               i += 1

               /* if we've reached the last number and

               the channel is still being listened to,

               start reading from the beginning of the

               slice again */

               if i == len(numbers) {

                    i = 0

               }

          /* if we receive a message on the

          complete channel, we stop transmitting */

          case <-cChannel:

               cChannel <- true

               return

          }

     }

}

func main() {

     numbersStation := make(chan int)

     completeChannel := make(chan bool)

     // execute broadcast in a separate thread

     go broadcast(numbersStation, completeChannel)

     // get 100 numbers from the numbersStation channel

     for i := 0; i < 100; i++ {

          // delay for artistic effect only

          time.Sleep(100 * time.Millisecond)

          // retrieve values from the channel

          fmt.Printf("%d ", <-numbersStation)

     }

     /* once we have received 100 numbers,

     send a message on completeChannel

     to tell it to stop transmitting */

     completeChannel <- true

     /* don't terminate the program until

     we receive a message on the completeChannel.

     Discard the response. */

     <-completeChannel

     /* we only get to here if we received a

     message on completeChannel */

     fmt.Println("Transmission Complete.")

}

101 102 103 104 105 106 107 108 109 110 101 102 103 104 105 106 107 108 109 110 101 102 103 104 105 106 107 108 109 110 101 102 103 104 105 106 107 108 109 110 101 102 103 104 105 106 107 108 109 110 101 102 103 104 105 106 107 108 109 110 101 102 103 104 105 106 107 108 109 110 101 102 103 104 105 106 107 108 109 110 101 102 103 104 105 106 107 108 109 110 101 102 103 104 105 106 107 108 109 110 Transmission Complete.

Bringing it all together

Here is another example of using goroutines and channels, but this one brings together a number of concepts that we have covered in this book. Some of the code here is not immediately obvious, so take the time to work through it, as doing so will help you to fully assimilate some of the features of Go you have learned so far.

Examine the following code and then read the explanation that follows.

Code Listing 48

package main

import (

     "fmt"

     "io/ioutil"

     "log"

     "net/http"

     "sort"

)

type WebPage struct {

     URL  string

     Size int

}

type WebPages []WebPage

// implementing the sort.Interface interface in WebPages

func (slice WebPages) Len() int {

     return len(slice)

}

func (slice WebPages) Less(i, j int) bool {

     // Sort of size of response in descending order

     return slice[i].Size > slice[j].Size

}

func (slice WebPages) Swap(i, j int) {

     slice[i], slice[j] = slice[j], slice[i]

}

// method for adding a new WebPage element to WebPages

func (wp *WebPages) addElement(page WebPage) {

     *wp = append(*wp, page)

}

// called as a goroutine to retrieve the length of each webpage

func getWebPageLength(url string, resultsChannel chan WebPage) {

     res, err := http.Get(url)

     if err != nil {

          log.Fatal(err)

     }

     defer res.Body.Close()

     // get the size of the response body

     size, err := ioutil.ReadAll(res.Body)

     if err != nil {

          log.Fatal(err)

     }

     // populate the WebPage struct and add it to the channel

     var page WebPage

     page.URL = url

     page.Size = len(size)

     resultsChannel <- page

}

func main() {

     urls := []string{

          "http://www.syncfusion.com",

          "http://www.google.com",

          "http://www.microsoft.com",

          "http://www.apple.com",

          "http://www.golang.org",

     }

     // create a channel

     resultsChannel := make(chan WebPage)

     // call a goroutine to read each webpage simultaneously

     for _, url := range urls {

          /* initiate a new goroutine for each URL

          so that we can analyze them concurrently */

          go getWebPageLength(url, resultsChannel)

     }

     // store each WebPage result in WebPages

     results := new(WebPages)

     for range urls {

          result := <-resultsChannel

          results.addElement(result)

     }

     // sort using the implementation of sort.Interface in WebPages

     sort.Sort(results)

     // display the results to the user

     for i, page := range *results {

          fmt.Printf("%d. %s: %d bytes.\n", i+1, page.URL,

                                                     page.Size)

     }

}

1. http://www.syncfusion.com: 108794 bytes.

2. http://www.microsoft.com: 79331 bytes.

3. http://www.apple.com: 31204 bytes.

4. http://www.google.com: 19850 bytes.

5. http://www.golang.org: 7856 bytes.

The program examines the length of five webpages and lists them in order of largest to smallest. It executes a goroutine for each webpage so that we don’t have to wait to download one webpage before we start on the next. The goroutine pushes the result of each analysis onto a channel, resultsChannel.

First, let’s see what’s new. To get the length of the webpages, we’re using Go’s net/http package. To retrieve the webpage, we call http.Get(), passing in the URL of the page we’re interested in, and we examine the body of the response. If anything goes wrong, we use Go’s log package to report a fatal error condition.

Once we’re done with the response body, we must close it, and here we’re using a defer to res.Body.close(). The defer statement pushes a function call onto a list that Go maintains internally. The function deferred only gets called when the surrounding function completes. You often see defer used to simplify functions that do some sort of cleanup.

     res, err := http.Get(url)

     if err != nil {

          log.Fatal(err)

     }

     defer res.Body.Close()

With the body of the response, we can get its length in bytes by reading it into memory with the ioutil package’s ReadAll() method, which returns the number of bytes read:

     // get the size of the response body

     size, err := ioutil.ReadAll(res.Body)

     if err != nil {

          log.Fatal(err)

     }

Everything else in the program builds on what we have learned so far.

In order to store the results for later display to the user, we are creating a couple of custom types. One is called WebPage and contains the URL and the length (in bytes) of the response. The other is called WebPages and is a slice that contains five instances of WebPage, one for each webpage.

Because we want to order the results by size, and the size information is a field buried within our WebPage struct, we’ve implemented the sort.Interface interface in the WebPages type. To do that, we have included the following methods in WebPages:

func (slice WebPages) Len() int {

    return len(slice)

}

func (slice WebPages) Less(i, j int) bool {

    return slice[i].Size > slice[j].Size;

}

func (slice WebPages) Swap(i, j int) {

    slice[i], slice[j] = slice[j], slice[i]

}

Because we have defined our own type WebPages, we have lost the ability to append() items to it, which we would keep if we were using a normal slice. So we have created a method on WebPages that allows us to add a WebPage to the collection when we retrieve it from the resultsChannel. Because we are directly changing the contents of WebPages, we must use a pointer to the receiver in our method declaration so that we are not simply working on a copy of of it.

func (wp *WebPages) addElement(page WebPage) {

    *wp = append(*wp, page)

}

Once we have populated WebPages with the results of reading the responses from all five websites, we call sort.Sort() to retrieve each of the results in size order from largest to smallest and display them to the user:

sort.Sort(results)

for i, page := range *results {

     fmt.Printf("%d. %s: %d bytes.\n", i, page.URL, page.Size)

}

Scroll To Top
Disclaimer
DISCLAIMER: Web reader is currently in beta. Please report any issues through our support system. PDF and Kindle format files are also available for download.

Previous

Next



You are one step away from downloading ebooks from the Succinctly® series premier collection!
A confirmation has been sent to your email address. Please check and confirm your email subscription to complete the download.