CHAPTER 9
One of Go’s most exciting features is its fantastic support for concurrency, which makes it straightforward to write programs that do many things at once and that scale well across multiple CPU cores. In this chapter, we’ll define concurrency and look at the two main features of the language that make concurrent programming easy in Go: goroutines and channels.
When a program executes, the operating system creates a process for it to run in. This process acts as a container for the program and all the resources it uses while it executes. These resources include things like the memory the program accesses, any devices it uses, the file handles it uses for I/O, and threads.
A thread is the smallest unit of execution that an operating system can schedule. A single process can contain multiple threads. Whereas a single-threaded program executes its instructions one after another until it reaches the last one, a program with multiple threads can run several operations seemingly at the same time. For example, a video player can let you watch a video while it is still downloading: one thread downloads the file while another plays it back.
This delegation of operations to multiple threads of execution is known as concurrency. The dictionary definition of concurrent is “occurring or existing simultaneously,” and in computer programming, concurrency means structuring a program as several independently executing tasks whose lifetimes overlap. When the machine has more than one CPU core, those tasks can also run in parallel, which lets the program get more done in less time.
A goroutine is a function or method call that runs concurrently with other goroutines in the same program. Every Go program has at least one goroutine: the one that runs main(), often called the main goroutine.
To create a new goroutine, we only need to prefix the call to a function or method with the go keyword:
go myFunction(parameter)
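Goroutines are not operating system threads. The Go runtime multiplexes them onto a small number of threads, and each goroutine starts with a stack of only a few kilobytes, so they are cheap enough that there is nothing to stop us from creating hundreds or even thousands of them. The go statement itself returns immediately: execution continues with the next line of the program without waiting for the function running in the goroutine to complete.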
Consider the following example:
Code Listing 39
package main

import (
	"fmt"
	"time"
)

func message(s string) {
	for i := 0; i < 5; i++ {
		time.Sleep(100 * time.Millisecond)
		fmt.Println(s)
	}
}

func main() {
	go message("goroutine")
	message("normal")
}
This program has a function called message() that accepts a string value and prints it five times. The message() function is called once as a goroutine and once as a normal call in the main goroutine.
When you run the program, you can see the two goroutines competing to display their results:
goroutine
normal
goroutine
normal
goroutine
normal
normal
goroutine
goroutine
normal
The order in which the Go runtime schedules the goroutines is indeterminate, so when you run the program again, you might get a completely different result:
normal
goroutine
goroutine
normal
normal
goroutine
goroutine
normal
normal
goroutine
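Note the use of the time package from the Go standard library to introduce a delay in the message() function. The delay gives the goroutine a chance to run while main() is still busy. Without it, the program starts the goroutine, immediately drops down into the normal function call, and usually terminates before the goroutine has printed anything, because a Go program exits as soon as main() returns, without waiting for other goroutines. Even with the delay, nothing guarantees that the goroutine finishes; sleeping only makes it likely. Code Listing 40 shows the same program without the delay, together with typical output.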
Code Listing 40
package main

import (
	"fmt"
)

func message(s string) {
	for i := 0; i < 5; i++ {
		fmt.Println(s)
	}
}

func main() {
	go message("goroutine")
	message("normal")
}
normal
normal
normal
normal
normal
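Sleeping for a fixed time only makes it likely that the goroutine finishes; it does not guarantee it. One common way to wait reliably, which this chapter does not otherwise cover, is a sync.WaitGroup from the standard library. The following is a minimal sketch of Code Listing 40 reworked under that assumption. The run() helper and the line counts it returns are illustrative additions rather than part of the original program, and the goroutine is started with a function literal so that it can call wg.Done() once message() returns.

```go
package main

import (
	"fmt"
	"sync"
)

// message prints s five times and reports how many lines it printed.
func message(s string) int {
	printed := 0
	for i := 0; i < 5; i++ {
		fmt.Println(s)
		printed++
	}
	return printed
}

// run is a hypothetical helper: it calls message() once as a goroutine and
// once normally, and does not return until the goroutine has finished.
func run() (fromGoroutine, fromMain int) {
	var wg sync.WaitGroup
	wg.Add(1) // we are about to start one goroutine
	go func() {
		fromGoroutine = message("goroutine")
		wg.Done() // tell the WaitGroup this goroutine is finished
	}()
	fromMain = message("normal")
	wg.Wait() // block until Done() has been called
	return fromGoroutine, fromMain
}

func main() {
	g, m := run()
	fmt.Println("lines printed:", g, "goroutine,", m, "normal")
}
```

The "goroutine" and "normal" lines can still appear in any order, but all ten are printed before the program exits.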
Channels are the plumbing that allows different goroutines to communicate with each other. You can use a channel to send a value from one goroutine and receive it in another.
You must create a channel before you attempt to use it. You create a channel with make() like you do with slices and maps. The syntax to create a channel is as follows:
myChannel := make(chan type)
Each channel carries values of exactly one type, so you need to specify that type when you create it.
You can put a value into the channel with the following syntax:
myChannel <- value
You retrieve a value from the channel like so:
myVariable := <-myChannel
By default, sends and receives block until the other side is ready: a send waits until another goroutine receives the value, and a receive waits until another goroutine sends one. This allows goroutines to synchronize without explicit locks or condition variables.
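To see that blocking in action, here is a minimal sketch (the sum() and parallelSum() names are hypothetical, not from this chapter) in which two goroutines each add up half of a slice and send their partial totals on the same unbuffered channel. The two receives in parallelSum() block until each goroutine has sent its total, so no lock is needed to combine the results.

```go
package main

import "fmt"

// sum adds up nums and sends the total on result. The send blocks until
// another goroutine is ready to receive it.
func sum(nums []int, result chan int) {
	total := 0
	for _, n := range nums {
		total += n
	}
	result <- total
}

// parallelSum splits nums in half and sums each half in its own goroutine.
func parallelSum(nums []int) int {
	result := make(chan int) // unbuffered: each send waits for a receive
	half := len(nums) / 2
	go sum(nums[:half], result)
	go sum(nums[half:], result)
	// each receive blocks until one of the goroutines has sent its total
	a, b := <-result, <-result
	return a + b
}

func main() {
	fmt.Println(parallelSum([]int{7, 2, 8, -9, 4, 0})) // prints 12
}
```

Which goroutine finishes first does not matter, because addition does not depend on the order in which the two totals arrive.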
Let’s illustrate the use of channels with a programmatic implementation of a “numbers station.”
A numbers station is a peculiar artifact of the Cold War. During this period, shortwave radio enthusiasts started to notice bizarre radio broadcasts. They would start with a weird melody or a series of beeps, and then a strange woman’s or a child’s voice would start announcing a series of seemingly random numbers, presumably representing some unknown code to communicate with agents in the field.
Weird, and not a little creepy.
Anyway, we’ll create a function that generates a series of these numbers and drops them into a channel, which our program can then access.
Code Listing 41
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func broadcast(c chan int) {
	// infinite loop to create random numbers
	for {
		/* generate a random number 0-999 and put it into the
		   channel (rand.Intn(1000) returns 0 to 999 inclusive) */
		c <- rand.Intn(1000)
	}
}

func main() {
	numbersStation := make(chan int)
	// execute broadcast in a separate goroutine
	go broadcast(numbersStation)
	// retrieve values from the channel
	for num := range numbersStation {
		// delay for artistic effect only
		time.Sleep(1000 * time.Millisecond)
		fmt.Printf("%d ", num)
	}
}
878 636 407 983 895 735 520 998 904 150 212 538 750 362 436 215 630 506 20 914 272 207 266 298 135 565 43 964 942 705 562 249 734 203 840 152 357 718 84 189 871 256 ...
Let’s break that down.
The broadcast() function accepts an integer channel c as a parameter. In an infinite loop, it generates random integers by calling rand.Intn() from another standard library package we haven’t seen yet, math/rand, and drops each one into the channel. Note that rand.Intn(n) returns a number from 0 up to, but not including, n.
In main(), we create a new integer channel called numbersStation. We then call broadcast() as a goroutine, which continually generates random numbers and drops them into the channel.
Our for loop uses range to receive the numbers from the channel one at a time and displays each of them, with a one-second delay between each. Because nothing ever closes the channel, the loop never ends, and you have to stop the program yourself. Without the delay, the numbers would arrive so quickly that the output would be a blur, far too many for our numbers station operator to read out.
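Because a range loop over a channel only stops when the channel is closed, Code Listing 41 runs until you kill it. Here is a minimal sketch of a bounded variant, assuming we want a fixed number of values: the hypothetical broadcastN() sends count numbers and then calls close(), which Code Listing 43 also uses, so the range loop in the hypothetical receiveAll() ends on its own.

```go
package main

import (
	"fmt"
	"math/rand"
)

// broadcastN sends count random numbers from 0 to 999 on c, then closes c
// so that a range loop over the channel terminates.
func broadcastN(c chan int, count int) {
	for i := 0; i < count; i++ {
		c <- rand.Intn(1000) // 0 up to and including 999
	}
	close(c)
}

// receiveAll collects every value sent on the channel until it is closed.
func receiveAll(count int) []int {
	numbersStation := make(chan int)
	go broadcastN(numbersStation, count)
	var nums []int
	for num := range numbersStation { // ends when broadcastN closes the channel
		nums = append(nums, num)
	}
	return nums
}

func main() {
	for _, num := range receiveAll(10) {
		fmt.Printf("%d ", num)
	}
	fmt.Println()
}
```

Closing is the sender’s job here: only broadcastN() knows when there are no more numbers to send.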
Let’s create a slightly more useful example—one that creates new account numbers for a fictitious banking application. The account number will be eight digits, starting from 30000001.
Code Listing 42
package main

import (
	"fmt"
)

func generateAccountNumber(accountNumberChannel chan int) {
	// internal variable to store last generated account number
	var accountNumber int
	accountNumber = 30000001
	for {
		accountNumberChannel <- accountNumber
		// increment the account number by 1
		accountNumber += 1
	}
}

func main() {
	accountNumberChannel := make(chan int)
	// start the goroutine that generates account numbers
	go generateAccountNumber(accountNumberChannel)
	fmt.Printf("SMITH: %d\n", <-accountNumberChannel)
	fmt.Printf("SINGH: %d\n", <-accountNumberChannel)
	fmt.Printf("JONES: %d\n", <-accountNumberChannel)
	fmt.Printf("LOPEZ: %d\n", <-accountNumberChannel)
	fmt.Printf("CLARK: %d\n", <-accountNumberChannel)
}
SMITH: 30000001
SINGH: 30000002
JONES: 30000003
LOPEZ: 30000004
CLARK: 30000005
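If we modify the code in the previous listing so that the generator closes the channel after it has handed out five account numbers, any subsequent read from the channel returns immediately with the zero value for the channel’s type (0 for a channel of int) instead of blocking. We can tell that zero value apart from a real account number by using the “comma OK” syntax: the second value, ok, is false once the channel has been closed and emptied.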
Code Listing 43
package main

import (
	"fmt"
)

func generateAccountNumber(accountNumberChannel chan int) {
	// internal variable to store last generated account number
	var accountNumber int
	accountNumber = 30000001
	for {
		// close the channel after 5 numbers are requested
		if accountNumber > 30000005 {
			close(accountNumberChannel)
			return
		}
		accountNumberChannel <- accountNumber
		// increment the account number by 1
		accountNumber += 1
	}
}

func main() {
	accountNumberChannel := make(chan int)
	// start the goroutine that generates account numbers
	go generateAccountNumber(accountNumberChannel)
	// slice containing new customer names
	newCustomers := []string{"SMITH", "SINGH", "JONES", "LOPEZ", "CLARK", "ALLEN"}
	// get a new account number for each customer
	for _, newCustomer := range newCustomers {
		// is there anything to retrieve from the channel?
		accnum, ok := <-accountNumberChannel
		if !ok {
			fmt.Printf("%s: No number available\n", newCustomer)
		} else {
			fmt.Printf("%s: %d\n", newCustomer, accnum)
		}
	}
}
SMITH: 30000001
SINGH: 30000002
JONES: 30000003
LOPEZ: 30000004
CLARK: 30000005
ALLEN: No number available
In Code Listings 42 and 43, main() can receive from accountNumberChannel whenever it needs the next account number in the sequence, and so could any other function or goroutine that the channel is passed to. The generateAccountNumber() function does not write a new account number to the channel until the previous one has been read. This is because, by default, channels are unbuffered: each send blocks until a receiver takes the value.
Compare this with Code Listing 41. Although that listing never uses the myVariable := <-myChannel syntax, its range loop in main() receives every value from the channel, and broadcast() blocks on each send until main() is ready for the next number. The one-second delay in main() therefore also paces the generator: broadcast() is never more than one number ahead of the loop that reads from it.
If we want, we can buffer the channel so that sends do not have to wait for a receiver, up to a fixed number of values. We do this by specifying a capacity for the channel as a second parameter to make():
myChannel := make(chan type, capacity)
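A buffered channel behaves like a fixed-size queue. The following minimal sketch (enqueue() is a hypothetical helper) fills a channel of capacity 3 from a single goroutine, then uses the built-in len() and cap() functions, which also work on channels, to show how many values are waiting and how many fit.

```go
package main

import "fmt"

// enqueue creates a buffered channel with the given capacity and sends every
// value into it from the current goroutine. This only works without a
// receiver because len(values) <= capacity; one more send would block forever.
func enqueue(values []string, capacity int) chan string {
	if len(values) > capacity {
		panic("enqueue: more values than buffer capacity")
	}
	c := make(chan string, capacity)
	for _, v := range values {
		c <- v // does not block: the buffer still has room
	}
	return c
}

func main() {
	c := enqueue([]string{"first", "second"}, 3)
	fmt.Println(len(c), cap(c)) // 2 3: two values queued, capacity of three
	fmt.Println(<-c)            // first: values come out in the order they went in
	fmt.Println(<-c)            // second
	fmt.Println(len(c))         // 0
}
```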
Let’s compare the operation of unbuffered and buffered channels. First, the unbuffered channel:
Code Listing 44
package main

import (
	"fmt"
)

func main() {
	// unbuffered channel
	c := make(chan int)
	c <- 3
	fmt.Println("OK.")
}
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
.../src/hello/main.go:10 +0x60
exit status 2
The program terminates with an error because the channel is unbuffered: the send blocks until something receives the value, but the only goroutine in the program is the one doing the sending. The Go runtime detects that every goroutine is blocked and reports a deadlock.
If we create a buffered channel with a capacity of 1, we can add a single value to it without any issues:
Code Listing 45
package main

import (
	"fmt"
)

func main() {
	// buffered channel, capacity 1
	c := make(chan int, 1)
	c <- 3
	fmt.Println("OK.")
}
OK.
Writing to a buffered channel does not block if there is sufficient capacity in the channel. If we attempt to put more values in the channel than we have capacity for, we will get the same error as we saw previously:
Code Listing 46
package main

import (
	"fmt"
)

func main() {
	// buffered channel, capacity 3
	c := make(chan int, 3)
	c <- 3
	c <- 4
	c <- 5
	c <- 6
	fmt.Println("OK.")
}
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
.../src/hello/main.go:13 +0xf1
exit status 2
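The deadlock in Code Listing 46 comes from having no receiver, not from the buffer size as such. As a minimal sketch under that reading, the same four sends succeed once another goroutine is receiving. The goroutine that drains the channel, the done channel that hands back its results, and the sendAll() name are illustrative additions.

```go
package main

import "fmt"

// sendAll sends every value on a channel with capacity 3 while a second
// goroutine receives them, then returns what the receiver collected.
func sendAll(values []int) []int {
	c := make(chan int, 3)
	done := make(chan []int)
	// the receiver drains c until it is closed, then hands back the values
	go func() {
		var received []int
		for v := range c {
			received = append(received, v)
		}
		done <- received
	}()
	for _, v := range values {
		c <- v // blocks only while the buffer is full and the receiver is behind
	}
	close(c) // no more values; lets the receiver's range loop end
	return <-done
}

func main() {
	fmt.Println(sendAll([]int{3, 4, 5, 6})) // prints [3 4 5 6]
}
```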
It is unlikely that a real-life Go program will rely upon a single channel. More typically, there will be multiple channels, and your program will try to coordinate reads and writes across all those channels.
Consider this alternative implementation of our numbers station program, which uses two channels. The broadcast() function sends the next number in a fixed sequence on nsChannel whenever main() is ready to receive it, and it stops transmitting when it receives a message on cChannel. To wait on both channels at once, broadcast() uses a select statement, which blocks until one of its cases can proceed and then runs that case (if more than one case is ready, select picks one at random):
select {
case nsChannel <- numbers[i]:
	i += 1
	if i == len(numbers) {
		i = 0
	}
case <-cChannel:
	cChannel <- true
	return
}
I have commented liberally throughout this program. See if you can work out what’s going on.
Code Listing 47
package main

import (
	"fmt"
	"time"
)

func broadcast(nsChannel chan int, cChannel chan bool) {
	numbers := []int{
		101, 102, 103, 104, 105,
		106, 107, 108, 109, 110,
	}
	i := 0
	for {
		// see which channel is ready
		select {
		/* if a receiver is waiting on nsChannel, take each number
		   sequentially from the slice and send it on the channel */
		case nsChannel <- numbers[i]:
			i += 1
			/* if we've reached the last number and the channel is
			   still being listened to, start reading from the
			   beginning of the slice again */
			if i == len(numbers) {
				i = 0
			}
		/* if we receive a message on the complete channel,
		   acknowledge it and stop transmitting */
		case <-cChannel:
			cChannel <- true
			return
		}
	}
}

func main() {
	numbersStation := make(chan int)
	completeChannel := make(chan bool)
	// execute broadcast in a separate goroutine
	go broadcast(numbersStation, completeChannel)
	// get 100 numbers from the numbersStation channel
	for i := 0; i < 100; i++ {
		// delay for artistic effect only
		time.Sleep(100 * time.Millisecond)
		// retrieve values from the channel
		fmt.Printf("%d ", <-numbersStation)
	}
	/* once we have received 100 numbers, send a message on
	   completeChannel to tell it to stop transmitting */
	completeChannel <- true
	/* don't terminate the program until we receive a message on
	   the completeChannel. Discard the response. */
	<-completeChannel
	/* we only get to here if we received a message on
	   completeChannel */
	fmt.Println("Transmission Complete.")
}
101 102 103 104 105 106 107 108 109 110 101 102 103 104 105 106 107 108 109 110 101 102 103 104 105 106 107 108 109 110 101 102 103 104 105 106 107 108 109 110 101 102 103 104 105 106 107 108 109 110 101 102 103 104 105 106 107 108 109 110 101 102 103 104 105 106 107 108 109 110 101 102 103 104 105 106 107 108 109 110 101 102 103 104 105 106 107 108 109 110 101 102 103 104 105 106 107 108 109 110 Transmission Complete.
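Code Listing 47 uses completeChannel in both directions: main() sends the stop request on it and then waits for broadcast() to send an acknowledgment back. A common alternative, sketched minimally below, is to close a separate quit channel, because a receive on a closed channel always succeeds immediately, and then to wait on a second done channel that broadcast() closes as it exits. The quit and done channels and the takeNumbers() helper are illustrative names, not part of the original listing.

```go
package main

import "fmt"

// broadcast cycles through numbers, sending each one on nsChannel, until the
// quit channel is closed. It closes done just before it returns.
func broadcast(nsChannel chan int, quit chan bool, done chan bool) {
	numbers := []int{101, 102, 103, 104, 105, 106, 107, 108, 109, 110}
	i := 0
	for {
		select {
		case nsChannel <- numbers[i]:
			i = (i + 1) % len(numbers) // wrap around to the start of the slice
		case <-quit:
			// a receive on a closed channel succeeds immediately
			close(done)
			return
		}
	}
}

// takeNumbers receives n numbers and then shuts the broadcast down.
func takeNumbers(n int) []int {
	nsChannel := make(chan int)
	quit := make(chan bool)
	done := make(chan bool)
	go broadcast(nsChannel, quit, done)
	nums := make([]int, 0, n)
	for i := 0; i < n; i++ {
		nums = append(nums, <-nsChannel)
	}
	close(quit) // signal the broadcaster without sending a value
	<-done      // wait until broadcast() has actually returned
	return nums
}

func main() {
	fmt.Println(takeNumbers(12))
	fmt.Println("Transmission Complete.")
}
```

One advantage of closing rather than sending is that any number of goroutines can be listening on quit, and all of them see the signal.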
Here is another example of using goroutines and channels, but this one brings together a number of concepts that we have covered in this book. Some of the code here is not immediately obvious, so take the time to work through it, as doing so will help you to fully assimilate some of the features of Go you have learned so far.
Examine the following code and then read the explanation that follows.
Code Listing 48
package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"sort"
)

type WebPage struct {
	URL  string
	Size int
}

type WebPages []WebPage

// implementing the sort.Interface interface in WebPages
func (slice WebPages) Len() int {
	return len(slice)
}

func (slice WebPages) Less(i, j int) bool {
	// sort by size of response in descending order
	return slice[i].Size > slice[j].Size
}

func (slice WebPages) Swap(i, j int) {
	slice[i], slice[j] = slice[j], slice[i]
}

// method for adding a new WebPage element to WebPages
func (wp *WebPages) addElement(page WebPage) {
	*wp = append(*wp, page)
}

// called as a goroutine to retrieve the length of each webpage
func getWebPageLength(url string, resultsChannel chan WebPage) {
	res, err := http.Get(url)
	if err != nil {
		log.Fatal(err)
	}
	defer res.Body.Close()
	// get the size of the response body
	size, err := ioutil.ReadAll(res.Body)
	if err != nil {
		log.Fatal(err)
	}
	// populate the WebPage struct and add it to the channel
	var page WebPage
	page.URL = url
	page.Size = len(size)
	resultsChannel <- page
}

func main() {
	urls := []string{
		"http://www.syncfusion.com",
		"http://www.google.com",
		"http://www.microsoft.com",
		"http://www.apple.com",
		"http://www.golang.org",
	}
	// create a channel
	resultsChannel := make(chan WebPage)
	// call a goroutine to read each webpage simultaneously
	for _, url := range urls {
		/* initiate a new goroutine for each URL so that we can
		   analyze them concurrently */
		go getWebPageLength(url, resultsChannel)
	}
	// store each WebPage result in WebPages
	results := new(WebPages)
	for range urls {
		result := <-resultsChannel
		results.addElement(result)
	}
	// sort using the implementation of sort.Interface in WebPages
	sort.Sort(results)
	// display the results to the user
	for i, page := range *results {
		fmt.Printf("%d. %s: %d bytes.\n", i+1, page.URL, page.Size)
	}
}
1. http://www.syncfusion.com: 108794 bytes.
2. http://www.microsoft.com: 79331 bytes.
3. http://www.apple.com: 31204 bytes.
4. http://www.google.com: 19850 bytes.
5. http://www.golang.org: 7856 bytes.
The program measures the size of five webpages and lists them in order from largest to smallest. It starts a separate goroutine for each webpage, so we don’t have to wait for one page to download before we start on the next. Each goroutine sends the result of its analysis on a single channel, resultsChannel.
First, let’s see what’s new. To retrieve the webpages, we’re using Go’s net/http package. We call http.Get(), passing in the URL of the page we’re interested in, and then read the body of the response. If anything goes wrong, we use Go’s log package to report a fatal error. Be aware that log.Fatal() exits the whole program immediately, so a single unreachable site stops every goroutine, not just the one that failed.
Once we’re done with the response body, we must close it, and here we’re using defer to call res.Body.Close(). The defer statement pushes a function call onto a list that Go maintains for the current function. The deferred call runs only when the surrounding function returns, and if there are several deferred calls, they run in last-in, first-out order. You often see defer used to simplify functions that do some sort of cleanup.
res, err := http.Get(url)
if err != nil {
	log.Fatal(err)
}
defer res.Body.Close()
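A small sketch makes the timing and ordering of deferred calls concrete. The cleanupOrder() function is hypothetical; it uses a named result so that the deferred calls, which run after the return statement, can still add to the slice that the caller receives.

```go
package main

import "fmt"

// cleanupOrder records when each step runs. The deferred calls run after the
// return statement, in the reverse of the order in which they were deferred.
func cleanupOrder() (order []string) {
	record := func(step string) { order = append(order, step) }
	defer record("first deferred call")
	defer record("second deferred call")
	record("function body")
	return order
}

func main() {
	for _, step := range cleanupOrder() {
		fmt.Println(step)
	}
	// prints:
	// function body
	// second deferred call
	// first deferred call
}
```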
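With the body of the response, we can get its length in bytes by reading it into memory with the ioutil package’s ReadAll() function. ReadAll() returns a byte slice containing the whole body, so the number of bytes is simply the length of that slice, len(size). (Since Go 1.16, the same function is also available as io.ReadAll(), and ioutil.ReadAll() is deprecated.)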
// get the size of the response body
size, err := ioutil.ReadAll(res.Body)
if err != nil {
	log.Fatal(err)
}
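If you only need the size and not the content, you do not have to hold the whole body in memory. The following minimal sketch swaps in a different technique from the one in Code Listing 48: io.Copy() writes the body into io.Discard (Go 1.16 or later) and returns the number of bytes copied. The pageSize() and newTestServer() names are hypothetical, and a local server from net/http/httptest stands in for a real website so the example does not need network access. It also returns errors to the caller instead of calling log.Fatal(), so one failed request would not end the whole program.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// pageSize fetches url and returns the number of bytes in the response body.
// io.Copy into io.Discard counts the bytes without keeping the body in memory.
func pageSize(url string) (int64, error) {
	res, err := http.Get(url)
	if err != nil {
		return 0, err // report the error instead of calling log.Fatal
	}
	defer res.Body.Close()
	return io.Copy(io.Discard, res.Body)
}

// newTestServer starts a local HTTP server that always responds with body.
func newTestServer(body string) *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.WriteString(w, body)
	}))
}

func main() {
	server := newTestServer("hello, gopher")
	defer server.Close()
	size, err := pageSize(server.URL)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%s: %d bytes.\n", server.URL, size) // 13 bytes
}
```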
Everything else in the program builds on what we have learned so far.
In order to store the results for later display to the user, we create a couple of custom types. One is called WebPage and contains the URL and the length (in bytes) of the response. The other is called WebPages and is a slice of WebPage values; in this program it ends up holding five of them, one for each webpage.
Because we want to order the results by size, and the size information is a field buried within our WebPage struct, we’ve implemented the sort.Interface interface in the WebPages type. To do that, we have included the following methods in WebPages:
func (slice WebPages) Len() int {
	return len(slice)
}
func (slice WebPages) Less(i, j int) bool {
	return slice[i].Size > slice[j].Size
}
func (slice WebPages) Swap(i, j int) {
	slice[i], slice[j] = slice[j], slice[i]
}
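Implementing sort.Interface is not the only option. Since Go 1.8, sort.Slice() accepts a slice and a “less” function, so a type does not need Len(), Less(), and Swap() methods to be sortable. Here is a minimal sketch of that alternative, using the chapter’s WebPage type with made-up URLs and sizes and a hypothetical sortBySizeDesc() helper. Note that sort.Slice() is not guaranteed to be stable.

```go
package main

import (
	"fmt"
	"sort"
)

type WebPage struct {
	URL  string
	Size int
}

// sortBySizeDesc orders pages from largest to smallest using sort.Slice,
// which takes a less function instead of requiring sort.Interface.
func sortBySizeDesc(pages []WebPage) {
	sort.Slice(pages, func(i, j int) bool {
		return pages[i].Size > pages[j].Size
	})
}

func main() {
	// hypothetical pages and sizes, for illustration only
	pages := []WebPage{
		{URL: "http://a.example", Size: 10},
		{URL: "http://b.example", Size: 30},
		{URL: "http://c.example", Size: 20},
	}
	sortBySizeDesc(pages)
	for i, page := range pages {
		fmt.Printf("%d. %s: %d bytes.\n", i+1, page.URL, page.Size)
	}
}
```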
Because WebPages is a named slice type, append() still works on it, so the program could simply write *results = append(*results, result). Instead, we have wrapped that call in a method on WebPages that adds a WebPage to the collection when we retrieve it from the resultsChannel. Because append() can return a new, longer slice, the method must store that result back into the caller’s WebPages, which is why it uses a pointer receiver rather than working on a copy of the slice.
func (wp *WebPages) addElement(page WebPage) {
	*wp = append(*wp, page)
}
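The difference between the two kinds of receiver is easy to see in a small sketch. The addByValue() method below is a hypothetical, deliberately broken variant of addElement() that uses a value receiver; the URL and size are placeholder values.

```go
package main

import "fmt"

type WebPage struct {
	URL  string
	Size int
}

type WebPages []WebPage

// addByValue receives a copy of the caller's slice header, so the longer
// slice returned by append is thrown away when the method returns.
func (wp WebPages) addByValue(page WebPage) {
	wp = append(wp, page)
}

// addElement uses a pointer receiver, so it updates the caller's slice.
func (wp *WebPages) addElement(page WebPage) {
	*wp = append(*wp, page)
}

func main() {
	var pages WebPages
	page := WebPage{URL: "http://example.com", Size: 100}

	pages.addByValue(page)
	fmt.Println(len(pages)) // 0: the caller's slice did not change

	pages.addElement(page)  // Go passes &pages automatically
	fmt.Println(len(pages)) // 1

	// append() also works directly on the named slice type
	pages = append(pages, page)
	fmt.Println(len(pages)) // 2
}
```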
Once we have populated WebPages with the results from all five websites, we call sort.Sort(), which sorts the slice in place from largest to smallest, and then display the results to the user:
sort.Sort(results)
for i, page := range *results {
	fmt.Printf("%d. %s: %d bytes.\n", i+1, page.URL, page.Size)
}