kareem-h
7/10/2015 - 10:50 AM

Single channel version of http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang

// Worker represents the worker that executes the job
type Worker struct {
	JobChannel chan Job
	quit       chan bool
}

func NewWorker(jobChannel chan Job) Worker {
	return Worker{
		JobChannel: jobChannel,
		quit:       make(chan bool)}
}

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
	go func() {
		for {
			select {
			case job := <-w.JobChannel:
				// we have received a work request.
				if err := job.Payload.UploadToS3(); err != nil {
					log.Errorf("Error uploading to S3: %s", err.Error())
				}

			case <-w.quit:
				// we have received a signal to stop
				return
			}
		}
	}()
}

func (w Worker) Stop() {
	go func() {
		w.quit <- true
	}()
}

type Dispatcher struct {
	JobChannel chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
	pool := make(chan Job, maxWorkers)
	return &Dispatcher{JobChannel: pool}
}

func (d *Dispatcher) Run() {
	// starting n number of workers
	for i := 0; i < d.maxWorkers; i++ {
		worker := NewWorker(d.JobChannel)
		worker.Start()
	}

	go d.dispatch()
}

func (d *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			// a job request has been received
			go func(job Job) {
				// dispatch the job to the workers job channel
				// we can buffer up to one job per worker
				d.JobChannel <- job
			}(job)
		}
	}
}