szaydel
11/11/2019 - 5:16 AM

Process CPU utilization experiment

package main

import (
	"math"
)

// sum adds up every element of nums, ignoring NaN entries. An empty or
// all-NaN slice yields 0.
func sum(nums []float64) float64 {
	acc := 0.0
	for i := 0; i < len(nums); i++ {
		v := nums[i]
		if math.IsNaN(v) {
			// A single NaN would poison the whole total, so skip it.
			continue
		}
		acc += v
	}
	return acc
}

// avg returns the arithmetic mean of the non-NaN elements of nums.
// When there are no usable elements (nums is empty or holds only NaNs) the
// divisor is zero and the floating-point division 0/0 produces NaN.
func avg(nums []float64) float64 {
	usable := len(nums) - countNaNs(nums)
	return sum(nums) / float64(usable)
}

// countNaNs reports how many elements of nums are NaN. An empty or nil
// slice contains none.
func countNaNs(nums []float64) int {
	found := 0
	for _, v := range nums {
		if math.IsNaN(v) {
			found++
		}
	}
	return found
}
package main

import (
	"bytes"
	"fmt"
	"os"
	"time"
)

// ProcStat holds the leading fields of /proc/<pid>/stat in parsed form.
// Values are filled in by ProcInfo.Stat: PID is copied from the ProcInfo,
// Comm is the text between the first "(" and the last ")" of the stat line,
// and the remaining fields are scanned in the order they are declared here.
// The one field that sits between NumThreads and Starttime in the stat line
// is scanned but not stored.
type ProcStat struct {
	// The process ID.
	PID int `json:"pid"`
	// The filename of the executable.
	Comm string `json:"comm"`
	// The process state.
	State string `json:"process_state"`
	// The PID of the parent of this process.
	PPID int `json:"parent_pid"`
	// The process group ID of the process.
	PGRP int `json:"process_group"`
	// The session ID of the process.
	Session int `json:"session"`
	// The controlling terminal of the process.
	TTY int `json:"tty"`
	// The ID of the foreground process group of the controlling terminal of
	// the process.
	TPGID int `json:"term_process_group_id"`
	// The kernel flags word of the process.
	Flags uint `json:"flags"`
	// The number of minor faults the process has made which have not required
	// loading a memory page from disk.
	MinFlt uint `json:"minor_faults"`
	// The number of minor faults that the process's waited-for children have
	// made.
	CMinFlt uint `json:"child_minor_faults"`
	// The number of major faults the process has made which have required
	// loading a memory page from disk.
	MajFlt uint `json:"major_faults"`
	// The number of major faults that the process's waited-for children have
	// made.
	CMajFlt uint `json:"child_major_faults"`
	// Amount of time that this process has been scheduled in user mode,
	// measured in clock ticks.
	UTime uint `json:"user_time_ticks"`
	// Amount of time that this process has been scheduled in kernel mode,
	// measured in clock ticks.
	STime uint `json:"system_time_ticks"`
	// Amount of time that this process's waited-for children have been
	// scheduled in user mode, measured in clock ticks.
	CUTime uint `json:"child_user_time_ticks"`
	// Amount of time that this process's waited-for children have been
	// scheduled in kernel mode, measured in clock ticks.
	CSTime uint `json:"child_system_time_ticks"`
	// For processes running a real-time scheduling policy, this is the negated
	// scheduling priority, minus one.
	Priority int `json:"priority"`
	// The nice value, a value in the range 19 (low priority) to -20 (high
	// priority).
	Nice int `json:"nice"`
	// Number of threads in this process.
	NumThreads int `json:"num_threads"`
	// The time the process started after system boot, the value is expressed
	// in clock ticks.
	Starttime uint64 `json:"starttime"`
	// Virtual memory size in bytes.
	VSize uint `json:"virt_memory_bytes"`
	// Resident set size in pages.
	RSS int `json:"rss_pages"`
}

