Browse Source

refactor, fixes

pull/1943/head
Ethan Buchman 9 years ago
parent
commit
d835cfe3e7
5 changed files with 518 additions and 230 deletions
  1. +0
    -225
      event_meter.go
  2. +152
    -0
      handlers/handlers.go
  3. +77
    -0
      handlers/routes.go
  4. +170
    -5
      main.go
  5. +119
    -0
      types/types.go

+ 0
- 225
event_meter.go View File

@ -1,225 +0,0 @@
package main
import (
"bytes"
"encoding/json"
"fmt"
"os"
"reflect"
"sync"
"time"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-crypto"
// register rpc and event types with go-
ctypes "github.com/tendermint/tendermint/rpc/core/types"
// "github.com/tendermint/tendermint/types"
client "github.com/tendermint/tendermint/rpc/client"
"github.com/gorilla/websocket"
"github.com/rcrowley/go-metrics"
)
//------------------------------------------------------
// Connect to all validators for a blockchain
type Blockchain struct {
ID string
Validators []Validator
}
type Validator struct {
ID string
PubKey crypto.PubKey
IP string
Port int
}
//------------------------------------------------------
// Generic system to subscribe to events and record their frequency
// Metrics for a given event
type EventMetric struct {
ID string `json:"id"`
Started time.Time `json:"start_time"`
LastHeard time.Time `json:"last_heard"`
MinDuration int64 `json:"min_duration"`
MaxDuration int64 `json:"max_duration"`
// tracks event count and rate
meter metrics.Meter
// filled in from the Meter
Count int64 `json:"count"`
Rate1 float64 `json:"rate_1"`
Rate5 float64 `json:"rate_5"`
Rate15 float64 `json:"rate_15"`
RateMean float64 `json:"rate_mean"`
// XXX: move this
// latency for node itself (not related to event)
Latency float64 `json:"latency_mean"`
}
// Each node gets an event meter to track events for that node
type EventMeter struct {
QuitService
wsc *client.WSClient
mtx sync.Mutex
events map[string]*EventMetric
// to record latency
timer metrics.Timer
lastPing time.Time
receivedPong bool
}
func NewEventMeter(addr string) *EventMeter {
em := &EventMeter{
wsc: client.NewWSClient(addr),
events: make(map[string]*EventMetric),
timer: metrics.NewTimer(),
receivedPong: true,
}
em.QuitService = *NewQuitService(nil, "EventMeter", em)
return em
}
func (em *EventMeter) OnStart() error {
em.QuitService.OnStart()
if err := em.wsc.OnStart(); err != nil {
return err
}
em.wsc.Conn.SetPongHandler(func(m string) error {
// NOTE: https://github.com/gorilla/websocket/issues/97
em.mtx.Lock()
defer em.mtx.Unlock()
em.receivedPong = true
em.timer.UpdateSince(em.lastPing)
return nil
})
go em.receiveRoutine()
return nil
}
func (em *EventMeter) OnStop() {
em.wsc.OnStop()
em.QuitService.OnStop()
}
func (em *EventMeter) Subscribe(eventid string) error {
em.mtx.Lock()
defer em.mtx.Unlock()
if _, ok := em.events[eventid]; ok {
return fmt.Errorf("Subscription already exists")
}
if err := em.wsc.Subscribe(eventid); err != nil {
return err
}
em.events[eventid] = &EventMetric{
Started: time.Now(),
MinDuration: 1 << 62,
meter: metrics.NewMeter(),
}
return nil
}
func (em *EventMeter) Unsubscribe(eventid string) error {
em.mtx.Lock()
defer em.mtx.Unlock()
if err := em.wsc.Unsubscribe(eventid); err != nil {
return err
}
// XXX: should we persist or save this info first?
delete(em.events, eventid)
return nil
}
//------------------------------------------------------
func (em *EventMeter) receiveRoutine() {
logTicker := time.NewTicker(time.Second * 3)
pingTicker := time.NewTicker(time.Second * 1)
for {
select {
case <-logTicker.C:
em.mtx.Lock()
for _, metric := range em.events {
metric.Count = metric.meter.Count()
metric.Rate1 = metric.meter.Rate1()
metric.Rate5 = metric.meter.Rate5()
metric.Rate15 = metric.meter.Rate15()
metric.RateMean = metric.meter.RateMean()
metric.Latency = em.timer.Mean()
b, err := json.Marshal(metric)
if err != nil {
// TODO
log.Error(err.Error())
continue
}
var out bytes.Buffer
json.Indent(&out, b, "", "\t")
out.WriteTo(os.Stdout)
}
em.mtx.Unlock()
case <-pingTicker.C:
em.mtx.Lock()
// ping to record latency
if !em.receivedPong {
// XXX: why is the pong taking so long? should we stop the conn?
em.mtx.Unlock()
continue
}
em.lastPing = time.Now()
em.receivedPong = false
err := em.wsc.Conn.WriteMessage(websocket.PingMessage, []byte{})
if err != nil {
log.Error("Failed to write ping message on websocket", "error", err)
em.wsc.Stop()
return
}
em.mtx.Unlock()
case r := <-em.wsc.ResultsCh:
em.mtx.Lock()
switch r := r.(type) {
case *ctypes.ResultEvent:
id, _ := r.Event, r.Data
metric, ok := em.events[id]
if !ok {
// we already unsubscribed, or got an unexpected event
continue
}
last := metric.LastHeard
metric.LastHeard = time.Now()
metric.meter.Mark(1)
dur := int64(metric.LastHeard.Sub(last))
if dur < metric.MinDuration {
metric.MinDuration = dur
}
if !last.IsZero() && dur > metric.MaxDuration {
metric.MaxDuration = dur
}
default:
log.Error("Unknown result event type", "type", reflect.TypeOf(r))
}
em.mtx.Unlock()
case <-em.Quit:
break
}
}
}

