From 1b81863ef732c62b5647bc7fe9e1be0e63bdef0e Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 20 Apr 2016 14:52:11 -0400 Subject: [PATCH] NewBlockHeader event --- handlers/callbacks.go | 4 ++-- types/chain.go | 10 +++++----- types/val.go | 28 +++++++++++++++++++++++++--- 3 files changed, 32 insertions(+), 10 deletions(-) diff --git a/handlers/callbacks.go b/handlers/callbacks.go index 0b1f94829..027769db6 100644 --- a/handlers/callbacks.go +++ b/handlers/callbacks.go @@ -21,14 +21,14 @@ import ( func (tn *TendermintNetwork) registerCallbacks(chainState *types.ChainState, v *types.ValidatorState) error { v.EventMeter().RegisterLatencyCallback(tn.latencyCallback(chainState, v)) v.EventMeter().RegisterDisconnectCallback(tn.disconnectCallback(chainState, v)) - return v.EventMeter().Subscribe(tmtypes.EventStringNewBlock(), tn.newBlockCallback(chainState, v)) + return v.EventMeter().Subscribe(tmtypes.EventStringNewBlockHeader(), tn.newBlockCallback(chainState, v)) } // implements eventmeter.EventCallbackFunc // updates validator and possibly chain with new block func (tn *TendermintNetwork) newBlockCallback(chainState *types.ChainState, val *types.ValidatorState) eventmeter.EventCallbackFunc { return func(metric *eventmeter.EventMetric, data events.EventData) { - block := data.(tmtypes.EventDataNewBlock).Block + block := data.(tmtypes.EventDataNewBlockHeader).Header // these functions are thread safe // we should run them concurrently diff --git a/types/chain.go b/types/chain.go index 3d5a4ac7b..f5c5344a2 100644 --- a/types/chain.go +++ b/types/chain.go @@ -37,7 +37,7 @@ type ChainState struct { Status *BlockchainStatus `json:"status"` } -func (cs *ChainState) NewBlock(block *tmtypes.Block) { +func (cs *ChainState) NewBlock(block *tmtypes.Header) { cs.Status.NewBlock(block) } @@ -225,12 +225,12 @@ func NewBlockchainStatus() *BlockchainStatus { } } -func (s *BlockchainStatus) NewBlock(block *tmtypes.Block) { +func (s *BlockchainStatus) NewBlock(block *tmtypes.Header) { s.mtx.Lock() defer s.mtx.Unlock() - if block.Header.Height > s.Height { - numTxs := block.Header.NumTxs - s.Height = block.Header.Height + if block.Height > s.Height { + numTxs := block.NumTxs + s.Height = block.Height s.blockTimeMeter.Mark(1) s.txThroughputMeter.Mark(int64(numTxs)) s.MeanBlockTime = (1.0 / s.blockTimeMeter.Rate1()) * 1000 // 1/s to ms diff --git a/types/val.go b/types/val.go index c6441b321..fb001eaef 100644 --- a/types/val.go +++ b/types/val.go @@ -1,12 +1,15 @@ package types import ( + "encoding/json" "fmt" "sync" "github.com/tendermint/go-crypto" "github.com/tendermint/go-event-meter" + "github.com/tendermint/go-events" client "github.com/tendermint/go-rpc/client" + "github.com/tendermint/go-wire" ctypes "github.com/tendermint/tendermint/rpc/core/types" tmtypes "github.com/tendermint/tendermint/types" ) @@ -65,7 +68,7 @@ func (vs *ValidatorState) Start() error { rpcAddr := vs.Config.RPCAddr vs.Config.mtx.Unlock() - em := eventmeter.NewEventMeter(fmt.Sprintf("ws://%s/websocket", rpcAddr), ctypes.UnmarshalEvent) + em := eventmeter.NewEventMeter(fmt.Sprintf("ws://%s/websocket", rpcAddr), UnmarshalEvent) if _, err := em.Start(); err != nil { return err } @@ -82,10 +85,10 @@ func (vs *ValidatorState) EventMeter() *eventmeter.EventMeter { return vs.em } -func (vs *ValidatorState) NewBlock(block *tmtypes.Block) { +func (vs *ValidatorState) NewBlock(block *tmtypes.Header) { vs.Status.mtx.Lock() defer vs.Status.mtx.Unlock() - vs.Status.BlockHeight = block.Header.Height + vs.Status.BlockHeight = block.Height } func (vs *ValidatorState) UpdateLatency(latency float64) float64 { @@ -143,3 +146,22 @@ type ValidatorStatus struct { Latency float64 `json:"latency" wire:"unsafe"` BlockHeight int `json:"block_height"` } + +//------------------------------------------------------------ +// utility + +// Unmarshal a json event +func UnmarshalEvent(b json.RawMessage) (string, events.EventData, error) { + var err error + result := new(ctypes.TMResult) + wire.ReadJSONPtr(result, b, &err) + if err != nil { + return "", nil, err + } + event, ok := (*result).(*ctypes.ResultEvent) + if !ok { + return "", nil, nil // TODO: handle non-event messages (ie. return from subscribe/unsubscribe) + // fmt.Errorf("Result is not type *ctypes.ResultEvent. Got %v", reflect.TypeOf(*result)) + } + return event.Name, event.Data, nil +}