// ProcInfo maintains information about a single process. Instances are
// assembled by buildProcInfo from /proc/<pid>/cmdline and /proc/<pid>/stat.
type ProcInfo struct {
	// Name is the program name taken from the process' command line.
	Name        string        `json:"name"`
	// Role is the second element of Args, or "unknown" when Args has fewer
	// than two elements.
	Role        string        `json:"role"`
	// Args is the command line without the program name.
	Args        []string      `json:"args"`
	// PID is the process ID, parsed from the /proc/<pid> path.
	PID         int           `json:"pid"`
	// AgeTicks is the process age in clock ticks, as computed by
	// ProcAgeAsTicks when this value was built.
	AgeTicks    int64         `json:"age_ticks"`
	// AgeDuration is the process age as computed by ProcAgeAsDuration when
	// this value was built.
	AgeDuration time.Duration `json:"age_nanoseconds"`
	// S is the stat snapshot taken when this value was built; the age
	// methods read S.Starttime and dereference S without a nil check.
	S           *ProcStat     `json:"process_stats"`
}

// OnCPUTimeTotal returns the total number of clock ticks accounted to the
// process: its own user and kernel time plus the user and kernel time of
// its waited-for children.
func (ps ProcStat) OnCPUTimeTotal() int64 {
	own := ps.STime + ps.UTime
	children := ps.CSTime + ps.CUTime
	return int64(own + children)
}

// Stat returns the current status information of the process, read from
// /proc/<pid>/stat.
//
// The boolean result is false when the stat file could not be read because
// of an *os.PathError; callers treat this as "process is gone". Any other
// read error, a stat line whose layout is not recognised, or a field that
// fails to parse is passed to handleErr with doPanic set and therefore
// panics.
func (p ProcInfo) Stat() (ProcStat, bool) {
	data, err := ReadFileNoStat(p.path("stat"))
	if err != nil {
		if _, ok := err.(*os.PathError); ok {
			return ProcStat{}, false
		}
		handleErr(err, true)
	}
	var (
		// ignore swallows the one field between NumThreads and Starttime
		// that ProcStat does not keep.
		ignore int

		s = ProcStat{PID: p.PID}
		// comm is wrapped in parentheses and may itself contain
		// parentheses, hence first "(" and last ")".
		l = bytes.IndexByte(data, '(')
		r = bytes.LastIndexByte(data, ')')
	)

	// Besides both parentheses being present, ")" must come after "(" and
	// must be followed by a separator plus at least one byte of field data;
	// otherwise the slice expressions below would go out of range or the
	// scan would run on empty input.
	if l < 0 || r < l || r+2 >= len(data) {
		handleErr(fmt.Errorf(
			"unexpected format, couldn't extract comm: %s",
			data,
		), true)
	}

	s.Comm = string(data[l+1 : r])
	// r+2 skips the closing parenthesis and the single separator after it.
	_, err = fmt.Fscan(
		bytes.NewBuffer(data[r+2:]),
		&s.State,
		&s.PPID,
		&s.PGRP,
		&s.Session,
		&s.TTY,
		&s.TPGID,
		&s.Flags,
		&s.MinFlt,
		&s.CMinFlt,
		&s.MajFlt,
		&s.CMajFlt,
		&s.UTime,
		&s.STime,
		&s.CUTime,
		&s.CSTime,
		&s.Priority,
		&s.Nice,
		&s.NumThreads,
		&ignore,
		&s.Starttime,
		&s.VSize,
		&s.RSS,
	)
	if err != nil {
		handleErr(err, true)
	}
	return s, true
}

// path returns the absolute path of the entry called name inside this
// process' /proc/<pid> directory.
func (p ProcInfo) path(name string) string {
	procDir := fmt.Sprintf("/proc/%d/", p.PID)
	return procDir + name
}

// ProcAgeAsTicks returns how long the process has existed, in clock ticks:
// the current monotonic clock reading converted to ticks minus the start
// time recorded in p.S. p.S must not be nil.
func (p ProcInfo) ProcAgeAsTicks() int64 {
	nowTicks := monotonicClockTicks()
	startTicks := int64(p.S.Starttime)
	return nowTicks - startTicks
}

// ProcAgeAsDuration returns how long the process has existed as a
// time.Duration: the monotonic clock reading minus the start time recorded
// in p.S, after converting that start time from ticks to nanoseconds.
// p.S must not be nil.
func (p ProcInfo) ProcAgeAsDuration() time.Duration {
	nowNs := monotonicSinceBoot().Nanoseconds()
	startNs := ticksToNsecs(int64(p.S.Starttime))
	return time.Duration(nowNs - startNs)
}