+ 152
- 0
handlers/handlers.go View File

@ -0,0 +1,152 @@
package handlers
import (
"fmt"
"sort"
"sync"
"github.com/tendermint/go-event-meter"
"github.com/tendermint/go-wire"
"github.com/tendermint/netmon/types"
)
type NetMonResultInterface interface{}
type NetMonResult struct {
Result NetMonResultInterface
}
// for wire.readReflect
var _ = wire.RegisterInterface(
struct{ NetMonResultInterface }{},
wire.ConcreteType{&types.ChainAndValidatorIDs{}, 0x01},
wire.ConcreteType{&types.ChainStatus{}, 0x02},
wire.ConcreteType{&types.Validator{}, 0x03},
wire.ConcreteType{&eventmeter.EventMetric{}, 0x04},
)
//---------------------------------------------
// global state and backend functions
type TendermintNetwork struct {
mtx sync.Mutex
Chains map[string]*types.ChainStatus `json:"blockchains"`
ValSets map[string]*types.ValidatorSet `json:"validator_sets"`
}
// TODO: populate validator sets
func NewTendermintNetwork(chains ...*types.ChainStatus) *TendermintNetwork {
network := &TendermintNetwork{
Chains: make(map[string]*types.ChainStatus),
ValSets: make(map[string]*types.ValidatorSet),
}
for _, chain := range chains {
network.Chains[chain.Config.ID] = chain
}
return network
}
//------------
// RPC funcs
func (tn *TendermintNetwork) Status() (*types.ChainAndValidatorIDs, error) {
tn.mtx.Lock()
defer tn.mtx.Unlock()
chains := make([]string, len(tn.Chains))
valSets := make([]string, len(tn.ValSets))
i := 0
for chain, _ := range tn.Chains {
chains[i] = chain
i += 1
}
i = 0
for valset, _ := range tn.ValSets {
valSets[i] = valset
i += 1
}
sort.StringSlice(chains).Sort()
sort.StringSlice(valSets).Sort()
return &types.ChainAndValidatorIDs{
ChainIDs: chains,
ValidatorSetIDs: valSets,
}, nil
}
func (tn *TendermintNetwork) GetChain(chainID string) (*types.ChainStatus, error) {
tn.mtx.Lock()
defer tn.mtx.Unlock()
chain, ok := tn.Chains[chainID]
if !ok {
return nil, fmt.Errorf("Unknown chain %s", chainID)
}
return chain, nil
}
func (tn *TendermintNetwork) GetValidatorSet(valSetID string) (*types.ValidatorSet, error) {
tn.mtx.Lock()
defer tn.mtx.Unlock()
valSet, ok := tn.ValSets[valSetID]
if !ok {
return nil, fmt.Errorf("Unknown validator set %s", valSetID)
}
return valSet, nil
}
func (tn *TendermintNetwork) GetValidator(valSetID, valID string) (*types.Validator, error) {
tn.mtx.Lock()
defer tn.mtx.Unlock()
valSet, ok := tn.ValSets[valSetID]
if !ok {
return nil, fmt.Errorf("Unknown validator set %s", valSetID)
}
val, err := valSet.Validator(valID)
if err != nil {
return nil, err
}
return val, nil
}
func (tn *TendermintNetwork) StartMeter(chainID, valID, eventID string) error {
tn.mtx.Lock()
defer tn.mtx.Unlock()
val, err := tn.getChainVal(chainID, valID)
if err != nil {
return err
}
return val.EventMeter().Subscribe(eventID, nil)
}
func (tn *TendermintNetwork) StopMeter(chainID, valID, eventID string) error {
tn.mtx.Lock()
defer tn.mtx.Unlock()
val, err := tn.getChainVal(chainID, valID)
if err != nil {
return err
}
return val.EventMeter().Unsubscribe(eventID)
}
func (tn *TendermintNetwork) GetMeter(chainID, valID, eventID string) (*eventmeter.EventMetric, error) {
tn.mtx.Lock()
defer tn.mtx.Unlock()
val, err := tn.getChainVal(chainID, valID)
if err != nil {
return nil, err
}
return val.EventMeter().GetMetric(eventID)
}
func (tn *TendermintNetwork) getChainVal(chainID, valID string) (*types.ChainValidator, error) {
chain, ok := tn.Chains[chainID]
if !ok {
return nil, fmt.Errorf("Unknown chain %s", chainID)
}
val, err := chain.Config.GetValidatorByID(valID)
if err != nil {
return nil, err
}
return val, nil
}

