szaydel
9/30/2017 - 3:28 PM

[Go blocking channels] Synchronize and block with channels in Go #golang, #go, #go channels, #go synchronization, #goroutines, #waitgroups,

[Go blocking channels] Synchronize and block with channels in Go #golang, #go, #go channels, #go synchronization, #goroutines, #waitgroups, #go threads, #sync, #thread synchronization,

package main

import (
	"fmt"
	"os"
	"strings"
	"sync"
)

// main demonstrates blocking on a sync.WaitGroup while two goroutines
// communicate over an unbuffered channel.
//
// A producer goroutine reads standard input and sends each complete line
// (without its line terminator) on the channel. A consumer goroutine
// receives those lines: the line "quit" ends the program, every other
// line is written to the operating system's null device. main itself
// blocks in wg.Wait() until the consumer finishes, which happens either
// when "quit" is received or when standard input is exhausted.
func main() {
	devNull, err := os.Create(os.DevNull) // Operating System's /dev/null
	if err != nil {
		fmt.Fprintf(os.Stderr, "opening %s: %v\n", os.DevNull, err)
		os.Exit(1)
	}
	defer devNull.Close()

	out := make(chan string)

	// WaitGroup is used for thread synchronization. As long as its counter
	// is > 0, the goroutine calling wg.Wait() is going to be blocked.
	// wg.Wait() is the last call in this example, before main() returns.
	var wg sync.WaitGroup
	wg.Add(1) // Exactly one Done() is expected, from the consumer below.

	// Producer: the only sender on the channel, and therefore the one
	// responsible for closing it.
	go func(in *os.File, c chan<- string) {
		defer close(c) // Lets the consumer notice end of input.

		buf := make([]byte, 1024) // 1K buffer
		var pending string        // Input read so far that is not yet a full line.
		for {
			n, err := in.Read(buf)
			if n > 0 {
				// A single Read may deliver several lines or only part
				// of one, so split on newlines explicitly instead of
				// assuming the last byte is a terminator.
				pending += string(buf[:n])
				for {
					i := strings.IndexByte(pending, '\n')
					if i < 0 {
						break
					}
					c <- strings.TrimSuffix(pending[:i], "\r")
					pending = pending[i+1:]
				}
			}
			if err != nil {
				// EOF or a read failure: flush a final unterminated
				// line and stop, rather than spinning on a reader that
				// will never produce data again.
				if pending != "" {
					c <- strings.TrimSuffix(pending, "\r")
				}
				return
			}
		}
	}(os.Stdin, out)

	// Consumer: receiving blocks while the channel is empty, which keeps
	// this goroutine from running away with the CPU like a bandit.
	go func() {
		// Done is called exactly once, however the loop ends; calling
		// it more than once would panic with a negative counter.
		defer wg.Done()
		for v := range out {
			if v == "quit" {
				fmt.Printf("Quitting...\n")
				return
			}
			fmt.Fprintf(devNull, "%s", v)
		}
	}()

	// As long as the counter is > 0, main does not return.
	wg.Wait()
}
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// TIMEOUT is the duration, in seconds, of the timer that main arms to
// bound how long it waits for results.
const TIMEOUT = 20

// someSlowFunction simulates a function with an unpredictable runtime.
//
// It sleeps for rand.Intn(10)*n seconds (so anywhere from 0 to 9*n
// seconds; a non-positive product returns immediately) and reports how
// long the call actually took, in nanoseconds.
func someSlowFunction(n int) int64 {
	start := time.Now()
	// Inject randomness, to simulate unknown runtime.
	time.Sleep(time.Duration(rand.Intn(10)*n) * time.Second)
	// time.Since uses the monotonic clock reading carried by start, so
	// the delta stays correct even if the wall clock is adjusted while
	// we sleep.
	return time.Since(start).Nanoseconds()
}

// main runs someSlowFunction ten times in a background goroutine and
// prints each elapsed time as it arrives, giving up after TIMEOUT
// seconds.
//
// It returns as soon as either all results have been printed (the
// producer closes the channel) or the timer fires, in which case
// "Timeout" is printed and any outstanding work is abandoned.
func main() {
	// The buffer holds every result the producer can generate, so the
	// producer never blocks on send, even after main has stopped reading.
	out := make(chan int64, 10)

	// Set up a timer as a means of limiting runtime to TIMEOUT seconds.
	timer := time.NewTimer(TIMEOUT * time.Second)
	defer timer.Stop()

	// Producer: consume a "slow" function and publish its results.
	go func(f func(n int) int64, c chan<- int64) {
		defer close(c) // Signals that no more results are coming.
		for i := 0; i < 10; i++ {
			c <- f(i) // Map f over values of i
		}
	}(someSlowFunction, out)

	for {
		// Select blocks as long as neither channel is ready. The timer
		// channel becomes ready when the timer expires, which makes main
		// return even if results are still being produced.
		select {
		case result, ok := <-out:
			if !ok {
				// Channel closed: every result has been delivered, so
				// there is nothing left to wait for.
				return
			}
			fmt.Printf("Elapsed=%d(ns)\n", result)
		case <-timer.C:
			fmt.Printf("Timeout\n")
			return
		}
	}
}