// CPUTimes tracks two observations of time for a process: the previous
// sample and the current one, kept side by side so that a delta between
// them can be computed. The monitor loop stores clock-tick values in all
// four fields.
type CPUTimes struct {
	PrevRunTime      int64 // Process age (on or off CPU) at the previous sample
	CurrentRunTime   int64 // Process age (on or off CPU) at the latest sample
	PrevOnCPUTime    int64 // Time spent on CPU as of the previous sample
	CurrentOnCPUTime int64 // Time spent on CPU as of the latest sample
}

// Delta computes the rate of CPU use between the previous and the current
// sample: the growth in on-CPU time divided by the growth in run time, which
// is effectively the interval between collections. For example, if the
// interval is 100 ticks and on-CPU time grew by 90 ticks, the result is
// 90/100 = 0.9, i.e. the process spent 90% of the last interval on CPU.
//
// The division is done in floating point, so when both samples carry the
// same run time the result is NaN or ±Inf rather than a panic.
func (t CPUTimes) Delta() float64 {
	onCPU := t.CurrentOnCPUTime - t.PrevOnCPUTime
	elapsed := t.CurrentRunTime - t.PrevRunTime
	return float64(onCPU) / float64(elapsed)
}

// Reset clears all four recorded times back to zero. This is useful when
// statistics can no longer be gathered, possibly because the process was
// restarted.
func (t *CPUTimes) Reset() {
	*t = CPUTimes{}
}
package main

/*
#include <time.h>
#include <unistd.h>
extern int clock_gettime(clockid_t clock_id, struct timespec *tp);
extern long sysconf(int name);
*/
import "C"

import (
	// "encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"math"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"
)

// handleErr is the shared error sink. A nil error and io.EOF are ignored.
// Any other error either panics (doPanic true) or is written to standard
// error as "Error: <err>" (doPanic false).
func handleErr(e error, doPanic bool) {
	if e == nil || e == io.EOF {
		return
	}
	if doPanic {
		panic(e)
	}
	fmt.Fprintf(os.Stderr, "Error: %v\n", e)
}

// ticksToNsecs converts a count of clock ticks into nanoseconds, using the
// tick rate reported by sysconf(_SC_CLK_TCK).
//
// Whole seconds and the sub-second remainder are converted separately. This
// keeps the fractional part of a second (dividing first and scaling
// afterwards would round down to whole seconds) and avoids the large
// intermediate product that ticks*1e9 would create.
func ticksToNsecs(ticks int64) int64 {
	hz := int64(C.sysconf(C._SC_CLK_TCK))
	wholeSecs := ticks / hz
	remTicks := ticks % hz
	return wholeSecs*1e9 + remTicks*1e9/hz
}

// monotonicClockTicks returns the current CLOCK_MONOTONIC reading expressed
// in clock ticks, using the tick rate reported by sysconf(_SC_CLK_TCK).
//
// Seconds and nanoseconds are scaled separately. For non-negative readings
// this gives the same result as (sec*1e9+nsec)*hz/1e9, but without forming
// the total-nanoseconds-times-hz product, which can exceed the int64 range
// on long-running systems.
func monotonicClockTicks() int64 {
	var ts C.struct_timespec
	C.clock_gettime(C.CLOCK_MONOTONIC, &ts)
	hz := int64(C.sysconf(C._SC_CLK_TCK))

	return int64(ts.tv_sec)*hz + int64(ts.tv_nsec)*hz/1e9
}

// monotonicSinceBoot returns the current CLOCK_MONOTONIC reading as a
// time.Duration.
//
// Both timespec members are converted to time.Duration (int64) before any
// arithmetic, so the seconds-to-nanoseconds multiplication is never carried
// out in the C type, which may be narrower than 64 bits.
//
// NOTE(review): the name assumes CLOCK_MONOTONIC counts from system boot —
// confirm for the target platform.
func monotonicSinceBoot() time.Duration {
	var ts C.struct_timespec
	C.clock_gettime(C.CLOCK_MONOTONIC, &ts)
	return time.Duration(ts.tv_sec)*time.Second + time.Duration(ts.tv_nsec)
}

