Browse Source

NewBlockHeader event

pull/1943/head
Ethan Buchman 9 years ago
parent
commit
1b81863ef7
3 changed files with 32 additions and 10 deletions
  1. +2
    -2
      handlers/callbacks.go
  2. +5
    -5
      types/chain.go
  3. +25
    -3
      types/val.go

+ 2
- 2
handlers/callbacks.go View File

@ -21,14 +21,14 @@ import (
func (tn *TendermintNetwork) registerCallbacks(chainState *types.ChainState, v *types.ValidatorState) error {
v.EventMeter().RegisterLatencyCallback(tn.latencyCallback(chainState, v))
v.EventMeter().RegisterDisconnectCallback(tn.disconnectCallback(chainState, v))
return v.EventMeter().Subscribe(tmtypes.EventStringNewBlock(), tn.newBlockCallback(chainState, v))
return v.EventMeter().Subscribe(tmtypes.EventStringNewBlockHeader(), tn.newBlockCallback(chainState, v))
}
// implements eventmeter.EventCallbackFunc
// updates validator and possibly chain with new block
func (tn *TendermintNetwork) newBlockCallback(chainState *types.ChainState, val *types.ValidatorState) eventmeter.EventCallbackFunc {
return func(metric *eventmeter.EventMetric, data events.EventData) {
block := data.(tmtypes.EventDataNewBlock).Block
block := data.(tmtypes.EventDataNewBlockHeader).Header
// these functions are thread safe
// we should run them concurrently


+ 5
- 5
types/chain.go View File

@ -37,7 +37,7 @@ type ChainState struct {
Status *BlockchainStatus `json:"status"`
}
func (cs *ChainState) NewBlock(block *tmtypes.Block) {
func (cs *ChainState) NewBlock(block *tmtypes.Header) {
cs.Status.NewBlock(block)
}
@ -225,12 +225,12 @@ func NewBlockchainStatus() *BlockchainStatus {
}
}
func (s *BlockchainStatus) NewBlock(block *tmtypes.Block) {
func (s *BlockchainStatus) NewBlock(block *tmtypes.Header) {
s.mtx.Lock()
defer s.mtx.Unlock()
if block.Header.Height > s.Height {
numTxs := block.Header.NumTxs
s.Height = block.Header.Height
if block.Height > s.Height {
numTxs := block.NumTxs
s.Height = block.Height
s.blockTimeMeter.Mark(1)
s.txThroughputMeter.Mark(int64(numTxs))
s.MeanBlockTime = (1.0 / s.blockTimeMeter.Rate1()) * 1000 // 1/s to ms


+ 25
- 3
types/val.go View File

@ -1,12 +1,15 @@
package types
import (
"encoding/json"
"fmt"
"sync"
"github.com/tendermint/go-crypto"
"github.com/tendermint/go-event-meter"
"github.com/tendermint/go-events"
client "github.com/tendermint/go-rpc/client"
"github.com/tendermint/go-wire"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
tmtypes "github.com/tendermint/tendermint/types"
)
@ -65,7 +68,7 @@ func (vs *ValidatorState) Start() error {
rpcAddr := vs.Config.RPCAddr
vs.Config.mtx.Unlock()
em := eventmeter.NewEventMeter(fmt.Sprintf("ws://%s/websocket", rpcAddr), ctypes.UnmarshalEvent)
em := eventmeter.NewEventMeter(fmt.Sprintf("ws://%s/websocket", rpcAddr), UnmarshalEvent)
if _, err := em.Start(); err != nil {
return err
}
@ -82,10 +85,10 @@ func (vs *ValidatorState) EventMeter() *eventmeter.EventMeter {
return vs.em
}
func (vs *ValidatorState) NewBlock(block *tmtypes.Block) {
func (vs *ValidatorState) NewBlock(block *tmtypes.Header) {
vs.Status.mtx.Lock()
defer vs.Status.mtx.Unlock()
vs.Status.BlockHeight = block.Header.Height
vs.Status.BlockHeight = block.Height
}
func (vs *ValidatorState) UpdateLatency(latency float64) float64 {
@ -143,3 +146,22 @@ type ValidatorStatus struct {
Latency float64 `json:"latency" wire:"unsafe"`
BlockHeight int `json:"block_height"`
}
//------------------------------------------------------------
// utility
// Unmarshal a json event
func UnmarshalEvent(b json.RawMessage) (string, events.EventData, error) {
var err error
result := new(ctypes.TMResult)
wire.ReadJSONPtr(result, b, &err)
if err != nil {
return "", nil, err
}
event, ok := (*result).(*ctypes.ResultEvent)
if !ok {
return "", nil, nil // TODO: handle non-event messages (ie. return from subscribe/unsubscribe)
// fmt.Errorf("Result is not type *ctypes.ResultEvent. Got %v", reflect.TypeOf(*result))
}
return event.Name, event.Data, nil
}

Loading…
Cancel
Save