+ 77
- 0
handlers/routes.go View File

@ -0,0 +1,77 @@
package handlers
import (
rpc "github.com/tendermint/go-rpc/server"
)
func Routes(network *TendermintNetwork) map[string]*rpc.RPCFunc {
return map[string]*rpc.RPCFunc{
// subscribe/unsubscribe are reserved for websocket events.
// "subscribe": rpc.NewWSRPCFunc(Subscribe, []string{"event"}),
// "unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, []string{"event"}),
"status": rpc.NewRPCFunc(StatusResult(network), ""),
"blockchain": rpc.NewRPCFunc(GetChainResult(network), "chain"),
"validator_set": rpc.NewRPCFunc(GetValidatorSetResult(network), "valsetID"),
"validator": rpc.NewRPCFunc(GetValidatorResult(network), "valSetID,valID"),
"start_meter": rpc.NewRPCFunc(network.StartMeter, "chainID,valID,event"),
"stop_meter": rpc.NewRPCFunc(network.StopMeter, "chainID,valID,event"),
"meter": rpc.NewRPCFunc(GetMeterResult(network), "chainID,valID,event"),
}
}
func StatusResult(network *TendermintNetwork) interface{} {
return func() (*NetMonResult, error) {
r, err := network.Status()
if err != nil {
return nil, err
} else {
return &NetMonResult{r}, nil
}
}
}
func GetChainResult(network *TendermintNetwork) interface{} {
return func(chain string) (*NetMonResult, error) {
r, err := network.GetChain(chain)
if err != nil {
return nil, err
} else {
return &NetMonResult{r}, nil
}
}
}
func GetValidatorSetResult(network *TendermintNetwork) interface{} {
return func(valSetID string) (*NetMonResult, error) {
r, err := network.GetValidatorSet(valSetID)
if err != nil {
return nil, err
} else {
return &NetMonResult{r}, nil
}
}
}
func GetValidatorResult(network *TendermintNetwork) interface{} {
return func(valSetID, valID string) (*NetMonResult, error) {
r, err := network.GetValidator(valSetID, valID)
if err != nil {
return nil, err
} else {
return &NetMonResult{r}, nil
}
}
}
func GetMeterResult(network *TendermintNetwork) interface{} {
return func(chainID, valID, eventID string) (*NetMonResult, error) {
r, err := network.GetMeter(chainID, valID, eventID)
if err != nil {
return nil, err
} else {
return &NetMonResult{r}, nil
}
}
}

+ 170
- 5
main.go View File