// nullByteToSpace overwrites every NUL byte in b with an ASCII space. The
// replacement happens in place; the returned slice is b itself.
func nullByteToSpace(b []byte) []byte {
	const (
		nul   = 0x0
		space = 0x20 // ASCII space character
	)
	for i := 0; i < len(b); i++ {
		if b[i] == nul {
			b[i] = space
		}
	}
	return b
}

// type Count int
// type Role string

// const Proxy Role = "proxy"
// const Worker Role = "worker"
// const Manager Role = "manager"
// const Logger Role = "logger"

// func expectedCount(r Role) Count {
// 	m := map[Role]Count{
// 		Proxy:   2,
// 		Worker:  1,
// 		Manager: 1,
// 		Logger:  1,
// 	}
// 	return m[r]
// }

// CommandLine is a representation of a process' command line, separated
// into a program name string and a slice of argument strings. Values are
// produced by cmdLineArgs; both fields are unexported and are read through
// the ProgramName and Args accessors.
type CommandLine struct {
	args        []string
	programName string
}

// ProgramName returns the name of the program for the given process, often
// known as argv[0]. It is the first element of the split command line.
func (c CommandLine) ProgramName() string {
	return c.programName
}

// Args returns the set of arguments, sans what would be argv[0], i.e.
// ProgramName. It is effectively argv[1...N]. The stored slice is returned
// as is, not a copy.
func (c CommandLine) Args() []string {
	return c.args
}

// cmdLineArgs reads /proc/<pid>/cmdline and splits it into a program name
// and its arguments.
//
// The whole file is read through ReadFileNoStat, so long command lines are
// not cut off at a fixed buffer size. If the file cannot be read (typically
// because the process has already exited) the error is reported once through
// handleErr without panicking, and a CommandLine with an empty program name
// and no arguments is returned. The result is never nil.
func cmdLineArgs(pid int) *CommandLine {
	raw, err := ReadFileNoStat(fmt.Sprintf("/proc/%d/cmdline", pid))
	if err != nil {
		handleErr(err, false)
		return &CommandLine{args: []string{}}
	}
	return parseCmdLine(raw)
}

// parseCmdLine splits the raw contents of a cmdline file into a CommandLine.
// NUL bytes are turned into spaces and the result is split on single spaces;
// the first element becomes the program name and the rest the arguments.
// raw is modified in place.
func parseCmdLine(raw []byte) *CommandLine {
	// The kernel terminates the last argument with a NUL as well. Drop that
	// one terminator so it does not turn into a phantom empty argument.
	if n := len(raw); n > 0 && raw[n-1] == 0x0 {
		raw = raw[:n-1]
	}
	// strings.Split always yields at least one element, so fields[0] is safe
	// and fields[1:] is an empty, non-nil slice when nothing follows.
	fields := strings.Split(string(nullByteToSpace(raw)), " ")
	return &CommandLine{
		programName: fields[0],
		args:        fields[1:],
	}
}

// ReadFileNoStat reads the contents of filename with ioutil.ReadAll.
// It resembles ioutil.ReadFile but never calls os.Stat, because many files
// in /proc and /sys report incorrect file sizes (either 0 or 4096).
// At most 512kB are read; anything past that limit is silently left unread,
// so larger files should be processed with a scanner instead.
func ReadFileNoStat(filename string) ([]byte, error) {
	const readLimit = 512 * 1024

	file, openErr := os.Open(filename)
	if openErr != nil {
		return nil, openErr
	}
	defer file.Close()

	capped := io.LimitReader(file, readLimit)
	contents, readErr := ioutil.ReadAll(capped)
	return contents, readErr
}

