Good practice - 2

Goroutines and channels

  • Performance advice:

A rule of thumb if you use parallelism to gain efficiency over serial computation: the amount of work done inside a goroutine has to be much larger than the costs of creating goroutines and sending data back and forth between them.
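
A sketch of this trade-off (the worker count and data size are arbitrary assumptions): summing a large slice with a few goroutines, each handling a big contiguous chunk, keeps the work per goroutine far above the startup and communication cost.

    data := make([]int, 1000000)
    for i := range data {
        data[i] = 1
    }
    const nWorkers = 4
    chunk := len(data) / nWorkers
    results := make(chan int, nWorkers)
    for w := 0; w < nWorkers; w++ {
        part := data[w*chunk : (w+1)*chunk]
        go func(part []int) {
            sum := 0
            for _, v := range part { // plenty of work per goroutine
                sum += v
            }
            results <- sum
        }(part)
    }
    total := 0
    for w := 0; w < nWorkers; w++ {
        total += <-results
    }
    fmt.Println(total) // 1000000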

Using buffered channels

  • Using buffered channels for performance:

    A buffered channel can easily double throughput compared to an unbuffered one, and depending on the context the gain can be 10x or more. You can try to optimize further by adjusting the channel's capacity.
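
    For instance (the capacity of 100 is an arbitrary starting point for tuning; the benchmark at the end of this section illustrates the gain):

    ch := make(chan int)         // unbuffered: every send blocks until a receiver is ready
    chBuf := make(chan int, 100) // buffered: sends block only once 100 items are queued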

Limiting the number of items in a channel

  • Limiting the number of items in a channel and packing them in arrays: Channels become a bottleneck if you pass a lot of individual items through them. You can work around this by packing chunks of data into arrays (or slices) and unpacking them on the other end; this can yield a speed gain of around 10x.
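
    A minimal sketch of this chunking idea (the chunk size of 100 and the int item type are assumptions):

    // Sender: pack items into slices so each channel operation carries many of them.
    func sendChunked(items []int, out chan<- []int) {
        const chunkSize = 100
        for len(items) > 0 {
            n := chunkSize
            if n > len(items) {
                n = len(items)
            }
            out <- items[:n]
            items = items[n:]
        }
        close(out)
    }

    // Receiver: unpack each chunk on the other end.
    func receiveChunked(in <-chan []int) {
        for chunk := range in {
            for _, v := range chunk {
                _ = v // process each individual item v here
            }
        }
    }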

Loop over a channel

  • How to loop over a channel ch with a for-range:

    for v := range ch {
        // do something with v
    }
    

Test whether a channel is closed

  • How to test if a channel ch is closed:

    // read from the channel until it is closed
    for {
        input, open := <-ch
        if !open {
            break
        }
        fmt.Printf("%s ", input)
    }
    

Semaphore pattern

  • How to use a channel to let the main program wait until a goroutine completes (semaphore pattern):

    
    ch := make(chan int) // Allocate a channel.
    // Start something in a goroutine; when it completes, signal on the channel.
    go func() {
        // doSomething
        ch <- 1 // Send a signal; the value does not matter.
    }()
    doSomethingElseForAWhile()
    <-ch // Wait for the goroutine to finish; discard the sent value.
    // If the waiting goroutine must block forever, omit the ch <- 1 send from the anonymous function.
    

Channel Factory pattern

  • Channel Factory pattern: the function creates a channel, starts an anonymous function as a goroutine that populates it, and returns the channel.

    func pump() chan int {
        ch := make(chan int)
        go func() {
            for i := 0; ; i++ {
                ch <- i
            }
        }()
        return ch
    }
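
    A caller might drain this channel with a companion function (a sketch; the name suck and the printing are illustrative):

    func suck(ch chan int) {
        for v := range ch {
            fmt.Println(v)
        }
    }

    // elsewhere:
    suck(pump()) // prints the natural numbers indefinitely, since pump never closes ch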
    

Channel Iterator pattern

  • Channel Iterator pattern: implement an Iter() method on a container that returns a channel for the calling for-range loop to read from.

    func (c *container) Iter() <-chan item {
        ch := make(chan item)
        go func() {
            for i := 0; i < c.Len(); i++ {
                // or use a for-range loop
                ch <- c.items[i]
            }
            close(ch) // close the channel so the caller's range loop terminates
        }()
        return ch
    }
    
    // The code that calls this method can then iterate over the container:
    for x := range container.Iter() { ... }
    

Limiting the number of requests

  • Limiting the number of requests processed concurrently

    const (
        AvailableMemory         = 10 << 20                                  // 10 MB, for example
        AverageMemoryPerRequest = 10 << 10                                  // 10 KB
        MAXREQS                 = AvailableMemory / AverageMemoryPerRequest // here amounts to 1000
    )
    
    var sem = make(chan int, MAXREQS)
    
    type Request struct {
        a, b   int
        replyc chan int
    }
    
    func process(r *Request) {
        // Do something
        // May take a long time and use a lot of memory or CPU
    }
    func handle(r *Request) {
        process(r)
        // signal done: enable next request to start
        // by making 1 empty place in the buffer
        <-sem
    }
    func Server(queue chan *Request) {
        for {
            sem <- 1
            // blocks when channel is full (1000 requests are active)
            // so wait here until there is capacity to process a request
            // (doesn’t matter what we put in it)
            request := <-queue
            go handle(request)
        }
    }
    
    func main() {
        fmt.Println("AvailableMemory", AvailableMemory)
        fmt.Println("AverageMemoryPerRequest", AverageMemoryPerRequest)
        queue := make(chan *Request)
        go Server(queue)
        select {} // block forever; otherwise main returns and the server dies
    }
    

Parallelizing a computation over a few cores

  • Parallelizing a computation over a number of CPU cores:

const NCPU = 4

func DoAll() {
    sem := make(chan int, NCPU) // Buffering optional but sensible.
    for i := 0; i < NCPU; i++ {
        go DoPart(sem)
    }
    // Drain the channel sem, waiting for NCPU tasks to complete
    for i := 0; i < NCPU; i++ {
        <-sem // wait for one task to complete
    }
    // All done.
}
func DoPart(sem chan int) {
    // do the part of the computation
    sem <- 1 // signal that this piece is done
}

func main() {
    runtime.GOMAXPROCS(NCPU) // redundant since Go 1.5, where GOMAXPROCS defaults to the number of cores
    DoAll()
}

Parallelizing a computation over a large amount of data

func ParallelProcessData(in <-chan *Data, out chan<- *Data) {
    // make the intermediate channels:
    preOut := make(chan *Data, 100)
    stepAOut := make(chan *Data, 100)
    stepBOut := make(chan *Data, 100)
    stepCOut := make(chan *Data, 100)
    // start the pipeline stages, each as its own goroutine:
    go PreprocessData(in, preOut)
    go ProcessStepA(preOut, stepAOut)
    go ProcessStepB(stepAOut, stepBOut)
    go ProcessStepC(stepBOut, stepCOut)
    go PostProcessData(stepCOut, out)
}
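
The stage functions are not shown; a minimal sketch of one (transformA is a hypothetical helper for this stage's work) receives, processes, forwards, and finally closes its output so the downstream stage's range loop can terminate:

func ProcessStepA(in <-chan *Data, out chan<- *Data) {
    for d := range in {
        transformA(d) // hypothetical per-item work for step A
        out <- d
    }
    close(out) // lets the next stage's loop finish
}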

Simple timeout pattern

timeout := make(chan bool, 1) // buffer of 1 lets the goroutine send and exit even if nothing receives
go func() {
    time.Sleep(1 * time.Second)
    timeout <- true
}()

select {
case <-ch:
    // a read from ch has occurred
case <-timeout:
    // the read from ch has timed out
}

Use in- and out-channels instead of a lock

func Worker(in, out chan *Task) {
    for {
        t := <-in
        process(t)
        out <- t
    }
}
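
Because each Task travels through the channels, several workers can share the same in- and out-channels without any explicit locking (the worker count of 5 and the buffer sizes are arbitrary):

in, out := make(chan *Task, 10), make(chan *Task, 10)
for i := 0; i < 5; i++ {
    go Worker(in, out)
}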

Concurrent access to an object

  • Serialize access to an object's fields by sending closures over a channel; a single backend goroutine executes them one at a time, so no lock is needed:

type Person struct {
    Name   string
    salary float64
    chF    chan func()
}

func NewPerson(name string, salary float64) *Person {
    p := &Person{name, salary, make(chan func())}
    go p.backend()
    return p
}
func (p *Person) backend() {
    for f := range p.chF {
        f()
    }
}

// Set salary.
func (p *Person) SetSalary(sal float64) {
    p.chF <- func() { p.salary = sal }
}

// Retrieve salary.
func (p *Person) Salary() float64 {
    fChan := make(chan float64)
    p.chF <- func() { fChan <- p.salary }
    return <-fChan
}
func (p *Person) String() string {
    return "Person - name is: " + p.Name + " - salary is: " + strconv.
        FormatFloat(p.Salary(), 'f', 2, 64)
}
func main() {
    bs := NewPerson("Smith Bill", 2500.5)
    fmt.Println(bs)
    bs.SetSalary(4000.25)
    fmt.Println("Salary changed:")
    fmt.Println(bs)
}
/* Output:
Person - name is: Smith Bill - salary is: 2500.50
Salary changed:
Person - name is: Smith Bill - salary is: 4000.25
*/

Abandon synchronous calls

  • Abandon synchronous calls that run too long

    ch := make(chan error, 1)
    go func() { ch <- client.Call("Service.Method", args, &reply) }()
    select {
    case err := <-ch:
        // use err and reply
    case <-time.After(timeoutNs):
        // the call timed out
    }
    

Benchmarking goroutines

func main() {
    fmt.Println("sync", testing.Benchmark(BenchmarkChannelSync).String())
    fmt.Println("buffered", testing.Benchmark(BenchmarkChannelBuffered).String())
}

func BenchmarkChannelSync(b *testing.B) {
    ch := make(chan int)
    go func() {
        for i := 0; i < b.N; i++ {
            ch <- i
        }
        close(ch)
    }()
    for range ch { // drain until the sender closes the channel
    }
}

func BenchmarkChannelBuffered(b *testing.B) {
    ch := make(chan int, 128)
    go func() {
        for i := 0; i < b.N; i++ {
            ch <- i
        }
        close(ch)
    }()
    for range ch { // drain until the sender closes the channel
    }
}

/* Output:
sync  3000000           420 ns/op
buffered 10000000           103 ns/op
*/

Stopping a goroutine

runtime.Goexit() // terminates the calling goroutine; deferred calls run first
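
A sketch of its behavior (doWork is a hypothetical helper): Goexit terminates only the calling goroutine, and its deferred calls still run first.

go func() {
    defer fmt.Println("cleanup still runs") // deferred calls execute before the goroutine exits
    if err := doWork(); err != nil {
        runtime.Goexit() // stop this goroutine only; the rest of the program continues
    }
    fmt.Println("reached only when doWork succeeds")
}()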