@ -1,10 +1,30 @@
package main package main
import ( import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"strconv"
"strings"
"github.com/tendermint/go-event-meter"
"github.com/tendermint/netmon/handlers"
"github.com/tendermint/netmon/types"
"github.com/codegangsta/cli"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config" cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-events"
pcm "github.com/tendermint/go-process"
"github.com/tendermint/go-rpc/server"
"github.com/tendermint/go-rpc/types"
"github.com/tendermint/go-wire"
tmcfg "github.com/tendermint/tendermint/config/tendermint" tmcfg "github.com/tendermint/tendermint/config/tendermint"
"github.com/tendermint/tendermint/types"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
tmtypes "github.com/tendermint/tendermint/types"
) )
func init() { func init() {
@ -15,15 +35,160 @@ func init() {
} }
func main() { func main() {
em := NewEventMeter("ws://localhost:46657/websocket")
if _, err := em.Start(); err != nil {
app := cli.NewApp()
app.Name = "netmon"
app.Usage = "netmon [command] [args...]"
app.Commands = []cli.Command{
{
Name: "config",
Usage: "Create a config from a mintnet testnet",
ArgsUsage: "[chainID] [prefix] [N]",
Action: func(c *cli.Context) {
cmdConfig(c)
},
},
{
Name: "monitor",
Usage: "Monitor a chain",
ArgsUsage: "[config file]",
Action: func(c *cli.Context) {
cmdMonitor(c)
},
},
}
app.Run(os.Args)
}
func cmdMonitor(c *cli.Context) {
args := c.Args()
if len(args) != 1 {
Exit("monitor expectes 1 arg")
}
configFile := args[0]
b, err := ioutil.ReadFile(configFile)
if err != nil {
Exit(err.Error())
}
// for now we start with one blockchain;
// eventually more can be uploaded or created through endpoints
chainConfig := new(types.BlockchainConfig)
if err := json.Unmarshal(b, chainConfig); err != nil {
Exit(err.Error()) Exit(err.Error())
} }
if err := em.Subscribe(types.EventStringNewBlock()); err != nil {
chainStatus := &types.ChainStatus{Config: chainConfig}
// start the event meter and listen for new blocks on each validator
for _, v := range chainConfig.Validators {
if err := v.NewEventMeter(); err != nil {
Exit(err.Error())
}
if err := v.EventMeter().Subscribe(tmtypes.EventStringNewBlock(), func(metric *eventmeter.EventMetric, data interface{}) {
// TODO: update chain status with block and metric
// chainStatus.NewBlock(data.(tmtypes.EventDataNewBlock).Block)
}); err != nil {
Exit(err.Error())
}
}
// the main object that watches for changes and serves the rpc requests
network := handlers.NewTendermintNetwork(chainStatus)
// the routes are functions on the network object
routes := handlers.Routes(network)
// register the result objects with wire
wire.RegisterInterface(
struct{ rpctypes.Result }{},
wire.ConcreteType{&events.EventResult{}, 0x1},
wire.ConcreteType{&ctypes.TendermintResult{}, 0x2},
wire.ConcreteType{&handlers.NetMonResult{}, 0x3},
)
// serve http and ws
mux := http.NewServeMux()
wm := rpcserver.NewWebsocketManager(routes, nil) // TODO: evsw
mux.HandleFunc("/websocket", wm.WebsocketHandler)
rpcserver.RegisterRPCFuncs(mux, routes)
if _, err := rpcserver.StartHTTPServer("0.0.0.0:46670", mux); err != nil {
Exit(err.Error()) Exit(err.Error())
} }
TrapSignal(func() { TrapSignal(func() {
em.Stop()
// TODO: clean shutdown server, maybe persist last state
for _, v := range chainConfig.Validators {
v.EventMeter().Stop()
}
}) })
} }
func cmdConfig(c *cli.Context) {
args := c.Args()
if len(args) != 3 {
Exit("config expects 3 args")
}
id, prefix := args[0], args[1]
n, err := strconv.Atoi(args[2])
if err != nil {
Exit(err.Error())
}
chain, err := ConfigFromMachines(id, prefix, n)
if err != nil {
Exit(err.Error())
}
b, err := json.Marshal(chain)
if err != nil {
Exit(err.Error())
}
fmt.Println(string(b))
}
func ConfigFromMachines(chainID, prefix string, N int) (*types.BlockchainConfig, error) {
chain := &types.BlockchainConfig{
ID: chainID,
Validators: make([]*types.ChainValidator, N),
}
for i := 0; i < N; i++ {
id := fmt.Sprintf("%s%d", prefix, i+1)
ip, success := runProcessGetResult(id+"-ip", "docker-machine", []string{"ip", id})
if !success {
return nil, fmt.Errorf(ip)
}
val := &types.Validator{
ID: id,
// TODO: pubkey
}
chainVal := &types.ChainValidator{
Validator: val,
Addr: fmt.Sprintf("%s:%d", strings.Trim(ip, "\n"), 46657),
Index: i,
}
chain.Validators[i] = chainVal
}
return chain, nil
}
func runProcessGetResult(label string, command string, args []string) (string, bool) {
outFile := NewBufferCloser(nil)
fmt.Println(Green(command), Green(args))
proc, err := pcm.StartProcess(label, command, args, nil, outFile)
if err != nil {
return "", false
}
<-proc.WaitCh
if proc.ExitState.Success() {
fmt.Println(Blue(string(outFile.Bytes())))
return string(outFile.Bytes()), true
} else {
// Error!
fmt.Println(Red(string(outFile.Bytes())))
return string(outFile.Bytes()), false
}
}

+ 119
- 0
types/types.go View File

@ -0,0 +1,119 @@
package types
import (
"fmt"
"sync"
"github.com/tendermint/go-event-meter"
"github.com/tendermint/go-crypto"
)
//---------------------------------------------
// core types
// Known chain and validator set IDs (from which anything else can be found)
type ChainAndValidatorIDs struct {
ChainIDs []string `json:"chain_ids"`
ValidatorSetIDs []string `json:"validator_set_ids"`
}
// state of a chain
type ChainStatus struct {
Config *BlockchainConfig `json:"config"`
Status *BlockchainStatus `json:"status"`
}
// basic chain config
// threadsafe
type BlockchainConfig struct {
mtx sync.Mutex
ID string `json:"id"`
ValSetID string `json:"val_set_id"`
Validators []*ChainValidator `json:"validators"`
valIDMap map[string]int // map IDs to indices
}
func (bc *BlockchainConfig) PopulateValIDMap() {
bc.mtx.Lock()
defer bc.mtx.Unlock()
bc.valIDMap = make(map[string]int)
for i, v := range bc.Validators {
bc.valIDMap[v.ID] = i
}
}
func (bc *BlockchainConfig) GetValidatorByID(valID string) (*ChainValidator, error) {
bc.mtx.Lock()
defer bc.mtx.Unlock()
valIndex, ok := bc.valIDMap[valID]
if !ok {
return nil, fmt.Errorf("Unknown validator %s", valID)
}
return bc.Validators[valIndex], nil
}
// basic chain status/metrics
// threadsafe
type BlockchainStatus struct {
mtx sync.Mutex
Height int `json:"height"`
MeanBlockTime float64 `json:"mean_block_time"`
TxThroughput float64 `json:"tx_throughput"`
BlockchainSize int64 `json:"blockchain_size"` // how might we get StateSize ?
}
// validator on a chain
type ChainValidator struct {
*Validator
Addr string `json:"addr"` // do we want multiple addrs?
Index int `json:"index"`
em *eventmeter.EventMeter // holds a ws connection to the val
Latency float64 `json:"latency,omitempty"`
}
func (cv *ChainValidator) NewEventMeter() error {
em := eventmeter.NewEventMeter(fmt.Sprintf("ws://%s/websocket", cv.Addr))
if err := em.Start(); err != nil {
return err
}
cv.em = em
return nil
}
func (cv *ChainValidator) EventMeter() *eventmeter.EventMeter {
return cv.em
}
// validator set (independent of chains)
type ValidatorSet struct {
Validators []*Validator `json:"validators"`
}
func (vs *ValidatorSet) Validator(valID string) (*Validator, error) {
for _, v := range vs.Validators {
if v.ID == valID {
return v, nil
}
}
return nil, fmt.Errorf("Unknwon validator %s", valID)
}
// validator (independent of chain)
type Validator struct {
ID string `json:"id"`
PubKey crypto.PubKey `json:"pub_key,omitempty"`
Chains []*ChainStatus `json:"chains,omitempty"`
}
func (v *Validator) Chain(chainID string) (*ChainStatus, error) {
for _, c := range v.Chains {
if c.Config.ID == chainID {
return c, nil
}
}
return nil, fmt.Errorf("Unknwon chain %s", chainID)
}

Loading…
Cancel
Save