diff --git a/tm-monitor/eventmeter/eventmeter.go b/tm-monitor/eventmeter/eventmeter.go index 314e039d3..9e49cee75 100644 --- a/tm-monitor/eventmeter/eventmeter.go +++ b/tm-monitor/eventmeter/eventmeter.go @@ -6,12 +6,12 @@ import ( "sync" "time" - "github.com/go-kit/kit/log" "github.com/gorilla/websocket" "github.com/pkg/errors" metrics "github.com/rcrowley/go-metrics" - events "github.com/tendermint/go-events" - client "github.com/tendermint/go-rpc/client" + client "github.com/tendermint/tendermint/rpc/lib/client" + "github.com/tendermint/tmlibs/events" + "github.com/tendermint/tmlibs/log" ) //------------------------------------------------------ @@ -253,30 +253,30 @@ func (em *EventMeter) receiveRoutine() { select { case <-pingTicker.C: if pingAttempts, err = em.pingForLatency(pingAttempts); err != nil { - em.logger.Log("err", errors.Wrap(err, "failed to write ping message on websocket")) + em.logger.Error("err", errors.Wrap(err, "failed to write ping message on websocket")) em.StopAndCallDisconnectCallback() return } else if pingAttempts >= maxPingsPerPong { - em.logger.Log("err", errors.Errorf("Have not received a pong in %v", time.Duration(pingAttempts)*pingTime)) + em.logger.Error("err", errors.Errorf("Have not received a pong in %v", time.Duration(pingAttempts)*pingTime)) em.StopAndCallDisconnectCallback() return } case r := <-em.wsc.ResultsCh: if r == nil { - em.logger.Log("err", errors.New("Expected some event, received nil")) + em.logger.Error("err", errors.New("Expected some event, received nil")) em.StopAndCallDisconnectCallback() return } eventID, data, err := em.unmarshalEvent(r) if err != nil { - em.logger.Log("err", errors.Wrap(err, "failed to unmarshal event")) + em.logger.Error("err", errors.Wrap(err, "failed to unmarshal event")) continue } if eventID != "" { em.updateMetric(eventID, data) } case <-em.wsc.Quit: - em.logger.Log("err", errors.New("WSClient closed unexpectedly")) + em.logger.Error("err", errors.New("WSClient closed unexpectedly")) em.StopAndCallDisconnectCallback() return case <-em.quit: diff --git a/tm-monitor/glide.lock b/tm-monitor/glide.lock index a97890b88..fd70a6e09 100644 --- a/tm-monitor/glide.lock +++ b/tm-monitor/glide.lock @@ -1,16 +1,15 @@ hash: 7cd23c2bc6306917cf22ce16d4a1fc681a2572bc06d2cece54a37152bba7030d -updated: 2017-04-20T19:07:14.174540927-04:00 +updated: 2017-05-20T17:16:08.287741796-04:00 imports: - name: github.com/btcsuite/btcd version: 583684b21bfbde9b5fc4403916fd7c807feb0289 subpackages: - btcec -- name: github.com/BurntSushi/toml - version: 99064174e013895bbd9b025c31100bd1d9b590ca - name: github.com/go-kit/kit version: b6f30a2e0632f5722fb26d8765d726335b79d3e6 subpackages: - log + - log/level - log/term - term - name: github.com/go-logfmt/logfmt @@ -21,12 +20,8 @@ imports: version: 69b215d01a5606c843240eab4937eab3acee6530 subpackages: - proto -- name: github.com/golang/snappy - version: 553a641470496b2327abcac10b36396bd98e45c9 - name: github.com/gorilla/websocket version: 3ab3a8b8831546bd18fd182c20687ca853b2bb13 -- name: github.com/jmhodges/levigo - version: c42d9e0ca023e2198120196f842701bb4c55d7b9 - name: github.com/kr/logfmt version: b84e30acd515aadc4b783ad4ff83aff3299bdfe0 - name: github.com/mattn/go-colorable @@ -42,24 +37,11 @@ imports: subpackages: - assert - require -- name: github.com/syndtr/goleveldb - version: 3c5717caf1475fd25964109a0fc640bd150fce43 - subpackages: - - leveldb - - leveldb/cache - - leveldb/comparer - - leveldb/errors - - leveldb/filter - - leveldb/iterator - - leveldb/journal - - leveldb/memdb - - leveldb/opt - - leveldb/storage - - leveldb/table - - leveldb/util - name: github.com/tendermint/abci - version: 56e13d87f4e3ec1ea756957d6b23caa6ebcf0998 + version: 864d1f80b36b440bde030a5c18d8ac3aa8c2949d subpackages: + - client + - example/dummy - types - name: github.com/tendermint/ed25519 version: 1f52c6f8b8a5c7908aff4497c186af344b428925 @@ -68,45 +50,47 @@ imports: - extra25519 - name: github.com/tendermint/go-common version: e289af53b6bf6af28da129d9ef64389a4cf7987f -- name: github.com/tendermint/go-config - version: 620dcbbd7d587cf3599dedbf329b64311b0c307a - name: github.com/tendermint/go-crypto - version: 0ca2c6fdb0706001ca4c4b9b80c9f428e8cf39da -- name: github.com/tendermint/go-data - version: e7fcc6d081ec8518912fcdc103188275f83a3ee5 -- name: github.com/tendermint/go-db - version: 9643f60bc2578693844aacf380a7c32e4c029fee + version: 7dff40942a64cdeefefa9446b2d104750b349f8a - name: github.com/tendermint/go-events version: fddee66d90305fccb6f6d84d16c34fa65ea5b7f6 -- name: github.com/tendermint/go-flowrate - version: a20c98e61957faa93b4014fbd902f20ab9317a6a - subpackages: - - flowrate - name: github.com/tendermint/go-logger version: cefb3a45c0bf3c493a04e9bcd9b1540528be59f2 -- name: github.com/tendermint/go-merkle - version: 714d4d04557fd068a7c2a1748241ce8428015a96 -- name: github.com/tendermint/go-p2p - version: 17124989a93774833df33107fbf17157a7f8ef31 - subpackages: - - upnp - name: github.com/tendermint/go-rpc - version: 1a42f946dc6bcd88f9f58c7f2fb86f785584d793 + version: 15d5b2ac497da95cd2dceb9c087910ccec4dacb2 subpackages: - client - - server - - types - name: github.com/tendermint/go-wire - version: 2f3b7aafe21c80b19b6ee3210ecb3e3d07c7a471 + version: 5f88da3dbc1a72844e6dfaf274ce87f851d488eb + subpackages: + - data - name: github.com/tendermint/log15 version: ae0f3d6450da9eac7074b439c8e1c3cabf0d5ce6 subpackages: - term - name: github.com/tendermint/tendermint - version: 083fe959e25421fca3d41298d9111167a3b47122 + version: 267f134d44e76efb2adef5f0c993da8a5d5bd1b8 subpackages: + - config + - p2p + - p2p/upnp - rpc/core/types + - rpc/lib/client + - rpc/lib/server + - rpc/lib/types - types +- name: github.com/tendermint/tmlibs + version: 306795ae1d8e4f4a10dcc8bdb32a00455843c9d5 + subpackages: + - autofile + - cli + - clist + - common + - db + - events + - flowrate + - log + - merkle - name: golang.org/x/crypto version: 453249f01cfeb54c3d549ddb75ff152ca243f9d8 subpackages: diff --git a/tm-monitor/main.go b/tm-monitor/main.go index 3c89b8e75..b88158959 100644 --- a/tm-monitor/main.go +++ b/tm-monitor/main.go @@ -6,9 +6,8 @@ import ( "os" "strings" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/term" cmn "github.com/tendermint/go-common" + "github.com/tendermint/tmlibs/log" monitor "github.com/tendermint/tools/tm-monitor/monitor" ) @@ -48,22 +47,12 @@ Examples: } if noton { - // Color errors red - colorFn := func(keyvals ...interface{}) term.FgBgColor { - for i := 1; i < len(keyvals); i += 2 { - if _, ok := keyvals[i].(error); ok { - return term.FgBgColor{Fg: term.White, Bg: term.Red} - } - } - return term.FgBgColor{} - } - - logger = term.NewLogger(os.Stdout, log.NewLogfmtLogger, colorFn) + logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout)).With("module", "tm-monitor") } m := startMonitor(flag.Arg(0)) - startRPC(listenAddr, m) + startRPC(listenAddr, m, logger) var ton *Ton if !noton { @@ -81,11 +70,11 @@ Examples: func startMonitor(endpoints string) *monitor.Monitor { m := monitor.NewMonitor() - m.SetLogger(log.With(logger, "component", "monitor")) + m.SetLogger(logger.With("component", "monitor")) for _, e := range strings.Split(endpoints, ",") { n := monitor.NewNode(e) - n.SetLogger(log.With(logger, "node", e)) + n.SetLogger(logger.With("node", e)) if err := m.Monitor(n); err != nil { panic(err) } diff --git a/tm-monitor/monitor/monitor.go b/tm-monitor/monitor/monitor.go index 1e5c56e98..9911ec495 100644 --- a/tm-monitor/monitor/monitor.go +++ b/tm-monitor/monitor/monitor.go @@ -5,9 +5,9 @@ import ( "math/rand" "time" - "github.com/go-kit/kit/log" "github.com/pkg/errors" tmtypes "github.com/tendermint/tendermint/types" + "github.com/tendermint/tmlibs/log" ) // waiting more than this many seconds for a block means we're unhealthy @@ -140,7 +140,7 @@ func (m *Monitor) Stop() { // main loop where we listen for events from the node func (m *Monitor) listen(nodeName string, blockCh <-chan tmtypes.Header, blockLatencyCh <-chan float64, disconnectCh <-chan bool, quit <-chan struct{}) { - logger := log.With(m.logger, "node", nodeName) + logger := m.logger.With("node", nodeName) for { select { @@ -159,7 +159,7 @@ func (m *Monitor) listen(nodeName string, blockCh <-chan tmtypes.Header, blockLa m.Network.NodeIsOnline(nodeName) } case <-time.After(nodeLivenessTimeout): - logger.Log("event", fmt.Sprintf("node was not responding for %v", nodeLivenessTimeout)) + logger.Info("event", fmt.Sprintf("node was not responding for %v", nodeLivenessTimeout)) m.Network.NodeIsDown(nodeName) } } @@ -203,7 +203,7 @@ func (m *Monitor) updateNumValidatorLoop() { if i == randomNodeIndex { height, num, err = n.NumValidators() if err != nil { - m.logger.Log("err", errors.Wrap(err, "update num validators failed")) + m.logger.Info("err", errors.Wrap(err, "update num validators failed")) } break } diff --git a/tm-monitor/monitor/node.go b/tm-monitor/monitor/node.go index 78b8dcf93..32af6aadd 100644 --- a/tm-monitor/monitor/node.go +++ b/tm-monitor/monitor/node.go @@ -5,22 +5,16 @@ import ( "math" "time" - "github.com/go-kit/kit/log" "github.com/pkg/errors" crypto "github.com/tendermint/go-crypto" - events "github.com/tendermint/go-events" - rpc_client "github.com/tendermint/go-rpc/client" - wire "github.com/tendermint/go-wire" ctypes "github.com/tendermint/tendermint/rpc/core/types" + rpc_client "github.com/tendermint/tendermint/rpc/lib/client" tmtypes "github.com/tendermint/tendermint/types" + "github.com/tendermint/tmlibs/events" + "github.com/tendermint/tmlibs/log" em "github.com/tendermint/tools/tm-monitor/eventmeter" ) -// remove when https://github.com/tendermint/go-rpc/issues/8 will be fixed -type rpcClientI interface { - Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) -} - const maxRestarts = 25 type Node struct { @@ -38,7 +32,7 @@ type Node struct { em eventMeter // rpcClient is an client for making RPC calls to TM - rpcClient rpcClientI + rpcClient rpc_client.HTTPClient blockCh chan<- tmtypes.Header blockLatencyCh chan<- float64 @@ -57,7 +51,7 @@ func NewNode(rpcAddr string, options ...func(*Node)) *Node { return NewNodeWithEventMeterAndRpcClient(rpcAddr, em, rpcClient, options...) } -func NewNodeWithEventMeterAndRpcClient(rpcAddr string, em eventMeter, rpcClient rpcClientI, options ...func(*Node)) *Node { +func NewNodeWithEventMeterAndRpcClient(rpcAddr string, em eventMeter, rpcClient rpc_client.HTTPClient, options ...func(*Node)) *Node { n := &Node{ rpcAddr: rpcAddr, em: em, @@ -132,10 +126,10 @@ func (n *Node) Stop() { // implements eventmeter.EventCallbackFunc func newBlockCallback(n *Node) em.EventCallbackFunc { return func(metric *em.EventMetric, data events.EventData) { - block := data.(tmtypes.EventDataNewBlockHeader).Header + block := data.(tmtypes.TMEventData).Unwrap().(tmtypes.EventDataNewBlockHeader).Header n.Height = uint64(block.Height) - n.logger.Log("event", "new block", "height", block.Height, "numTxs", block.NumTxs) + n.logger.Info("event", "new block", "height", block.Height, "numTxs", block.NumTxs) if n.blockCh != nil { n.blockCh <- *block @@ -147,7 +141,7 @@ func newBlockCallback(n *Node) em.EventCallbackFunc { func latencyCallback(n *Node) em.LatencyCallbackFunc { return func(latency float64) { n.BlockLatency = latency / 1000000.0 // ns to ms - n.logger.Log("event", "new block latency", "latency", n.BlockLatency) + n.logger.Info("event", "new block latency", "latency", n.BlockLatency) if n.blockLatencyCh != nil { n.blockLatencyCh <- latency @@ -159,17 +153,17 @@ func latencyCallback(n *Node) em.LatencyCallbackFunc { func disconnectCallback(n *Node) em.DisconnectCallbackFunc { return func() { n.Online = false - n.logger.Log("status", "down") + n.logger.Info("status", "down") if n.disconnectCh != nil { n.disconnectCh <- true } if err := n.RestartEventMeterBackoff(); err != nil { - n.logger.Log("err", errors.Wrap(err, "restart failed")) + n.logger.Info("err", errors.Wrap(err, "restart failed")) } else { n.Online = true - n.logger.Log("status", "online") + n.logger.Info("status", "online") if n.disconnectCh != nil { n.disconnectCh <- false @@ -186,7 +180,7 @@ func (n *Node) RestartEventMeterBackoff() error { time.Sleep(d * time.Second) if err := n.em.Start(); err != nil { - n.logger.Log("err", errors.Wrap(err, "restart failed")) + n.logger.Info("err", errors.Wrap(err, "restart failed")) } else { // TODO: authenticate pubkey return nil @@ -209,11 +203,10 @@ func (n *Node) NumValidators() (height uint64, num int, err error) { } func (n *Node) validators() (height uint64, validators []*tmtypes.Validator, err error) { - var result ctypes.TMResult - if _, err = n.rpcClient.Call("validators", nil, &result); err != nil { + vals := new(ctypes.ResultValidators) + if _, err = n.rpcClient.Call("validators", nil, vals); err != nil { return 0, make([]*tmtypes.Validator, 0), err } - vals := result.(*ctypes.ResultValidators) return uint64(vals.BlockHeight), vals.Validators, nil } @@ -238,21 +231,20 @@ func (n *Node) checkIsValidator() { } } } else { - n.logger.Log("err", errors.Wrap(err, "check is validator failed")) + n.logger.Info("err", errors.Wrap(err, "check is validator failed")) } } func (n *Node) getPubKey() (crypto.PubKey, error) { - if n.pubKey != nil { + if !n.pubKey.Empty() { return n.pubKey, nil } - var result ctypes.TMResult - _, err := n.rpcClient.Call("status", nil, &result) + status := new(ctypes.ResultStatus) + _, err := n.rpcClient.Call("status", nil, status) if err != nil { - return nil, err + return crypto.PubKey{}, err } - status := result.(*ctypes.ResultStatus) n.pubKey = status.PubKey return n.pubKey, nil } @@ -269,16 +261,9 @@ type eventMeter interface { // UnmarshalEvent unmarshals a json event func UnmarshalEvent(b json.RawMessage) (string, events.EventData, error) { - var err error - result := new(ctypes.TMResult) - wire.ReadJSONPtr(result, b, &err) - if err != nil { + event := new(ctypes.ResultEvent) + if err := json.Unmarshal(b, event); err != nil { return "", nil, err } - event, ok := (*result).(*ctypes.ResultEvent) - if !ok { - return "", nil, nil // TODO: handle non-event messages (ie. return from subscribe/unsubscribe) - // fmt.Errorf("Result is not type *ctypes.ResultEvent. Got %v", reflect.TypeOf(*result)) - } return event.Name, event.Data, nil } diff --git a/tm-monitor/rpc.go b/tm-monitor/rpc.go index 5162ecb45..bbf508a9e 100644 --- a/tm-monitor/rpc.go +++ b/tm-monitor/rpc.go @@ -4,19 +4,20 @@ import ( "errors" "net/http" - rpc "github.com/tendermint/go-rpc/server" + rpc "github.com/tendermint/tendermint/rpc/lib/server" + "github.com/tendermint/tmlibs/log" monitor "github.com/tendermint/tools/tm-monitor/monitor" ) -func startRPC(listenAddr string, m *monitor.Monitor) { +func startRPC(listenAddr string, m *monitor.Monitor, logger log.Logger) { routes := routes(m) // serve http and ws mux := http.NewServeMux() wm := rpc.NewWebsocketManager(routes, nil) // TODO: evsw mux.HandleFunc("/websocket", wm.WebsocketHandler) - rpc.RegisterRPCFuncs(mux, routes) - if _, err := rpc.StartHTTPServer(listenAddr, mux); err != nil { + rpc.RegisterRPCFuncs(mux, routes, logger) + if _, err := rpc.StartHTTPServer(listenAddr, mux, logger); err != nil { panic(err) } }