// buildProcInfo assembles a ProcInfo for the process behind procfile, a path
// of the form /proc/<pid>. The PID is whatever follows the last "/" in the
// path; if that text is not an integer, handleErr panics.
//
// The role is the second command-line argument when at least two arguments
// are present, and "unknown" otherwise. nil is returned when the stat
// information cannot be obtained, which usually means the process is gone.
func buildProcInfo(procfile string) *ProcInfo {
	// Everything after the final slash; the whole string if there is none.
	pidText := procfile[strings.LastIndex(procfile, "/")+1:]
	pid, err := strconv.Atoi(pidText)
	handleErr(err, true)

	cmd := cmdLineArgs(pid)
	role := "unknown"
	if rest := cmd.Args(); len(rest) > 1 {
		role = rest[1]
	}
	info := &ProcInfo{
		Name: cmd.ProgramName(),
		Args: cmd.Args(),
		Role: role,
		PID:  pid,
	}

	stat, ok := info.Stat()
	if !ok {
		return nil
	}
	info.S = &stat
	// The age helpers read info.S, so they can only run once S is set.
	info.AgeTicks = info.ProcAgeAsTicks()
	info.AgeDuration = info.ProcAgeAsDuration()
	return info
}

// findProcsByName walks the numeric entries of /proc and returns a ProcInfo
// for every process whose program name equals name. The result is an empty,
// non-nil slice when nothing matches. A glob failure or a directory name
// that does not parse as an integer makes handleErr panic.
func findProcsByName(name string) []*ProcInfo {
	entries, globErr := filepath.Glob("/proc/[0-9]*")
	handleErr(globErr, true)

	matched := make([]*ProcInfo, 0)
	for i := range entries {
		entry := entries[i]
		// The PID is the text after the final slash of the path.
		pid, convErr := strconv.Atoi(entry[strings.LastIndex(entry, "/")+1:])
		handleErr(convErr, true)

		if cmdLineArgs(pid).ProgramName() != name {
			continue
		}
		// A nil result means the process most likely went away between the
		// directory listing and now. It is skipped rather than recorded; this
		// scan runs periodically, so a restarted process is picked up on the
		// next run.
		if info := buildProcInfo(entry); info != nil {
			matched = append(matched, info)
		}
	}
	return matched
}

// IntervalReport is a point-in-time view of a process' CPU usage with three
// figures: WindowRate, LifetimeRate and CurrentRate.
// WindowRate - an average of the lifetime-rate samples taken over the last
// several intervals, which effectively makes the data smoother.
// LifetimeRate - rate of time spent on CPU over the process' total runtime,
// computed over the entire lifetime of the process; least volatile.
// CurrentRate - derivative between two interval samples; most volatile.
// PID and Role identify the process being reported on, and Timestamp is the
// time at which the report was created.
type IntervalReport struct {
	PID int
	Role string
	Timestamp    time.Time
	WindowRate   float64
	LifetimeRate float64
	CurrentRate  float64
}

// startIntervalReport prints every report received on c to standard output
// in "%+v" form, one per line. It blocks on the channel instead of polling,
// so a sender is released as soon as its report has been taken rather than
// waiting for a poll timer to expire. The function returns only if c is
// closed.
func startIntervalReport(c <-chan *IntervalReport) {
	for v := range c {
		fmt.Printf("%+v\n", v)
	}
}

// startMonitors periodically scans the process table by reading through /proc
// and picks out only those processes that we are interested in. These
// processes are then added to a map of process roles to PIDs, where a role is
// something like logger, or manager or worker-XX, etc. For each entry in this
// map, a channel is created in a corresponding map. Also, a goroutine is
// started for every role, and the channel for the given role is passed to
// this goroutine, establishing a one-way communication mechanism. This
// channel is in essence an updates channel. Initially, information about each
// process that we intend to monitor is passed to the goroutine on the other
// end of the channel, and then subsequently, any time the PID seen for that
// role changes, new information about the process is passed to the goroutine
// responsible for it.
// As a side-effect of these periodic checks, if we detect at some point a
// role that is not already in the map, we begin to track it and create a new
// monitor goroutine for it.
//
// stopChan ends the scan loop: once a value is received on it, or it is
// closed, startMonitors returns. The signal is only observed during the pause
// between scans. Monitor goroutines that were already started are not
// stopped. repChan is handed to every monitor goroutine as the destination
// for its reports.
func startMonitors(stopChan chan struct{}, repChan chan *IntervalReport) {
	// It may take this much time to detect that a process got restarted
	// or that a new process was added to system.
	const scanInterval = 4 * time.Second

	var processMap = make(map[string]int)
	var channelsMap = make(map[string]chan *ProcInfo)
	for {
		processes := findProcsByName("bro")
		for _, p := range processes {
			if v, ok := processMap[p.Role]; ok {
				if v != p.PID { // This process' PID changed
					fmt.Println("PID changed, take action, save pid")
					processMap[p.Role] = p.PID
					channelsMap[p.Role] <- p
				}
			} else { // This process' Role is not already in the map
				processMap[p.Role] = p.PID
				channelsMap[p.Role] = make(chan *ProcInfo)
				go monitor(channelsMap[p.Role], repChan)
				channelsMap[p.Role] <- p
				fmt.Printf("Added %s => %d to map\n", p.Role, p.PID)
			}
		}
		// Pause between scans, but wake up immediately on a stop request.
		select {
		case <-stopChan:
			return
		case <-time.After(scanInterval):
		}
	}
}

