[Go blocking channels] Synchronize and block with channels in Go #golang, #go, #go channels, #go synchronization, #goroutines, #waitgroups, #go threads, #sync, #thread synchronization,
package main
import (
"fmt"
"os"
"strings"
"sync"
)
// main reads chunks from stdin on one goroutine, hands them over an
// unbuffered channel to a second goroutine, and blocks on a WaitGroup until
// that consumer either sees the word "quit" or stdin is exhausted.
func main() {
	devNull, err := os.Create(os.DevNull) // Operating System's /dev/null
	if err != nil {
		fmt.Fprintf(os.Stderr, "opening %s: %v\n", os.DevNull, err)
		os.Exit(1)
	}
	defer devNull.Close()

	out := make(chan string)

	// WaitGroup is used for goroutine synchronization. As long as its
	// counter is > 0, the goroutine calling wg.Wait() stays blocked.
	// wg.Wait() is the last call in this example, before main() returns.
	wg := sync.WaitGroup{}
	wg.Add(1) // One unit of work: the consumer goroutine below

	// Producer: forwards every chunk read from stdin to the channel.
	go func(c chan<- string) {
		// The sender owns the channel; closing it tells the consumer that
		// no more input will ever arrive.
		defer close(c)

		fromStdIn := make([]byte, 1024) // 1K buffer
		for {
			n, err := os.Stdin.Read(fromStdIn)
			if n > 0 {
				// Keep only the bytes actually read and strip the line
				// terminator, if there is one.
				c <- strings.TrimRight(string(fromStdIn[:n]), "\r\n")
			}
			if err != nil {
				// EOF or a real read error: either way nothing more can
				// be read, so stop instead of spinning on a dead stream.
				return
			}
		}
	}(out)

	// Consumer: blocks on the channel, so it uses no CPU while idle.
	go func() {
		// Release main exactly once, whichever way this goroutine ends.
		defer wg.Done()

		for v := range out {
			if v == "quit" {
				fmt.Printf("Quitting...\n")
				return
			}
			fmt.Fprintf(devNull, "%s", v)
		}
	}()

	// As long as the counter is > 0, main does not return.
	wg.Wait()
}
package main
import (
"fmt"
"math/rand"
"time"
)
// TIMEOUT is the duration, in seconds, of the timer main uses to bound
// its runtime.
const TIMEOUT = 20
// someSlowFunction simulates a function whose runtime is not known in
// advance. It sleeps for a random whole number of seconds between 0 and
// 9*n (a zero or negative duration makes Sleep return immediately) and
// returns how long the call actually took, in nanoseconds.
func someSlowFunction(n int) int64 {
	start := time.Now()
	// Inject randomness, to simulate unknown runtime
	time.Sleep(time.Duration(rand.Intn(10)*n) * time.Second)
	// time.Since uses the monotonic clock reading carried by start, so the
	// delta cannot be skewed or go negative when the wall clock is adjusted.
	return time.Since(start).Nanoseconds()
}
// main maps someSlowFunction over 0..9 on one goroutine, prints each
// elapsed time on another, and returns as soon as either all results have
// been printed or the TIMEOUT-second timer fires, whichever happens first.
func main() {
	out := make(chan int64, 10)

	// The timer caps total runtime at TIMEOUT seconds.
	timer := time.NewTimer(TIMEOUT * time.Second)
	defer timer.Stop()

	// Producer: call the "slow" function for each i and publish the result.
	go func(f func(n int) int64, c chan<- int64) {
		defer close(c) // Lets the consumer's range loop terminate
		for i := 0; i < 10; i++ {
			c <- f(i) // Map f over values of i
		}
	}(someSlowFunction, out)

	// Consumer: print results as they arrive; signal when out is drained.
	finished := make(chan struct{})
	go func() {
		defer close(finished)
		for result := range out {
			fmt.Printf("Elapsed=%d(ns)\n", result)
		}
	}()

	// Block until the work is complete or the time budget is spent. Without
	// the finished case main would always sit out the full timeout, even
	// when every result had already been printed.
	select {
	case <-finished:
	case <-timer.C:
		fmt.Printf("Timeout\n")
	}
}
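// Alternative sketch for the tail of main: receive results and watch the
// timer in a single select loop.
// for {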
// // Select will block as long as neither channel is ready.
// // the done channel will become ready when the timer expires
// // and will result in main returning as soon as timer is triggered,
// // even if the out channel is still receiving data.
// select {
// case result := <-out:
// fmt.Printf("Elapsed=%d(ns)\n", result)
// case <-done:
// fmt.Printf("Timeout\n")
// return
// }
// }
// }
package main
import (
"fmt"
"os"
)
// main reads from stdin on a goroutine, publishes the size of every chunk
// read on an unbuffered channel, and consumes those sizes in a blocking
// select loop until stdin is exhausted.
func main() {
	devNull, err := os.Create(os.DevNull) // Operating System's /dev/null
	if err != nil {
		fmt.Fprintf(os.Stderr, "opening %s: %v\n", os.DevNull, err)
		os.Exit(1)
	}
	defer devNull.Close()

	out := make(chan int)

	go func(c chan<- int) {
		// Closing the channel on exit is what lets the loop below notice
		// that input has ended.
		defer close(c)

		fromStdIn := make([]byte, 1024) // 1K buffer
		for {
			n, err := os.Stdin.Read(fromStdIn)
			if n > 0 {
				c <- n // Put number of bytes read onto channel
			}
			if err != nil {
				// EOF or read failure: stop rather than spin on a stream
				// that will never produce data again.
				return
			}
		}
	}(out)

	for {
		select {
		// This case blocks the main goroutine, but prevents the program
		// from running away with the CPU like a bandit.
		case v, ok := <-out:
			if !ok {
				// Channel closed: stdin is done, so are we.
				return
			}
			fmt.Fprintf(devNull, "%d", v)
			// If a default case were added here, it would end up being
			// selected the vast majority of the time, resulting in
			// effectively no actual work being done while baking the CPU.
			// If that default case returned, the for loop would terminate
			// immediately: the channel starts out empty, so select would
			// choose the default case and return.
			//
			// Values are placed onto the channel as the input from
			// stdin is consumed by the program.
			//
			// default:
			// 	return
		}
	}
}
package main
import (
"fmt"
"math/rand"
"time"
)
// callbackFun is a callback that receives an int and reports a bool result.
type callbackFun func(n int) bool
// cbHandlerFun receives callbacks on ch and values on r, pairing them in
// arrival order: for each callback it takes the next value, invokes the
// callback with it, and prints running totals of true and false results.
//
// It blocks while either channel has nothing to deliver and returns once
// either channel has been closed and drained.
func cbHandlerFun(ch <-chan callbackFun, r <-chan int) {
	var numTrue, numFalse int
	// Ranging ends the loop when ch is closed; a plain receive would hand
	// back a nil callback and calling it would panic.
	for f := range ch {
		n, ok := <-r
		if !ok {
			// No value will ever arrive for this callback.
			return
		}
		if f(n) {
			numTrue++
		} else {
			numFalse++
		}
		fmt.Printf("True(s)=%d False(s)=%d\n", numTrue, numFalse)
	}
}
// timerFun is a minimal timeout helper that can stand in for a Sleep call:
// it starts a 5 second timer and hands back the timer's channel, on which
// the caller can block until the timer fires.
func timerFun() <-chan time.Time {
	timer := time.NewTimer(5 * time.Second)
	return timer.C
}
// workFun simulates finishing two pieces of work. For each one it publishes
// a callback on ch followed by a random value in [0, 100) on r: first a
// parity check, then a "greater than 50" check.
//
// The callback is sent before its value because that is the order in which
// cbHandlerFun receives them; sending the value first deadlocks as soon as
// the channels are unbuffered.
func workFun(ch chan<- callbackFun, r chan<- int) {
	randomNumber := rand.New(rand.NewSource(int64(time.Now().Nanosecond())))

	first := randomNumber.Intn(100)
	ch <- func(n int) bool {
		if n%2 == 0 {
			fmt.Printf("%d is even\n", n)
			return true
		}
		fmt.Printf("%d is odd\n", n)
		return false
	}
	r <- first

	second := randomNumber.Intn(100)
	ch <- func(n int) bool {
		if n > 50 {
			fmt.Printf("%d is > 50\n", n)
			return true
		}
		fmt.Printf("%d is <= 50\n", n)
		return false
	}
	r <- second
}
// main wires a resident callback handler to a work loop that runs forever,
// using a timer channel to pace the iterations.
func main() {
	callbacks := make(chan callbackFun, 10)
	values := make(chan int, 10)

	// The handler goroutine stays alive, and keeps its counters, for as
	// long as main itself is running.
	go cbHandlerFun(callbacks, values)

	// Perpetual tasks belong in a for loop. Each pass starts a timer,
	// queues work for the handler, and then parks on the timer channel, so
	// channels provide both the synchronization and the pacing. A select
	// would only be needed if something had to happen in response to the
	// timeout beyond simply continuing.
	for {
		tick := timerFun()
		workFun(callbacks, values)
		<-tick // Blocks until the timer fires
	}
}