From 1af021846e88c80d971c44f86a7805f4500aeab6 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 15 Jan 2016 23:31:57 -0500 Subject: [PATCH] fill in metrics --- handlers/callbacks.go | 53 ++++++++++++++++++++++++++++++ handlers/handlers.go | 52 +++++++++++++++++++++++------ handlers/routes.go | 16 ++++++--- main.go | 5 ++- types/chain.go | 76 ++++++++++++++++++++++++++++--------------- types/types.go | 45 ------------------------- types/val.go | 30 +++++++++++++++-- 7 files changed, 185 insertions(+), 92 deletions(-) create mode 100644 handlers/callbacks.go delete mode 100644 types/types.go diff --git a/handlers/callbacks.go b/handlers/callbacks.go new file mode 100644 index 000000000..3cba0f2c6 --- /dev/null +++ b/handlers/callbacks.go @@ -0,0 +1,53 @@ +package handlers + +import ( + "github.com/tendermint/go-event-meter" + "github.com/tendermint/go-events" + + tmtypes "github.com/tendermint/tendermint/types" +) + +// implements eventmeter.EventCallbackFunc +func (tn *TendermintNetwork) newBlockCallback(chainID, valID string) eventmeter.EventCallbackFunc { + return func(metric *eventmeter.EventMetric, data events.EventData) { + block := data.(tmtypes.EventDataNewBlock).Block + + tn.mtx.Lock() + defer tn.mtx.Unlock() + + // grab chain and validator + chain := tn.Chains[chainID] + val, _ := chain.Config.GetValidatorByID(valID) + + // update height for validator + val.BlockHeight = block.Header.Height + + // possibly update height and mean block time for chain + if block.Header.Height > chain.Status.Height { + chain.Status.NewBlock(block) + } + + } +} + +// implements eventmeter.EventLatencyFunc +func (tn *TendermintNetwork) latencyCallback(chainID, valID string) eventmeter.LatencyCallbackFunc { + return func(latency float64) { + tn.mtx.Lock() + defer tn.mtx.Unlock() + + // grab chain and validator + chain := tn.Chains[chainID] + val, _ := chain.Config.GetValidatorByID(valID) + + // update latency for this validator and avg latency for chain + mean := chain.Status.MeanLatency * float64(chain.Status.NumValidators) + mean = (mean - val.Latency + latency) / float64(chain.Status.NumValidators) + val.Latency = latency + chain.Status.MeanLatency = mean + + // TODO: possibly update active nodes and uptime for chain + chain.Status.ActiveValidators = chain.Status.NumValidators // XXX + + } +} diff --git a/handlers/handlers.go b/handlers/handlers.go index c8a18e537..6955b06a0 100644 --- a/handlers/handlers.go +++ b/handlers/handlers.go @@ -9,6 +9,7 @@ import ( "github.com/tendermint/go-wire" "github.com/tendermint/netmon/types" + tmtypes "github.com/tendermint/tendermint/types" ) type NetMonResult interface { @@ -17,7 +18,7 @@ type NetMonResult interface { // for wire.readReflect var _ = wire.RegisterInterface( struct{ NetMonResult }{}, - wire.ConcreteType{&types.ChainAndValidatorIDs{}, 0x01}, + wire.ConcreteType{&types.ChainAndValidatorSetIDs{}, 0x01}, wire.ConcreteType{&types.ChainState{}, 0x02}, wire.ConcreteType{&types.Validator{}, 0x03}, wire.ConcreteType{&eventmeter.EventMetric{}, 0x04}, @@ -47,20 +48,21 @@ func NewTendermintNetwork(chains ...*types.ChainState) *TendermintNetwork { //------------ // Public Methods -func (tn *TendermintNetwork) RegisterChain(chain *types.ChainState) { +func (tn *TendermintNetwork) Stop() { tn.mtx.Lock() defer tn.mtx.Unlock() - tn.Chains[chain.Config.ID] = chain -} - -func (tn *TendermintNetwork) Stop() { - // TODO: for each chain, stop each validator + for _, c := range tn.Chains { + for _, v := range c.Config.Validators { + v.Stop() + } + } } //------------ // RPC funcs -func (tn *TendermintNetwork) Status() (*types.ChainAndValidatorIDs, error) { +// Returns sorted lists of all chains and validator sets +func (tn *TendermintNetwork) Status() (*types.ChainAndValidatorSetIDs, error) { tn.mtx.Lock() defer tn.mtx.Unlock() chains := make([]string, len(tn.Chains)) @@ -77,7 +79,7 @@ func (tn *TendermintNetwork) Status() (*types.ChainAndValidatorIDs, error) { } sort.StringSlice(chains).Sort() sort.StringSlice(valSets).Sort() - return &types.ChainAndValidatorIDs{ + return &types.ChainAndValidatorSetIDs{ ChainIDs: chains, ValidatorSetIDs: valSets, }, nil @@ -91,10 +93,40 @@ func (tn *TendermintNetwork) GetChain(chainID string) (*types.ChainState, error) if !ok { return nil, fmt.Errorf("Unknown chain %s", chainID) } - fmt.Println("CHAIN:", chain) return chain, nil } +func (tn *TendermintNetwork) RegisterChain(chainConfig *types.BlockchainConfig) (*types.ChainState, error) { + tn.mtx.Lock() + defer tn.mtx.Unlock() + + chainState := &types.ChainState{ + Config: chainConfig, + Status: types.NewBlockchainStatus(), + } + chainState.Status.NumValidators = len(chainConfig.Validators) + + chainState.Config.PopulateValIDMap() + + // start the event meter and listen for new blocks on each validator + for _, v := range chainConfig.Validators { + + if err := v.Start(); err != nil { + return nil, err + } + v.EventMeter().RegisterLatencyCallback(tn.latencyCallback(chainConfig.ID, v.Validator.ID)) + err := v.EventMeter().Subscribe(tmtypes.EventStringNewBlock(), tn.newBlockCallback(chainConfig.ID, v.Validator.ID)) + if err != nil { + return nil, err + } + + // get/set the validator's pub key + v.PubKey() + } + tn.Chains[chainState.Config.ID] = chainState + return chainState, nil +} + func (tn *TendermintNetwork) GetValidatorSet(valSetID string) (*types.ValidatorSet, error) { tn.mtx.Lock() defer tn.mtx.Unlock() diff --git a/handlers/routes.go b/handlers/routes.go index b600ad595..67f879036 100644 --- a/handlers/routes.go +++ b/handlers/routes.go @@ -2,6 +2,7 @@ package handlers import ( rpc "github.com/tendermint/go-rpc/server" + "github.com/tendermint/netmon/types" ) func Routes(network *TendermintNetwork) map[string]*rpc.RPCFunc { @@ -10,10 +11,11 @@ func Routes(network *TendermintNetwork) map[string]*rpc.RPCFunc { // "subscribe": rpc.NewWSRPCFunc(Subscribe, []string{"event"}), // "unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, []string{"event"}), - "status": rpc.NewRPCFunc(StatusResult(network), ""), - "blockchain": rpc.NewRPCFunc(GetChainResult(network), "chain"), - "validator_set": rpc.NewRPCFunc(GetValidatorSetResult(network), "valsetID"), - "validator": rpc.NewRPCFunc(GetValidatorResult(network), "valSetID,valID"), + "status": rpc.NewRPCFunc(StatusResult(network), ""), + "blockchain": rpc.NewRPCFunc(GetChainResult(network), "chain"), + "register_chain": rpc.NewRPCFunc(RegisterChainResult(network), "chainConfig"), + "validator_set": rpc.NewRPCFunc(GetValidatorSetResult(network), "valsetID"), + "validator": rpc.NewRPCFunc(GetValidatorResult(network), "valSetID,valID"), "start_meter": rpc.NewRPCFunc(network.StartMeter, "chainID,valID,event"), "stop_meter": rpc.NewRPCFunc(network.StopMeter, "chainID,valID,event"), @@ -33,6 +35,12 @@ func GetChainResult(network *TendermintNetwork) interface{} { } } +func RegisterChainResult(network *TendermintNetwork) interface{} { + return func(chainConfig *types.BlockchainConfig) (NetMonResult, error) { + return network.RegisterChain(chainConfig) + } +} + func GetValidatorSetResult(network *TendermintNetwork) interface{} { return func(valSetID string) (NetMonResult, error) { return network.GetValidatorSet(valSetID) diff --git a/main.go b/main.go index d27a1dbf5..cd7707e6b 100644 --- a/main.go +++ b/main.go @@ -58,14 +58,14 @@ func cmdMonitor(c *cli.Context) { } chainConfigFile := args[0] - chainState, err := types.LoadChainFromFile(chainConfigFile) + chainConfig, err := types.LoadChainFromFile(chainConfigFile) if err != nil { Exit(err.Error()) } // the main object that watches for changes and serves the rpc requests network := handlers.NewTendermintNetwork() - network.RegisterChain(chainState) + network.RegisterChain(chainConfig) // the routes are functions on the network object routes := handlers.Routes(network) @@ -80,7 +80,6 @@ func cmdMonitor(c *cli.Context) { } TrapSignal(func() { - // TODO: clean shutdown server, maybe persist last state network.Stop() }) diff --git a/types/chain.go b/types/chain.go index 8ca43f842..2784b8de6 100644 --- a/types/chain.go +++ b/types/chain.go @@ -6,15 +6,56 @@ import ( "io/ioutil" "sync" - "github.com/tendermint/go-event-meter" - "github.com/tendermint/go-events" + "github.com/rcrowley/go-metrics" tmtypes "github.com/tendermint/tendermint/types" ) //------------------------------------------------ // blockchain types -// State of a chain +// Known chain and validator set IDs (from which anything else can be found) +type ChainAndValidatorSetIDs struct { + ChainIDs []string `json:"chain_ids"` + ValidatorSetIDs []string `json:"validator_set_ids"` +} + +// Basic chain and network metrics +type BlockchainStatus struct { + // Blockchain Info + Height int `json:"height"` + BlockchainSize int64 `json:"blockchain_size"` // how might we get StateSize ? + MeanBlockTime float64 `json:"mean_block_time" wire:"unsafe"` + TxThroughput float64 `json:"tx_throughput" wire:"unsafe"` + + blockTimeMeter metrics.Meter + txThroughputMeter metrics.Meter + + // Network Info + NumValidators int `json:"num_validators"` + ActiveValidators int `json:"active_validators"` + ActiveNodes int `json:"active_nodes"` + MeanLatency float64 `json:"mean_latency" wire:"unsafe"` + Uptime float64 `json:"uptime" wire:"unsafe"` + + // TODO: charts for block time, latency (websockets/event-meter ?) +} + +func NewBlockchainStatus() *BlockchainStatus { + return &BlockchainStatus{ + blockTimeMeter: metrics.NewMeter(), + txThroughputMeter: metrics.NewMeter(), + } +} + +func (s *BlockchainStatus) NewBlock(block *tmtypes.Block) { + s.Height = block.Header.Height + s.blockTimeMeter.Mark(1) + s.txThroughputMeter.Mark(int64(block.Header.NumTxs)) + s.MeanBlockTime = 1 / s.blockTimeMeter.RateMean() + s.TxThroughput = s.txThroughputMeter.RateMean() +} + +// Main chain state // Returned over RPC but also used to manage state type ChainState struct { Config *BlockchainConfig `json:"config"` @@ -24,10 +65,10 @@ type ChainState struct { // basic chain config // threadsafe type BlockchainConfig struct { - mtx sync.Mutex + ID string `json:"id"` + ValSetID string `json:"val_set_id"` - ID string `json:"id"` - ValSetID string `json:"val_set_id"` + mtx sync.Mutex Validators []*ChainValidator `json:"validators"` valIDMap map[string]int // map IDs to indices } @@ -52,7 +93,7 @@ func (bc *BlockchainConfig) GetValidatorByID(valID string) (*ChainValidator, err return bc.Validators[valIndex], nil } -func LoadChainFromFile(configFile string) (*ChainState, error) { +func LoadChainFromFile(configFile string) (*BlockchainConfig, error) { b, err := ioutil.ReadFile(configFile) if err != nil { @@ -66,24 +107,5 @@ func LoadChainFromFile(configFile string) (*ChainState, error) { return nil, err } - chainState := &ChainState{Config: chainConfig} - - // start the event meter and listen for new blocks on each validator - for _, v := range chainConfig.Validators { - - if err := v.Start(); err != nil { - return nil, err - } - err := v.EventMeter().Subscribe(tmtypes.EventStringNewBlock(), func(metric *eventmeter.EventMetric, data events.EventData) { - // TODO: update chain status with block and metric - // chainState.NewBlock(data.(tmtypes.EventDataNewBlock).Block) - }) - if err != nil { - return nil, err - } - - // get/set the validator's pub key - v.PubKey() - } - return chainState, nil + return chainConfig, nil } diff --git a/types/types.go b/types/types.go deleted file mode 100644 index c4705a79a..000000000 --- a/types/types.go +++ /dev/null @@ -1,45 +0,0 @@ -package types - -import ( - "fmt" - "github.com/tendermint/go-crypto" -) - -//--------------------------------------------- -// simple types - -// Known chain and validator set IDs (from which anything else can be found) -type ChainAndValidatorIDs struct { - ChainIDs []string `json:"chain_ids"` - ValidatorSetIDs []string `json:"validator_set_ids"` -} - -// basic chain status/metrics -type BlockchainStatus struct { - Height int `json:"height"` - MeanBlockTime float64 `json:"mean_block_time" wire:"unsafe"` - TxThroughput float64 `json:"tx_throughput" wire:"unsafe"` - - BlockchainSize int64 `json:"blockchain_size"` // how might we get StateSize ? -} - -// validator set (independent of chains) -type ValidatorSet struct { - Validators []*Validator `json:"validators"` -} - -func (vs *ValidatorSet) Validator(valID string) (*Validator, error) { - for _, v := range vs.Validators { - if v.ID == valID { - return v, nil - } - } - return nil, fmt.Errorf("Unknwon validator %s", valID) -} - -// validator (independent of chain) -type Validator struct { - ID string `json:"id"` - PubKey crypto.PubKey `json:"pub_key"` - Chains []string `json:"chains"` -} diff --git a/types/val.go b/types/val.go index 29932b642..b6972ad63 100644 --- a/types/val.go +++ b/types/val.go @@ -16,6 +16,27 @@ import ( //------------------------------------------------ // validator types +// validator set (independent of chains) +type ValidatorSet struct { + Validators []*Validator `json:"validators"` +} + +func (vs *ValidatorSet) Validator(valID string) (*Validator, error) { + for _, v := range vs.Validators { + if v.ID == valID { + return v, nil + } + } + return nil, fmt.Errorf("Unknwon validator %s", valID) +} + +// validator (independent of chain) +type Validator struct { + ID string `json:"id"` + PubKey crypto.PubKey `json:"pub_key"` + Chains []string `json:"chains"` +} + // Validator on a chain // Responsible for communication with the validator // Returned over RPC but also used to manage state @@ -24,9 +45,12 @@ type ChainValidator struct { Addr string `json:"addr"` // do we want multiple addrs? Index int `json:"index"` - em *eventmeter.EventMeter // holds a ws connection to the val - client *client.ClientURI // rpc client - Latency float64 `json:"latency" wire:"unsafe"` + Latency float64 `json:"latency" wire:"unsafe"` + BlockHeight int `json:"block_height"` + + em *eventmeter.EventMeter // holds a ws connection to the val + client *client.ClientURI // rpc client + } // Start a new event meter, including the websocket connection