Goroutine & Channel - 2
Channel directionality
A channel type may be annotated to specify that it may only be used for sending or only for receiving:
var send_only chan<- int // send-only: values can only be sent to this channel
var recv_only <-chan int // receive-only: values can only be received from this channel
Receive-only channels ( <-chan T ) cannot be closed, because closing a channel is intended as a way for a sender to signal that no more values will be sent to the channel, so it has no meaning for receive-only channels.
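The usual way to apply these annotations is to create one bidirectional channel and let Go convert it implicitly when it is passed to a function. The following is a minimal sketch of that idea; the function names produce and sum are hypothetical and chosen only for illustration.

```go
package main

import "fmt"

// produce (hypothetical name) can only send on out.
// As the sender, it is the side that closes the channel.
func produce(out chan<- int, n int) {
	for i := 1; i <= n; i++ {
		out <- i
	}
	close(out)
}

// sum (hypothetical name) can only receive from in.
// Calling close(in) here would be rejected by the compiler.
func sum(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

func main() {
	ch := make(chan int) // bidirectional; converted implicitly at each call
	go produce(ch, 4)
	fmt.Println(sum(ch)) // 1+2+3+4 = 10
}
```

Restricting the direction in the function signature lets the compiler catch a goroutine that sends on a channel it was only meant to read from, or the reverse.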
Pipe & Filter pattern
A goroutine acting as a filter processes what it receives from an input channel and sends the result to an output channel:
func main() {
    sendChan := make(chan int)
    receiveChan := make(chan string)
    go processChannel(sendChan, receiveChan)
}

func processChannel(in <-chan int, out chan<- string) {
    for inValue := range in {
        result := strconv.Itoa(inValue) // processing inValue
        // ...
        out <- result
    }
}
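For a pipeline stage to be observable, something has to feed the input channel, close it when done, and drain the output channel. The sketch below shows one possible way to wire that up; stage and runPipeline are hypothetical names, and closing the output channel from inside the stage is an assumption of this example, not something the original snippet does.

```go
package main

import (
	"fmt"
	"strconv"
)

// stage (hypothetical) converts each int from in to its decimal string
// and closes out once in has been closed and drained.
func stage(in <-chan int, out chan<- string) {
	defer close(out)
	for v := range in {
		out <- strconv.Itoa(v)
	}
}

// runPipeline (hypothetical) pushes inputs through the stage and
// collects the results in the order they were sent.
func runPipeline(inputs []int) []string {
	in := make(chan int)
	out := make(chan string)
	go stage(in, out)
	go func() {
		defer close(in) // the sender signals that no more values follow
		for _, v := range inputs {
			in <- v
		}
	}()
	var results []string
	for s := range out {
		results = append(results, s)
	}
	return results
}

func main() {
	fmt.Println(runPipeline([]int{1, 2, 3})) // [1 2 3]
}
```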
Prime number generator sample
func generate() chan int {
    ch := make(chan int)
    go func() {
        for i := 2; ; i++ {
            ch <- i
        }
    }()
    return ch
}

// Filter out input values divisible by prime, send rest to returned channel
func filter(in chan int, prime int) chan int {
    out := make(chan int)
    go func() {
        for {
            if i := <-in; i%prime != 0 {
                out <- i
            }
        }
    }()
    return out
}

func sieve() chan int {
    out := make(chan int)
    go func() {
        ch := generate()
        for {
            prime := <-ch
            ch = filter(ch, prime)
            out <- prime
        }
    }()
    return out
}

func main() {
    primes := sieve()
    for {
        fmt.Println(<-primes)
    }
}
Goroutine with select
The select chooses which of the multiple communications listed by its cases can proceed. The default clause is optional; fall through behavior, like in the normal switch, is not permitted.
- If all are blocked, it waits until one can proceed.
- If multiple can proceed, it chooses one at random.
- When none of the channel operations can proceed and a default clause is present, the default clause is executed: the default is always runnable (that is, ready to execute).
Using a send operation in a select statement with a default case guarantees that the send will be non-blocking! If there are no cases, the select blocks execution forever.
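The non-blocking send described above can be sketched as a small helper. The name trySend is hypothetical; the pattern itself is just a select with one send case and a default clause.

```go
package main

import "fmt"

// trySend (hypothetical helper) attempts to send v on ch without blocking
// and reports whether the value was actually sent.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		// No receiver is ready and the buffer (if any) is full.
		return false
	}
}

func main() {
	ch := make(chan int, 1)     // buffer of one
	fmt.Println(trySend(ch, 1)) // true: the buffer has room
	fmt.Println(trySend(ch, 2)) // false: the buffer is full, default runs
}
```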
Sample 9
func main() {
    runtime.GOMAXPROCS(2) // in goroutine_select2.go
    ch1 := make(chan int)
    ch2 := make(chan int)
    go pump1(ch1)
    go pump2(ch2)
    go suck(ch1, ch2)
    time.Sleep(1e9)
}

func pump1(ch chan int) {
    for i := 0; ; i++ {
        ch <- i * 2
    }
}

func pump2(ch chan int) {
    for i := 0; ; i++ {
        ch <- i + 5
    }
}

func suck(ch1 chan int, ch2 chan int) {
    for {
        select {
        case v := <-ch1:
            fmt.Printf("Received on channel 1: %d\n", v)
        case v := <-ch2:
            fmt.Printf("Received on channel 2: %d\n", v)
        }
    }
}
Timeout & Ticker
Ticker: time.Ticker is a struct that repeatedly sends a time value on its channel C at a specified time interval:
type Ticker struct {
    C <-chan Time // the channel on which the ticks are delivered.
    // contains filtered or unexported fields
    // ...
}
A ticker is stopped with Stop(); call it in a defer statement:
ticker := time.NewTicker(updateInterval)
defer ticker.Stop()
// ...
select {
case u := <-ch1:
    // ...
case v := <-ch2:
    // ...
case <-ticker.C:
    logState(status) // call some logging function logState
default: // no value ready to be received
    // ...
}
Tickers are handy when you have to limit the rate of processing per unit of time. The time.Tick() function, with signature func Tick(d Duration) <-chan Time, is useful when you only need access to the returned channel and do not need to shut the ticker down.
The sample below is a good use case for the time.Tick function:
ratePerSec := 10
dur := time.Second / time.Duration(ratePerSec)
chRate := time.Tick(dur) // a tick every 1/10th of a second
for req := range requests {
    <-chRate // rate limit our Service.Method RPC calls
    go client.Call("Service.Method", req, ...) // client.Call is RPC call
}
A Timer type looks exactly the same as a Ticker type (it is constructed with NewTimer(d Duration)), but it sends the time only once, after a Duration d.
There is also the function time.After(d), which returns a channel: after Duration d the current time is sent on the returned channel, so this is equivalent to NewTimer(d).C. It resembles Tick(), but After() sends the time only once.
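Because After() fires only once, it is a natural fit for putting a timeout on a receive. The following is a minimal sketch of that pattern; recvWithTimeout and errTimeout are hypothetical names introduced for this example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout is a hypothetical sentinel error for this sketch.
var errTimeout = errors.New("timed out")

// recvWithTimeout (hypothetical helper) waits for a value on ch
// and gives up once the duration d has elapsed.
func recvWithTimeout(ch <-chan int, d time.Duration) (int, error) {
	select {
	case v := <-ch:
		return v, nil
	case <-time.After(d):
		return 0, errTimeout
	}
}

func main() {
	ready := make(chan int, 1)
	ready <- 42
	fmt.Println(recvWithTimeout(ready, time.Second)) // 42 <nil>

	never := make(chan int) // nobody ever sends on this channel
	fmt.Println(recvWithTimeout(never, 50*time.Millisecond)) // 0 timed out
}
```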
Sample of timer
func main() {
    tick := time.Tick(1e8)
    boom := time.After(5e8)
    for {
        select {
        case <-tick:
            fmt.Println("tick.")
        case <-boom:
            fmt.Println("BOOM!")
            return
        default:
            fmt.Println(" .")
            time.Sleep(5e7)
        }
    }
}
Generator
Lazy generator
A generator is a function that returns the next value in a sequence each time the function is called.
It is a producer that returns only the next value, not the entire sequence. This is called lazy evaluation: compute only what you need at the moment, saving valuable resources (memory and CPU). It is a technique for evaluating expressions on demand.
Basic lazy generator
var resume chan int

func integers() chan int {
    yield := make(chan int)
    count := 0
    go func() {
        for {
            yield <- count
            count++
        }
    }()
    return yield
}

func generateInteger() int {
    return <-resume
}

func main() {
    resume = integers()
    fmt.Println(generateInteger()) //=> 0
    fmt.Println(generateInteger()) //=> 1
    fmt.Println(generateInteger()) //=> 2
}
Generic Lazy Generator
By making clever use of the empty interface, closures and higher-order functions, we can implement a generic builder BuildLazyEvaluator for the lazy evaluation function (this is best placed inside a utility package). The builder takes a function that has to be evaluated and an initial state as arguments, and returns a function without arguments that returns the desired value. The passed evaluation function has to calculate the next return value as well as the next state, based on the state argument. Inside the builder, a channel and a goroutine with an endless loop are created. The return values are passed to the channel, from which they are fetched by the returned function for later use. Each time a value is fetched, the next one is calculated.
type Any interface{}
type EvalFunc func(Any) (Any, Any)

func main() {
    evenFunc := func(state Any) (Any, Any) {
        oldState := state.(int)
        newState := oldState + 2
        return oldState, newState
    }
    even := BuildLazyIntEvaluator(evenFunc, 0)
    for i := 0; i < 10; i++ {
        fmt.Printf("%vth even: %v\n", i, even())
    }
}

func BuildLazyEvaluator(evalFunc EvalFunc, initState Any) func() Any {
    retValChan := make(chan Any)
    loopFunc := func() {
        var actState Any = initState
        var retVal Any
        for {
            retVal, actState = evalFunc(actState)
            retValChan <- retVal
        }
    }
    retFunc := func() Any {
        return <-retValChan
    }
    go loopFunc()
    return retFunc
}

func BuildLazyIntEvaluator(evalFunc EvalFunc, initState Any) func() int {
    evalFn := BuildLazyEvaluator(evalFunc, initState)
    return func() int {
        return evalFn().(int)
    }
}
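The builder becomes more interesting when the state carries more than the returned value. As a sketch under that assumption, the example below repeats a compact version of BuildLazyEvaluator so that it compiles on its own, and drives it with a hypothetical two-field state (fibState) to produce Fibonacci numbers; fibFunc and firstFib are also names made up for this illustration.

```go
package main

import "fmt"

type Any interface{}
type EvalFunc func(Any) (Any, Any)

// BuildLazyEvaluator is a compact restatement of the builder described above.
func BuildLazyEvaluator(evalFunc EvalFunc, initState Any) func() Any {
	retValChan := make(chan Any)
	go func() {
		actState := initState
		var retVal Any
		for {
			retVal, actState = evalFunc(actState)
			retValChan <- retVal
		}
	}()
	return func() Any { return <-retValChan }
}

// fibState (hypothetical) holds the current and the next Fibonacci number.
type fibState struct{ cur, next uint64 }

// fibFunc returns the current number and the state that follows it.
func fibFunc(state Any) (Any, Any) {
	s := state.(fibState)
	return s.cur, fibState{s.next, s.cur + s.next}
}

// firstFib (hypothetical) pulls the first n values from the lazy evaluator.
func firstFib(n int) []uint64 {
	fib := BuildLazyEvaluator(fibFunc, fibState{0, 1})
	out := make([]uint64, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, fib().(uint64))
	}
	return out
}

func main() {
	fmt.Println(firstFib(8)) // [0 1 1 2 3 5 8 13]
}
```

Note that the background goroutine never exits in this design, so an evaluator that is abandoned keeps its goroutine blocked on the channel send.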
Future
A related idea is that of futures: sometimes you know you need to compute a value before you need to actually use the value. In this case, you can potentially start computing the value on another processor and have it ready when you need it.
Futures are easy to implement via closures and goroutines. The idea is similar to generators, except that a future needs to return only one value.
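Stripped of any domain types, the idea can be sketched in a few lines. The helper name future is hypothetical, and the buffered channel is a choice made for this example so that the goroutine can finish even if the result is never read.

```go
package main

import "fmt"

// future (hypothetical helper) starts compute in its own goroutine and
// returns a channel that will deliver exactly one result.
func future(compute func() int) <-chan int {
	c := make(chan int, 1) // buffered: the goroutine never blocks on the send
	go func() { c <- compute() }()
	return c
}

func main() {
	sq := future(func() int { return 12 * 12 })
	// ... other work can happen here while the value is being computed ...
	fmt.Println(<-sq) // 144
}
```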
The Matrix package will look like the code below:
// futures used internally
type futureMatrix chan Matrix

// API remains the same
func Inverse(a Matrix) Matrix {
    return <-InverseAsync(promise(a))
}

func Product(a Matrix, b Matrix) Matrix {
    return <-ProductAsync(promise(a), promise(b))
}

// expose async version of the API
func InverseAsync(a futureMatrix) futureMatrix {
    c := make(futureMatrix)
    go func() { c <- inverse(<-a) }()
    return c
}

// ProductAsync takes two futures, matching the call in Product
func ProductAsync(a, b futureMatrix) futureMatrix {
    c := make(futureMatrix)
    go func() { c <- product(<-a, <-b) }()
    return c
}

// actual implementation is the same as before
func product(a Matrix, b Matrix) Matrix { .... }
func inverse(a Matrix) Matrix { .... }

// utility fxn: create a futureMatrix from a given matrix
func promise(a Matrix) futureMatrix {
    future := make(futureMatrix, 1)
    future <- a
    return future
}
Using the matrix package:
func InverseProduct(a Matrix, b Matrix) Matrix {
    a_inv := Inverse(a)
    b_inv := Inverse(b)
    return Product(a_inv, b_inv)
}

// async way (an alternative to the version above, not a second declaration)
func InverseProduct(a Matrix, b Matrix) Matrix {
    a_inv_future := InverseAsync(promise(a)) // both inversions start right away
    b_inv_future := InverseAsync(promise(b))
    a_inv := <-a_inv_future
    b_inv := <-b_inv_future
    return Product(a_inv, b_inv)
}
Multiplexing
Client-server applications are the kind of applications where goroutines and channels shine.
Server side simulator sample
type Request struct {
    a, b      int
    replyChan chan int // reply channel inside the Request
}

type binOp func(a, b int) int

func run(op binOp, req *Request) {
    req.replyChan <- op(req.a, req.b)
}

func server(op binOp, service chan *Request, quitChan chan bool) {
    for {
        select {
        case req := <-service:
            go run(op, req)
        case <-quitChan:
            return
        }
    }
}

func startServer(op binOp) (service chan *Request, quitChan chan bool) {
    service = make(chan *Request)
    quitChan = make(chan bool)
    go server(op, service, quitChan)
    return service, quitChan
}

func main() {
    adder, quitChan := startServer(func(a, b int) int { return a + b })
    const N = 100
    var reqs [N]Request
    for i := 0; i < N; i++ {
        req := &reqs[i]
        req.a = i
        req.b = i + N
        req.replyChan = make(chan int)
        adder <- req
    }
    // checks:
    for i := N - 1; i >= 0; i-- { // doesn’t matter what order
        if <-reqs[i].replyChan != N+2*i {
            fmt.Println("fail at", i)
        } else {
            fmt.Println("Request ", i, "is ok!")
        }
    }
    quitChan <- true
    fmt.Println("done")
}
Parallel For-Loop
In summary, we need a channel for synchronization purposes (used as a semaphore) when implementing a parallel for-loop, but we do not need to communicate with the goroutines through channels when the stack works perfectly well. The first sample passes each element over a channel, which is more machinery than necessary:
out := make(chan float64)
for _, xi := range data {
    xch := make(chan float64)
    go func() {
        xi := <-xch // receive the element over a channel
        out <- doSomething(xi)
    }()
    xch <- xi
}
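Another sample of a parallel loop, this time with a semaphore:
// semaphore is assumed to be a channel-based type with Signal and Wait methods, defined elsewhere
func VectorScalarAdd(v []float64, s float64) {
    sem := make(semaphore, len(v))
    for i := range v {
        go func(i int) { // i is passed on the stack, no channel needed
            v[i] += s
            sem.Signal()
        }(i)
    }
    sem.Wait(len(v))
}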
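The semaphore type used above is not defined in this section. One plausible definition, given here purely as an assumption, is a buffered channel of empty structs with Signal and Wait methods; the lower-case names empty and vectorScalarAdd are hypothetical and exist only to make the sketch self-contained.

```go
package main

import "fmt"

// empty carries no data; only the act of sending matters.
type empty struct{}

// semaphore is assumed here to be a buffered channel used for counting signals.
type semaphore chan empty

// Signal records that one unit of work has finished.
func (s semaphore) Signal() { s <- empty{} }

// Wait blocks until n signals have been received.
func (s semaphore) Wait(n int) {
	for i := 0; i < n; i++ {
		<-s
	}
}

// vectorScalarAdd adds s to every element of v, one goroutine per element.
func vectorScalarAdd(v []float64, s float64) {
	sem := make(semaphore, len(v))
	for i := range v {
		go func(i int) {
			v[i] += s // each goroutine touches a distinct element
			sem.Signal()
		}(i)
	}
	sem.Wait(len(v)) // returns only after every goroutine has signalled
}

func main() {
	v := []float64{1, 2, 3}
	vectorScalarAdd(v, 0.5)
	fmt.Println(v) // [1.5 2.5 3.5]
}
```

In current Go code the same synchronization is commonly written with sync.WaitGroup; the channel version is kept here because it matches the semaphore wording of the text.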
Concurrent access to an object with a channel
To safeguard concurrent modifications of an object, instead of locking with a sync.Mutex we can also use a backend goroutine for the sequential execution of anonymous functions.
In the following program we have a type Person which now contains a field chF, a channel of anonymous functions. This is initialized in the constructor function NewPerson, which also starts the method backend() as a goroutine. This method executes, in an infinite loop, all the functions placed on chF, effectively serializing them and thus providing safe concurrent access. The methods that change and retrieve the salary each create an anonymous function that does the work and put it on chF; backend() then executes these functions sequentially.
type Person struct {
    Name   string
    salary float64
    chF    chan func()
}

func NewPerson(name string, salary float64) *Person {
    p := &Person{name, salary, make(chan func())}
    go p.backend()
    return p
}

func (p *Person) backend() {
    for f := range p.chF {
        f()
    }
}

// Set salary.
func (p *Person) SetSalary(sal float64) {
    p.chF <- func() { p.salary = sal }
}

// Retrieve salary.
func (p *Person) Salary() float64 {
    fChan := make(chan float64)
    p.chF <- func() { fChan <- p.salary }
    return <-fChan
}

func (p *Person) String() string {
    return "Person - name is: " + p.Name + " - salary is: " + strconv.FormatFloat(p.Salary(), 'f', 2, 64)
}

func main() {
    bs := NewPerson("Smith Bill", 2500.5)
    fmt.Println(bs)
    bs.SetSalary(4000.25)
    fmt.Println("Salary changed:")
    fmt.Println(bs)
}