|
@ -5,22 +5,16 @@ import ( |
|
|
"math" |
|
|
"math" |
|
|
"time" |
|
|
"time" |
|
|
|
|
|
|
|
|
"github.com/go-kit/kit/log" |
|
|
|
|
|
"github.com/pkg/errors" |
|
|
"github.com/pkg/errors" |
|
|
crypto "github.com/tendermint/go-crypto" |
|
|
crypto "github.com/tendermint/go-crypto" |
|
|
events "github.com/tendermint/go-events" |
|
|
|
|
|
rpc_client "github.com/tendermint/go-rpc/client" |
|
|
|
|
|
wire "github.com/tendermint/go-wire" |
|
|
|
|
|
ctypes "github.com/tendermint/tendermint/rpc/core/types" |
|
|
ctypes "github.com/tendermint/tendermint/rpc/core/types" |
|
|
|
|
|
rpc_client "github.com/tendermint/tendermint/rpc/lib/client" |
|
|
tmtypes "github.com/tendermint/tendermint/types" |
|
|
tmtypes "github.com/tendermint/tendermint/types" |
|
|
|
|
|
"github.com/tendermint/tmlibs/events" |
|
|
|
|
|
"github.com/tendermint/tmlibs/log" |
|
|
em "github.com/tendermint/tools/tm-monitor/eventmeter" |
|
|
em "github.com/tendermint/tools/tm-monitor/eventmeter" |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
// remove when https://github.com/tendermint/go-rpc/issues/8 will be fixed
|
|
|
|
|
|
type rpcClientI interface { |
|
|
|
|
|
Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
const maxRestarts = 25 |
|
|
const maxRestarts = 25 |
|
|
|
|
|
|
|
|
type Node struct { |
|
|
type Node struct { |
|
@ -38,7 +32,7 @@ type Node struct { |
|
|
em eventMeter |
|
|
em eventMeter |
|
|
|
|
|
|
|
|
// rpcClient is an client for making RPC calls to TM
|
|
|
// rpcClient is an client for making RPC calls to TM
|
|
|
rpcClient rpcClientI |
|
|
|
|
|
|
|
|
rpcClient rpc_client.HTTPClient |
|
|
|
|
|
|
|
|
blockCh chan<- tmtypes.Header |
|
|
blockCh chan<- tmtypes.Header |
|
|
blockLatencyCh chan<- float64 |
|
|
blockLatencyCh chan<- float64 |
|
@ -53,11 +47,11 @@ type Node struct { |
|
|
|
|
|
|
|
|
func NewNode(rpcAddr string, options ...func(*Node)) *Node { |
|
|
func NewNode(rpcAddr string, options ...func(*Node)) *Node { |
|
|
em := em.NewEventMeter(rpcAddr, UnmarshalEvent) |
|
|
em := em.NewEventMeter(rpcAddr, UnmarshalEvent) |
|
|
rpcClient := rpc_client.NewClientURI(rpcAddr) // HTTP client by default
|
|
|
|
|
|
|
|
|
rpcClient := rpc_client.NewURIClient(rpcAddr) // HTTP client by default
|
|
|
return NewNodeWithEventMeterAndRpcClient(rpcAddr, em, rpcClient, options...) |
|
|
return NewNodeWithEventMeterAndRpcClient(rpcAddr, em, rpcClient, options...) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func NewNodeWithEventMeterAndRpcClient(rpcAddr string, em eventMeter, rpcClient rpcClientI, options ...func(*Node)) *Node { |
|
|
|
|
|
|
|
|
func NewNodeWithEventMeterAndRpcClient(rpcAddr string, em eventMeter, rpcClient rpc_client.HTTPClient, options ...func(*Node)) *Node { |
|
|
n := &Node{ |
|
|
n := &Node{ |
|
|
rpcAddr: rpcAddr, |
|
|
rpcAddr: rpcAddr, |
|
|
em: em, |
|
|
em: em, |
|
@ -107,7 +101,10 @@ func (n *Node) Start() error { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
n.em.RegisterLatencyCallback(latencyCallback(n)) |
|
|
n.em.RegisterLatencyCallback(latencyCallback(n)) |
|
|
n.em.Subscribe(tmtypes.EventStringNewBlockHeader(), newBlockCallback(n)) |
|
|
|
|
|
|
|
|
err := n.em.Subscribe(tmtypes.EventStringNewBlockHeader(), newBlockCallback(n)) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return err |
|
|
|
|
|
} |
|
|
n.em.RegisterDisconnectCallback(disconnectCallback(n)) |
|
|
n.em.RegisterDisconnectCallback(disconnectCallback(n)) |
|
|
|
|
|
|
|
|
n.Online = true |
|
|
n.Online = true |
|
@ -129,10 +126,10 @@ func (n *Node) Stop() { |
|
|
// implements eventmeter.EventCallbackFunc
|
|
|
// implements eventmeter.EventCallbackFunc
|
|
|
func newBlockCallback(n *Node) em.EventCallbackFunc { |
|
|
func newBlockCallback(n *Node) em.EventCallbackFunc { |
|
|
return func(metric *em.EventMetric, data events.EventData) { |
|
|
return func(metric *em.EventMetric, data events.EventData) { |
|
|
block := data.(tmtypes.EventDataNewBlockHeader).Header |
|
|
|
|
|
|
|
|
block := data.(tmtypes.TMEventData).Unwrap().(tmtypes.EventDataNewBlockHeader).Header |
|
|
|
|
|
|
|
|
n.Height = uint64(block.Height) |
|
|
n.Height = uint64(block.Height) |
|
|
n.logger.Log("event", "new block", "height", block.Height, "numTxs", block.NumTxs) |
|
|
|
|
|
|
|
|
n.logger.Info("event", "new block", "height", block.Height, "numTxs", block.NumTxs) |
|
|
|
|
|
|
|
|
if n.blockCh != nil { |
|
|
if n.blockCh != nil { |
|
|
n.blockCh <- *block |
|
|
n.blockCh <- *block |
|
@ -144,7 +141,7 @@ func newBlockCallback(n *Node) em.EventCallbackFunc { |
|
|
func latencyCallback(n *Node) em.LatencyCallbackFunc { |
|
|
func latencyCallback(n *Node) em.LatencyCallbackFunc { |
|
|
return func(latency float64) { |
|
|
return func(latency float64) { |
|
|
n.BlockLatency = latency / 1000000.0 // ns to ms
|
|
|
n.BlockLatency = latency / 1000000.0 // ns to ms
|
|
|
n.logger.Log("event", "new block latency", "latency", n.BlockLatency) |
|
|
|
|
|
|
|
|
n.logger.Info("event", "new block latency", "latency", n.BlockLatency) |
|
|
|
|
|
|
|
|
if n.blockLatencyCh != nil { |
|
|
if n.blockLatencyCh != nil { |
|
|
n.blockLatencyCh <- latency |
|
|
n.blockLatencyCh <- latency |
|
@ -156,17 +153,17 @@ func latencyCallback(n *Node) em.LatencyCallbackFunc { |
|
|
func disconnectCallback(n *Node) em.DisconnectCallbackFunc { |
|
|
func disconnectCallback(n *Node) em.DisconnectCallbackFunc { |
|
|
return func() { |
|
|
return func() { |
|
|
n.Online = false |
|
|
n.Online = false |
|
|
n.logger.Log("status", "down") |
|
|
|
|
|
|
|
|
n.logger.Info("status", "down") |
|
|
|
|
|
|
|
|
if n.disconnectCh != nil { |
|
|
if n.disconnectCh != nil { |
|
|
n.disconnectCh <- true |
|
|
n.disconnectCh <- true |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if err := n.RestartEventMeterBackoff(); err != nil { |
|
|
if err := n.RestartEventMeterBackoff(); err != nil { |
|
|
n.logger.Log("err", errors.Wrap(err, "restart failed")) |
|
|
|
|
|
|
|
|
n.logger.Info("err", errors.Wrap(err, "restart failed")) |
|
|
} else { |
|
|
} else { |
|
|
n.Online = true |
|
|
n.Online = true |
|
|
n.logger.Log("status", "online") |
|
|
|
|
|
|
|
|
n.logger.Info("status", "online") |
|
|
|
|
|
|
|
|
if n.disconnectCh != nil { |
|
|
if n.disconnectCh != nil { |
|
|
n.disconnectCh <- false |
|
|
n.disconnectCh <- false |
|
@ -183,7 +180,7 @@ func (n *Node) RestartEventMeterBackoff() error { |
|
|
time.Sleep(d * time.Second) |
|
|
time.Sleep(d * time.Second) |
|
|
|
|
|
|
|
|
if err := n.em.Start(); err != nil { |
|
|
if err := n.em.Start(); err != nil { |
|
|
n.logger.Log("err", errors.Wrap(err, "restart failed")) |
|
|
|
|
|
|
|
|
n.logger.Info("err", errors.Wrap(err, "restart failed")) |
|
|
} else { |
|
|
} else { |
|
|
// TODO: authenticate pubkey
|
|
|
// TODO: authenticate pubkey
|
|
|
return nil |
|
|
return nil |
|
@ -206,11 +203,10 @@ func (n *Node) NumValidators() (height uint64, num int, err error) { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (n *Node) validators() (height uint64, validators []*tmtypes.Validator, err error) { |
|
|
func (n *Node) validators() (height uint64, validators []*tmtypes.Validator, err error) { |
|
|
var result ctypes.TMResult |
|
|
|
|
|
if _, err = n.rpcClient.Call("validators", nil, &result); err != nil { |
|
|
|
|
|
|
|
|
vals := new(ctypes.ResultValidators) |
|
|
|
|
|
if _, err = n.rpcClient.Call("validators", nil, vals); err != nil { |
|
|
return 0, make([]*tmtypes.Validator, 0), err |
|
|
return 0, make([]*tmtypes.Validator, 0), err |
|
|
} |
|
|
} |
|
|
vals := result.(*ctypes.ResultValidators) |
|
|
|
|
|
return uint64(vals.BlockHeight), vals.Validators, nil |
|
|
return uint64(vals.BlockHeight), vals.Validators, nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -235,21 +231,20 @@ func (n *Node) checkIsValidator() { |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} else { |
|
|
} else { |
|
|
n.logger.Log("err", errors.Wrap(err, "check is validator failed")) |
|
|
|
|
|
|
|
|
n.logger.Info("err", errors.Wrap(err, "check is validator failed")) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (n *Node) getPubKey() (crypto.PubKey, error) { |
|
|
func (n *Node) getPubKey() (crypto.PubKey, error) { |
|
|
if n.pubKey != nil { |
|
|
|
|
|
|
|
|
if !n.pubKey.Empty() { |
|
|
return n.pubKey, nil |
|
|
return n.pubKey, nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
var result ctypes.TMResult |
|
|
|
|
|
_, err := n.rpcClient.Call("status", nil, &result) |
|
|
|
|
|
|
|
|
status := new(ctypes.ResultStatus) |
|
|
|
|
|
_, err := n.rpcClient.Call("status", nil, status) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
return nil, err |
|
|
|
|
|
|
|
|
return crypto.PubKey{}, err |
|
|
} |
|
|
} |
|
|
status := result.(*ctypes.ResultStatus) |
|
|
|
|
|
n.pubKey = status.PubKey |
|
|
n.pubKey = status.PubKey |
|
|
return n.pubKey, nil |
|
|
return n.pubKey, nil |
|
|
} |
|
|
} |
|
@ -266,16 +261,9 @@ type eventMeter interface { |
|
|
|
|
|
|
|
|
// UnmarshalEvent unmarshals a json event
|
|
|
// UnmarshalEvent unmarshals a json event
|
|
|
func UnmarshalEvent(b json.RawMessage) (string, events.EventData, error) { |
|
|
func UnmarshalEvent(b json.RawMessage) (string, events.EventData, error) { |
|
|
var err error |
|
|
|
|
|
result := new(ctypes.TMResult) |
|
|
|
|
|
wire.ReadJSONPtr(result, b, &err) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
|
|
|
event := new(ctypes.ResultEvent) |
|
|
|
|
|
if err := json.Unmarshal(b, event); err != nil { |
|
|
return "", nil, err |
|
|
return "", nil, err |
|
|
} |
|
|
} |
|
|
event, ok := (*result).(*ctypes.ResultEvent) |
|
|
|
|
|
if !ok { |
|
|
|
|
|
return "", nil, nil // TODO: handle non-event messages (ie. return from subscribe/unsubscribe)
|
|
|
|
|
|
// fmt.Errorf("Result is not type *ctypes.ResultEvent. Got %v", reflect.TypeOf(*result))
|
|
|
|
|
|
} |
|
|
|
|
|
return event.Name, event.Data, nil |
|
|
return event.Name, event.Data, nil |
|
|
} |
|
|
} |