// 	for {
// 		// Select will block as long as neither channel is ready.
// 		// the done channel will become ready when the timer expires
// 		// and will result in main returning as soon as timer is triggered,
// 		// even if the out channel is still receiving data.
// 		select {
// 		case result := <-out:
// 			fmt.Printf("Elapsed=%d(ns)\n", result)
// 		case <-done:
// 			fmt.Printf("Timeout\n")
// 			return
// 		}
// 	}
// }
package main

import (
	"fmt"
	"os"
)

// main demonstrates how a blocking channel receive inside select keeps a
// loop from spinning.
//
// A producer goroutine reads standard input in chunks of up to 1 KiB and
// sends the size of each chunk on an unbuffered channel. main receives
// those sizes and writes them to the operating system's null device. The
// program ends when standard input is exhausted or fails: the producer
// closes the channel and main returns.
func main() {
	devNull, err := os.Create(os.DevNull) // Operating System's /dev/null
	if err != nil {
		fmt.Fprintf(os.Stderr, "opening %s: %v\n", os.DevNull, err)
		os.Exit(1)
	}
	defer devNull.Close()

	out := make(chan int)

	// Producer: the only sender, so it owns closing the channel.
	go func(in *os.File, c chan<- int) {
		defer close(c)

		buf := make([]byte, 1024) // 1K buffer
		for {
			n, err := in.Read(buf)
			if n > 0 {
				c <- n // Put number of bytes read onto channel
			}
			if err != nil {
				// EOF or a read failure: stop instead of looping
				// forever on a reader that returns no more data.
				return
			}
		}
	}(os.Stdin, out)

	for {
		select {
		// This case blocks the main thread, but prevents the program
		// from running away with the CPU like a bandit.
		case v, ok := <-out:
			if !ok {
				// Channel closed by the producer: input is finished.
				return
			}
			fmt.Fprintf(devNull, "%d", v)
			// If a default case were added here, it would end up being
			// selected the vast majority of the time, resulting in
			// effectively no actual work being done while baking the
			// CPU. If that default case returned, the for loop would
			// terminate immediately, since the channel starts out empty
			// and select would choose the default case.
			//
			// Values are placed onto the channel as the input from
			// stdin is consumed by the program.
			//
			// default:
			// return
		}
	}
}
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// callbackFun is a callback that takes an int and reports a boolean
// verdict about it. Values of this type are passed between goroutines
// over channels.
type callbackFun func(n int) bool

// cbHandlerFun receives callback functions on ch and values on r, calls
// each callback with the next value, and keeps a running tally of how
// many calls returned true and how many returned false. The tally is
// printed after every call.
//
// For each iteration a callback is taken from ch first and its argument
// from r second; both receives block until data is available. The
// function returns when either channel is closed, so callers can shut
// the handler down by closing ch (or r) instead of leaving it to be
// killed when the process exits.
func cbHandlerFun(ch <-chan callbackFun, r <-chan int) {
	var numTrue, numFalse int
	// Ranging over ch ends the loop on close; a plain receive would
	// yield a nil callback and panic when called.
	for f := range ch {
		n, ok := <-r
		if !ok {
			return // No more values will arrive for pending callbacks.
		}
		if f(n) {
			numTrue++
		} else {
			numFalse++
		}
		fmt.Printf("True(s)=%d False(s)=%d\n", numTrue, numFalse)
	}
}

// timerFun arms a five-second one-shot timer and hands back its channel.
// A caller that receives from the returned channel blocks until the timer
// fires, which makes this usable as an alternative to time.Sleep.
func timerFun() <-chan time.Time {
	timer := time.NewTimer(5 * time.Second)
	return timer.C
}

// workFun stands in for a unit of work that, once finished, publishes
// callbacks for a handler to run. It sends two (value, callback) pairs:
// for each pair a random integer in [0, 100) goes out on r, followed by
// the callback on ch. The first callback reports whether its argument is
// even, the second whether it is greater than 50; both print their
// finding before returning it.
func workFun(ch chan<- callbackFun, r chan<- int) {
	// A fresh generator per call, seeded from the current nanosecond.
	seed := int64(time.Now().Nanosecond())
	rng := rand.New(rand.NewSource(seed))

	parity := func(n int) bool {
		even := n%2 == 0
		if even {
			fmt.Printf("%d is even\n", n)
		} else {
			fmt.Printf("%d is odd\n", n)
		}
		return even
	}

	threshold := func(n int) bool {
		above := n > 50
		if above {
			fmt.Printf("%d is > 50\n", n)
		} else {
			fmt.Printf("%d is <= 50\n", n)
		}
		return above
	}

	// Value first, callback second, for each pair in turn.
	for _, cb := range []callbackFun{parity, threshold} {
		r <- rng.Intn(100)
		ch <- cb
	}
}

// main starts a long-lived callback handler and then feeds it work
// forever, one batch per timer period.
func main() {
	callbacks := make(chan callbackFun, 10)
	values := make(chan int, 10)

	// The handler goroutine stays resident and keeps its tallies
	// internally for as long as the main thread is alive.
	go cbHandlerFun(callbacks, values)

	// Tasks that must run perpetually live inside a for loop. Each pass
	// arms a timer, submits a batch of work, and then blocks until the
	// timer fires, so channels provide both the synchronization and the
	// pacing. A select would only be needed here if some extra action
	// had to be taken once the timeout is received.
	for {
		expired := timerFun()
		workFun(callbacks, values)
		<-expired // Blocks, waiting for the timer to fire
	}
}