// monitor samples the CPU usage of one process roughly once per second and,
// once a full window of samples has been collected, sends an IntervalReport
// on r after every sample.
//
// p delivers the process to watch. monitor blocks until the first ProcInfo
// arrives, so it never samples before it has a process. Later values on p
// replace the watched process, e.g. after a restart under a new PID. A nil
// value, which is also what a closed channel yields, ends the goroutine.
//
// When the process cannot be stat'ed the sample is recorded as NaN and the
// CPU times are reset; reports keep being sent.
func monitor(p <-chan *ProcInfo, r chan<- *IntervalReport) {
	const window = 10
	var (
		samples      = make([]float64, window)
		counter      uint64
		times        CPUTimes
		lifetimeRate float64
	)

	// Nothing can be sampled until we have been told what to watch.
	watching := <-p
	if watching == nil {
		return
	}
	fmt.Printf("monitoring: %s with PID: %d %p\n", watching.Role, watching.PID, watching)

	for {
		select {
		case v := <-p:
			if v == nil {
				return
			}
			watching = v
			fmt.Printf("monitoring: %s with PID: %d %p\n", watching.Role, watching.PID, watching)
		default:
			if s, ok := watching.Stat(); ok {
				// Take each reading once so that every figure derived from
				// this sample is based on the same numbers.
				onCPU := s.OnCPUTimeTotal()
				runTime := watching.ProcAgeAsTicks()

				lifetimeRate = float64(onCPU) / float64(runTime)
				samples[counter%window] = lifetimeRate

				// If we don't have any value for the last runtime, this is
				// the first time we gather stats (or the first time after a
				// reset). If the last runtime is larger than the one we just
				// read, we are no longer looking at the same process. In both
				// cases there is nothing valid to diff against, so previous
				// and current are both set to the sample we just collected.
				if times.CurrentRunTime == 0 || times.CurrentRunTime > runTime {
					times.PrevOnCPUTime = onCPU
					times.PrevRunTime = runTime
				} else {
					times.PrevOnCPUTime = times.CurrentOnCPUTime
					times.PrevRunTime = times.CurrentRunTime
				}
				times.CurrentOnCPUTime = onCPU
				times.CurrentRunTime = runTime
			} else {
				samples[counter%window] = math.NaN()
				times.Reset()
			}
			counter++
			// Hold back reports until the window has been filled once, so
			// the average never includes slots that were never written.
			if counter >= window {
				r <- &IntervalReport{
					PID:          watching.PID,
					Role:         watching.Role,
					Timestamp:    time.Now(),
					WindowRate:   avg(samples),
					LifetimeRate: lifetimeRate,
					CurrentRate:  times.Delta(),
				}
			}

			time.Sleep(1 * time.Second)
		}
	}
}

// main starts the process scanner and the report printer, lets them run for
// 100 seconds, then signals shutdown and exits.
func main() {
	// processes := findProcsByName("bro")
	// j, _ := json.Marshal(processes)
	// fmt.Printf("%v\n", string(j))

	shutdownChan := make(chan struct{})
	intervalReportChan := make(chan *IntervalReport)
	go startMonitors(shutdownChan, intervalReportChan)
	go startIntervalReport(intervalReportChan)
	time.Sleep(100 * time.Second)
	// Closing the channel never blocks, whereas a send on this unbuffered
	// channel would wait for a receiver; it also reaches every goroutine
	// that receives from the channel.
	close(shutdownChan)
}