ggzeng
10/22/2019 - 8:37 AM

goroutine tunny pool

使用tunny包创建goroutine池

package main

import (
	"io/ioutil"
	"net/http"
	"runtime"

	"github.com/Jeffail/tunny"
)

func main() {
	numCPUs := runtime.NumCPU()

    // 后面的pool.Process中会调用此函数,而process的参数也是此函数的参数
	pool := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
		var result []byte

		// TODO: Something CPU heavy with payload

		return result
	})
	defer pool.Close()

	http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
		input, err := ioutil.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "Internal error", http.StatusInternalServerError)
		}
		defer r.Body.Close()

		// Funnel this work into our pool. This call is synchronous and will
		// block until the job is completed.
		result := pool.Process(input)

		w.Write(result.([]byte))
	})

	http.ListenAndServe(":8080", nil)
}
// 实现callback有两个方法

func printHello() { // 无参数,无返回值
    fmt.Println("Hello!")
}
// 方法1
pool3 := tunny.NewFunc(3, func(payload interface{}) interface{} {
    f, ok := payload.(func())
    if !ok {
        return nil
    }
    f()
    return f
})
pool3.Process(printHello)

// 方法2
pool2 := tunny.NewCallback(2)   // 省去了自己实现调用函数
var wg sync.WaitGroup
pool2 := tunny.NewCallback(2)

for i:= 0; i < 10; i++ {        // 控制并发量的例子
	wg.Add(1)                   // 注意:需要写到go fun外面
	go func() {
	    defer wg.Done()
		pool2.Process(printHello)
	}()
}
wg.Wait()
fmt.Println("all done")
package main
 
import (
	"fmt"
	"github.com/Jeffail/tunny"
)

func printHello(str interface{}) interface{} {
	fmt.Println("Hello " + str.(string))
	return 1
}
func printHi(str interface{}) interface{} {
	fmt.Println("Hi " + str.(string))
	return 1
}

type myWorker struct {
    processor func(interface{}) interface{}
}

func (w *myWorker) Process(payload interface{}) interface{} { // 执行任务
    return w.processor(payload)
}

func (w *myWorker) BlockUntilReady() {} // 在执行任务前执行,相当于init
func (w *myWorker) Interrupt()       {} // 在任务执行时被终止时,会执行该函数
func (w *myWorker) Terminate()       {} // 当协程被关闭时,执行该函数

func main() {
    work1 := new(myWorker)
    pool1 := tunny.New(3, func() tunny.Worker {
                 return work1
             })
    work1.processor = printHello
    pool1.Process("xxx")
    work1.processor = printHi
    pool1.Process("xxx")

}