From f8f28c8942be295117b629519e2080a763b81e3a Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 15 Dec 2017 16:14:44 -0600 Subject: [PATCH 1/8] enable logging for wal_generator and set timeout to 1 min Refs #977 --- consensus/wal_generator.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/consensus/wal_generator.go b/consensus/wal_generator.go index b1ea268e3..00da98f71 100644 --- a/consensus/wal_generator.go +++ b/consensus/wal_generator.go @@ -32,7 +32,7 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) { app := dummy.NewPersistentDummyApplication(filepath.Join(config.DBDir(), "wal_generator")) - logger := log.NewNopLogger() // log.TestingLogger().With("wal_generator", "wal_generator") + logger := log.TestingLogger().With("wal_generator", "wal_generator") ///////////////////////////////////////////////////////////////////////////// // COPY PASTE FROM node.go WITH A FEW MODIFICATIONS @@ -91,8 +91,8 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) { case <-numBlocksWritten: wr.Flush() return b.Bytes(), nil - case <-time.After(time.Duration(5*numBlocks) * time.Second): - return b.Bytes(), fmt.Errorf("waited too long for tendermint to produce %d blocks", numBlocks) + case <-time.After(1 * time.Minute): + return b.Bytes(), fmt.Errorf("waited too long for tendermint to produce %d blocks (grep logs for `wal_generator`)", numBlocks) } } From 0ec7909ec30d946190689584bc5b4a0566b97fc1 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 15 Dec 2017 23:08:05 -0500 Subject: [PATCH 2/8] more logging in p2p and consensus --- consensus/reactor.go | 5 ++++- p2p/connection.go | 19 +++++++++++++++++-- p2p/peer.go | 20 ++++---------------- p2p/peer_set_test.go | 1 - p2p/pex_reactor.go | 4 ++-- p2p/pex_reactor_test.go | 1 - 6 files changed, 27 insertions(+), 23 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index 90dfa3b1c..053d1e7b2 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -916,7 +916,10 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int, index int) func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool { if vote, ok := ps.PickVoteToSend(votes); ok { msg := &VoteMessage{vote} + ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote) return ps.Peer.Send(VoteChannel, struct{ ConsensusMessage }{msg}) + } else { + ps.logger.Debug("No vote message to send", "ps", ps) } return false } @@ -1344,7 +1347,7 @@ type HasVoteMessage struct { // String returns a string representation. func (m *HasVoteMessage) String() string { - return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v} VI:%v]", m.Index, m.Height, m.Round, m.Type, m.Index) + return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v}]", m.Index, m.Height, m.Round, m.Type) } //------------------------------------- diff --git a/p2p/connection.go b/p2p/connection.go index 51f6dd3bb..279de7d9f 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -14,6 +14,7 @@ import ( tmlegacy "github.com/tendermint/go-wire/nowriter/tmlegacy" cmn "github.com/tendermint/tmlibs/common" flow "github.com/tendermint/tmlibs/flowrate" + "github.com/tendermint/tmlibs/log" ) var legacy = tmlegacy.TMEncoderLegacy{} @@ -161,6 +162,13 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec return mconn } +func (c *MConnection) SetLogger(l log.Logger) { + c.BaseService.SetLogger(l) + for _, ch := range c.channels { + ch.SetLogger(l) + } +} + // OnStart implements BaseService func (c *MConnection) OnStart() error { if err := c.BaseService.OnStart(); err != nil { @@ -385,6 +393,7 @@ func (c *MConnection) sendMsgPacket() bool { // Nothing to send? if leastChannel == nil { + c.Logger.Debug("Least channel == nil") return true } else { // c.Logger.Info("Found a msgPacket to send") @@ -566,6 +575,8 @@ type Channel struct { recentlySent int64 // exponential moving average maxMsgPacketPayloadSize int + + logger log.Logger } func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel { @@ -582,6 +593,10 @@ func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel { } } +func (ch *Channel) SetLogger(l log.Logger) { + ch.logger = l +} + // Queues message to send to this channel. // Goroutine-safe // Times out (and returns false) after defaultSendTimeout @@ -654,7 +669,7 @@ func (ch *Channel) nextMsgPacket() msgPacket { // Not goroutine-safe func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) { packet := ch.nextMsgPacket() - // log.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet) + ch.logger.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet) writeMsgPacketTo(packet, w, &n, &err) if err == nil { ch.recentlySent += int64(n) @@ -670,7 +685,7 @@ func writeMsgPacketTo(packet msgPacket, w io.Writer, n *int, err *error) { // Handles incoming msgPackets. Returns a msg bytes if msg is complete. // Not goroutine-safe func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) { - // log.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet) + ch.logger.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet) if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) { return nil, wire.ErrBinaryReadOverflow } diff --git a/p2p/peer.go b/p2p/peer.go index cc9c14c37..cc7f4927a 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -2,7 +2,6 @@ package p2p import ( "fmt" - "io" "net" "time" @@ -48,7 +47,6 @@ type peer struct { config *PeerConfig nodeInfo *NodeInfo - key string Data *cmn.CMap // User data. } @@ -209,8 +207,6 @@ func (p *peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) er peerNodeInfo.RemoteAddr = p.Addr().String() p.nodeInfo = peerNodeInfo - p.key = peerNodeInfo.PubKey.KeyString() - return nil } @@ -283,26 +279,18 @@ func (p *peer) CanSend(chID byte) bool { return p.mconn.CanSend(chID) } -// WriteTo writes the peer's public key to w. -func (p *peer) WriteTo(w io.Writer) (int64, error) { - var n int - var err error - wire.WriteString(p.key, w, &n, &err) - return int64(n), err -} - // String representation. func (p *peer) String() string { if p.outbound { - return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.key[:12]) + return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key()) } - return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.key[:12]) + return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key()) } // Equals reports whenever 2 peers are actually represent the same node. func (p *peer) Equals(other Peer) bool { - return p.key == other.Key() + return p.Key() == other.Key() } // Get the data for a given key. @@ -317,7 +305,7 @@ func (p *peer) Set(key string, data interface{}) { // Key returns the peer's id key. func (p *peer) Key() string { - return p.key + return p.nodeInfo.ListenAddr // XXX: should probably be PubKey.KeyString() } // NodeInfo returns a copy of the peer's NodeInfo. diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go index 694300523..a7f29315a 100644 --- a/p2p/peer_set_test.go +++ b/p2p/peer_set_test.go @@ -13,7 +13,6 @@ import ( // Returns an empty dummy peer func randPeer() *peer { return &peer{ - key: cmn.RandStr(12), nodeInfo: &NodeInfo{ RemoteAddr: cmn.Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256), ListenAddr: cmn.Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256), diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 29c35548e..2bfe7dcab 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -264,8 +264,8 @@ func (r *PEXReactor) ensurePeers() { if dialling := r.Switch.IsDialing(try); dialling { continue } - // XXX: does this work ?! - if connected := r.Switch.Peers().Has(try.IP.String()); connected { + // XXX: Should probably use pubkey as peer key ... + if connected := r.Switch.Peers().Has(try.String()); connected { continue } r.Logger.Info("Will dial address", "addr", try) diff --git a/p2p/pex_reactor_test.go b/p2p/pex_reactor_test.go index e80840b17..a14f0eb2a 100644 --- a/p2p/pex_reactor_test.go +++ b/p2p/pex_reactor_test.go @@ -194,7 +194,6 @@ func createRoutableAddr() (addr string, netAddr *NetAddress) { func createRandomPeer(outbound bool) *peer { addr, netAddr := createRoutableAddr() p := &peer{ - key: cmn.RandStr(12), nodeInfo: &NodeInfo{ ListenAddr: addr, RemoteAddr: netAddr.String(), From bfe0a4a8ac96aaafb966de9e09c0f429a09b1cca Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sat, 16 Dec 2017 02:33:13 -0500 Subject: [PATCH 3/8] more logging --- p2p/connection.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/p2p/connection.go b/p2p/connection.go index 279de7d9f..d0c633a68 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -333,6 +333,7 @@ FOR_LOOP: case <-c.send: // Send some msgPackets eof := c.sendSomeMsgPackets() + c.logger.Debug("finished sendSomeMsgPackets", "eof", eof) if !eof { // Keep sendRoutine awake. select { @@ -352,6 +353,7 @@ FOR_LOOP: } } + c.logger.Debug("sendRoutine: End") // Cleanup } @@ -361,10 +363,12 @@ func (c *MConnection) sendSomeMsgPackets() bool { // Block until .sendMonitor says we can write. // Once we're ready we send more than we asked for, // but amortized it should even out. + c.Logger.Debug("sendMonitor.Limit") c.sendMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.SendRate), true) // Now send some msgPackets. for i := 0; i < numBatchMsgPackets; i++ { + c.Logger.Debug("sendMsgPacket", "i", i) if c.sendMsgPacket() { return true } @@ -406,6 +410,7 @@ func (c *MConnection) sendMsgPacket() bool { c.stopForError(err) return true } + c.logger.Debug("sendMonitor.Update") c.sendMonitor.Update(int(n)) c.flushTimer.Set() return false @@ -419,6 +424,7 @@ func (c *MConnection) recvRoutine() { FOR_LOOP: for { + c.logger.Debug("recvRoutine: recvMonitor.Limit") // Block until .recvMonitor says we can read. c.recvMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.RecvRate), true) @@ -440,7 +446,9 @@ FOR_LOOP: // Read packet type var n int var err error + c.logger.Debug("recvRoutine: ReadByte") pktType := wire.ReadByte(c.bufReader, &n, &err) + c.logger.Debug("recvRoutine: recvMonitor.Update") c.recvMonitor.Update(int(n)) if err != nil { if c.IsRunning() { @@ -456,12 +464,15 @@ FOR_LOOP: // TODO: prevent abuse, as they cause flush()'s. c.Logger.Debug("Receive Ping") c.pong <- struct{}{} + c.logger.Debug("recvRoutine: trigger pong") case packetTypePong: // do nothing c.Logger.Debug("Receive Pong") case packetTypeMsg: pkt, n, err := msgPacket{}, int(0), error(nil) + c.logger.Debug("recvRoutine: ReadBinaryPtr") wire.ReadBinaryPtr(&pkt, c.bufReader, c.config.maxMsgPacketTotalSize(), &n, &err) + c.logger.Debug("recvRoutine: recvMonitor.Update") c.recvMonitor.Update(int(n)) if err != nil { if c.IsRunning() { @@ -477,7 +488,9 @@ FOR_LOOP: c.stopForError(err) } + c.logger.Debug("recvRoutine: recvMsgPacket") msgBytes, err := channel.recvMsgPacket(pkt) + c.logger.Debug("recvRoutine: msgBytes", "msgBytes", msgBytes, "err", err) if err != nil { if c.IsRunning() { c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err) @@ -500,6 +513,7 @@ FOR_LOOP: // Better to send a ping packet when *we* haven't sent anything for a while. c.pingTimer.Reset() } + c.logger.Debug("recvRoutine: End") // Cleanup close(c.pong) From d7cb2f850d1879b25480f36208fb71788dcf2d31 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sat, 16 Dec 2017 04:09:15 -0500 Subject: [PATCH 4/8] more logs in p2p --- p2p/connection.go | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/p2p/connection.go b/p2p/connection.go index d0c633a68..53001fcc4 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -333,7 +333,7 @@ FOR_LOOP: case <-c.send: // Send some msgPackets eof := c.sendSomeMsgPackets() - c.logger.Debug("finished sendSomeMsgPackets", "eof", eof) + c.Logger.Debug("finished sendSomeMsgPackets", "eof", eof) if !eof { // Keep sendRoutine awake. select { @@ -353,7 +353,7 @@ FOR_LOOP: } } - c.logger.Debug("sendRoutine: End") + c.Logger.Debug("sendRoutine: End") // Cleanup } @@ -410,7 +410,7 @@ func (c *MConnection) sendMsgPacket() bool { c.stopForError(err) return true } - c.logger.Debug("sendMonitor.Update") + c.Logger.Debug("sendMonitor.Update") c.sendMonitor.Update(int(n)) c.flushTimer.Set() return false @@ -424,7 +424,7 @@ func (c *MConnection) recvRoutine() { FOR_LOOP: for { - c.logger.Debug("recvRoutine: recvMonitor.Limit") + c.Logger.Debug("recvRoutine: recvMonitor.Limit") // Block until .recvMonitor says we can read. c.recvMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.RecvRate), true) @@ -446,9 +446,9 @@ FOR_LOOP: // Read packet type var n int var err error - c.logger.Debug("recvRoutine: ReadByte") + c.Logger.Debug("recvRoutine: ReadByte") pktType := wire.ReadByte(c.bufReader, &n, &err) - c.logger.Debug("recvRoutine: recvMonitor.Update") + c.Logger.Debug("recvRoutine: recvMonitor.Update") c.recvMonitor.Update(int(n)) if err != nil { if c.IsRunning() { @@ -464,15 +464,15 @@ FOR_LOOP: // TODO: prevent abuse, as they cause flush()'s. c.Logger.Debug("Receive Ping") c.pong <- struct{}{} - c.logger.Debug("recvRoutine: trigger pong") + c.Logger.Debug("recvRoutine: trigger pong") case packetTypePong: // do nothing c.Logger.Debug("Receive Pong") case packetTypeMsg: pkt, n, err := msgPacket{}, int(0), error(nil) - c.logger.Debug("recvRoutine: ReadBinaryPtr") + c.Logger.Debug("recvRoutine: ReadBinaryPtr") wire.ReadBinaryPtr(&pkt, c.bufReader, c.config.maxMsgPacketTotalSize(), &n, &err) - c.logger.Debug("recvRoutine: recvMonitor.Update") + c.Logger.Debug("recvRoutine: recvMonitor.Update") c.recvMonitor.Update(int(n)) if err != nil { if c.IsRunning() { @@ -488,9 +488,9 @@ FOR_LOOP: c.stopForError(err) } - c.logger.Debug("recvRoutine: recvMsgPacket") + c.Logger.Debug("recvRoutine: recvMsgPacket") msgBytes, err := channel.recvMsgPacket(pkt) - c.logger.Debug("recvRoutine: msgBytes", "msgBytes", msgBytes, "err", err) + c.Logger.Debug("recvRoutine: msgBytes", "msgBytes", msgBytes, "err", err) if err != nil { if c.IsRunning() { c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err) @@ -502,6 +502,7 @@ FOR_LOOP: c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", msgBytes) // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine c.onReceive(pkt.ChannelID, msgBytes) + c.Logger.Debug("done onReceive") } default: err := fmt.Errorf("Unknown message type %X", pktType) @@ -511,9 +512,11 @@ FOR_LOOP: // TODO: shouldn't this go in the sendRoutine? // Better to send a ping packet when *we* haven't sent anything for a while. + c.Logger.Debug("pingTimer.Reset()") c.pingTimer.Reset() + c.Logger.Debug("done pingTimer.Reset()") } - c.logger.Debug("recvRoutine: End") + c.Logger.Debug("recvRoutine: End") // Cleanup close(c.pong) @@ -590,7 +593,7 @@ type Channel struct { maxMsgPacketPayloadSize int - logger log.Logger + Logger log.Logger } func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel { @@ -608,7 +611,7 @@ func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel { } func (ch *Channel) SetLogger(l log.Logger) { - ch.logger = l + ch.Logger = l } // Queues message to send to this channel. @@ -683,7 +686,7 @@ func (ch *Channel) nextMsgPacket() msgPacket { // Not goroutine-safe func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) { packet := ch.nextMsgPacket() - ch.logger.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet) + ch.Logger.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet) writeMsgPacketTo(packet, w, &n, &err) if err == nil { ch.recentlySent += int64(n) @@ -699,7 +702,7 @@ func writeMsgPacketTo(packet msgPacket, w io.Writer, n *int, err *error) { // Handles incoming msgPackets. Returns a msg bytes if msg is complete. // Not goroutine-safe func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) { - ch.logger.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet) + ch.Logger.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet) if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) { return nil, wire.ErrBinaryReadOverflow } From 61dc357bb350d383ffee8203b8fa125cc77dd179 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sat, 16 Dec 2017 04:12:25 -0500 Subject: [PATCH 5/8] test/p2p/kill_all: longer timeout --- test/p2p/kill_all/check_peers.sh | 48 ++++++++++++++++---------------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/test/p2p/kill_all/check_peers.sh b/test/p2p/kill_all/check_peers.sh index 52dcde91c..37cd13a42 100644 --- a/test/p2p/kill_all/check_peers.sh +++ b/test/p2p/kill_all/check_peers.sh @@ -4,20 +4,20 @@ set -eu NUM_OF_PEERS=$1 # how many attempts for each peer to catch up by height -MAX_ATTEMPTS_TO_CATCH_UP=20 +MAX_ATTEMPTS_TO_CATCH_UP=120 echo "Waiting for nodes to come online" set +e for i in $(seq 1 "$NUM_OF_PEERS"); do - addr=$(test/p2p/ip.sh "$i"):46657 - curl -s "$addr/status" > /dev/null - ERR=$? - while [ "$ERR" != 0 ]; do - sleep 1 - curl -s "$addr/status" > /dev/null - ERR=$? - done - echo "... node $i is up" + addr=$(test/p2p/ip.sh "$i"):46657 + curl -s "$addr/status" > /dev/null + ERR=$? + while [ "$ERR" != 0 ]; do + sleep 1 + curl -s "$addr/status" > /dev/null + ERR=$? + done + echo "... node $i is up" done set -e @@ -28,22 +28,22 @@ echo "1st peer is on height $h1" echo "Waiting until other peers reporting a height higher than the 1st one" for i in $(seq 2 "$NUM_OF_PEERS"); do - attempt=1 - hi=0 + attempt=1 + hi=0 - while [[ $hi -le $h1 ]] ; do - addr=$(test/p2p/ip.sh "$i"):46657 - hi=$(curl -s "$addr/status" | jq .result.latest_block_height) + while [[ $hi -le $h1 ]] ; do + addr=$(test/p2p/ip.sh "$i"):46657 + hi=$(curl -s "$addr/status" | jq .result.latest_block_height) - echo "... peer $i is on height $hi" + echo "... peer $i is on height $hi" - ((attempt++)) - if [ "$attempt" -ge $MAX_ATTEMPTS_TO_CATCH_UP ] ; then - echo "$attempt unsuccessful attempts were made to catch up" - curl -s "$addr/dump_consensus_state" | jq .result - exit 1 - fi + ((attempt++)) + if [ "$attempt" -ge $MAX_ATTEMPTS_TO_CATCH_UP ] ; then + echo "$attempt unsuccessful attempts were made to catch up" + curl -s "$addr/dump_consensus_state" | jq .result + exit 1 + fi - sleep 1 - done + sleep 1 + done done From 5d04ccbe51b714c477b58f9fdd3e6d70336eebfe Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sat, 16 Dec 2017 19:16:08 -0500 Subject: [PATCH 6/8] excessive logging. update tmlibs for timer fix --- config/config.go | 2 +- consensus/mempool_test.go | 2 +- consensus/reactor.go | 5 ++ consensus/reactor_test.go | 105 +++++++++++++++++++++++++------------ consensus/replay_test.go | 9 ++-- consensus/state.go | 4 ++ consensus/wal_generator.go | 10 +++- glide.lock | 2 +- p2p/connection.go | 6 +++ p2p/peer.go | 2 + p2p/switch.go | 10 +++- 11 files changed, 113 insertions(+), 44 deletions(-) diff --git a/config/config.go b/config/config.go index 7f779d9a2..5d4a8ef65 100644 --- a/config/config.go +++ b/config/config.go @@ -389,7 +389,7 @@ func DefaultConsensusConfig() *ConsensusConfig { // TestConsensusConfig returns a configuration for testing the consensus service func TestConsensusConfig() *ConsensusConfig { config := DefaultConsensusConfig() - config.TimeoutPropose = 2000 + config.TimeoutPropose = 100 config.TimeoutProposeDelta = 1 config.TimeoutPrevote = 10 config.TimeoutPrevoteDelta = 1 diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index 099767e61..91acce65d 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -104,8 +104,8 @@ func TestTxConcurrentWithCommit(t *testing.T) { go deliverTxsRange(cs, 0, NTxs) startTestRound(cs, height, round) - ticker := time.NewTicker(time.Second * 20) for nTxs := 0; nTxs < NTxs; { + ticker := time.NewTicker(time.Second * 30) select { case b := <-newBlockCh: evt := b.(types.TMEventData).Unwrap().(types.EventDataNewBlock) diff --git a/consensus/reactor.go b/consensus/reactor.go index 053d1e7b2..2051ecd81 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -76,8 +76,11 @@ func (conR *ConsensusReactor) OnStart() error { // OnStop implements BaseService func (conR *ConsensusReactor) OnStop() { + conR.Logger.Debug("conR.OnStop") conR.BaseReactor.OnStop() + conR.Logger.Debug("conR.OnStop: Stopping ConsensusState") conR.conS.Stop() + conR.Logger.Debug("conR.OnStop: DONE") } // SwitchToConsensus switches from fast_sync mode to consensus mode. @@ -1197,6 +1200,8 @@ func (ps *PeerState) String() string { // StringIndented returns a string representation of the PeerState func (ps *PeerState) StringIndented(indent string) string { + ps.mtx.Lock() + defer ps.mtx.Unlock() return fmt.Sprintf(`PeerState{ %s Key %v %s PRS %v diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index 56ac17af5..dda33c249 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -10,11 +10,13 @@ import ( "time" "github.com/tendermint/abci/example/dummy" + "github.com/tendermint/tmlibs/log" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -53,6 +55,7 @@ func startConsensusNet(t *testing.T, css []*ConsensusState, N int) ([]*Consensus // make connected switches and start all reactors p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch { s.AddReactor("CONSENSUS", reactors[i]) + s.SetLogger(reactors[i].Logger.With("module", "p2p", "validator", i)) return s }, p2p.Connect2Switches) @@ -67,13 +70,17 @@ func startConsensusNet(t *testing.T, css []*ConsensusState, N int) ([]*Consensus return reactors, eventChans, eventBuses } -func stopConsensusNet(reactors []*ConsensusReactor, eventBuses []*types.EventBus) { - for _, r := range reactors { +func stopConsensusNet(logger log.Logger, reactors []*ConsensusReactor, eventBuses []*types.EventBus) { + logger.Info("stopConsensusNet", "n", len(reactors)) + for i, r := range reactors { + logger.Info("stopConsensusNet: Stopping ConsensusReactor", "i", i) r.Switch.Stop() } - for _, b := range eventBuses { + for i, b := range eventBuses { + logger.Info("stopConsensusNet: Stopping eventBus", "i", i) b.Stop() } + logger.Info("stopConsensusNet: DONE", "n", len(reactors)) } // Ensure a testnet makes blocks @@ -81,7 +88,7 @@ func TestReactor(t *testing.T) { N := 4 css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter) reactors, eventChans, eventBuses := startConsensusNet(t, css, N) - defer stopConsensusNet(reactors, eventBuses) + defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses) // wait till everyone makes the first new block timeoutWaitGroup(t, N, func(wg *sync.WaitGroup, j int) { <-eventChans[j] @@ -97,7 +104,7 @@ func TestReactorProposalHeartbeats(t *testing.T) { c.Consensus.CreateEmptyBlocks = false }) reactors, eventChans, eventBuses := startConsensusNet(t, css, N) - defer stopConsensusNet(reactors, eventBuses) + defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses) heartbeatChans := make([]chan interface{}, N) var err error for i := 0; i < N; i++ { @@ -126,11 +133,12 @@ func TestReactorProposalHeartbeats(t *testing.T) { //------------------------------------------------------------- // ensure we can make blocks despite cycling a validator set -func TestVotingPowerChange(t *testing.T) { +func TestReactorVotingPowerChange(t *testing.T) { nVals := 4 + logger := log.TestingLogger() css := randConsensusNet(nVals, "consensus_voting_power_changes_test", newMockTickerFunc(true), newPersistentDummy) reactors, eventChans, eventBuses := startConsensusNet(t, css, nVals) - defer stopConsensusNet(reactors, eventBuses) + defer stopConsensusNet(logger, reactors, eventBuses) // map of active validators activeVals := make(map[string]struct{}) @@ -145,14 +153,14 @@ func TestVotingPowerChange(t *testing.T) { }, css) //--------------------------------------------------------------------------- - t.Log("---------------------------- Testing changing the voting power of one validator a few times") + logger.Debug("---------------------------- Testing changing the voting power of one validator a few times") val1PubKey := css[0].privValidator.GetPubKey() updateValidatorTx := dummy.MakeValSetChangeTx(val1PubKey.Bytes(), 25) previousTotalVotingPower := css[0].GetRoundState().LastValidators.TotalVotingPower() waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx) - waitForAndValidateBlock(t, nVals, activeVals, eventChans, css) + waitForAndValidateBlockWithTx(t, nVals, activeVals, eventChans, css, updateValidatorTx) waitForAndValidateBlock(t, nVals, activeVals, eventChans, css) waitForAndValidateBlock(t, nVals, activeVals, eventChans, css) @@ -164,7 +172,7 @@ func TestVotingPowerChange(t *testing.T) { previousTotalVotingPower = css[0].GetRoundState().LastValidators.TotalVotingPower() waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx) - waitForAndValidateBlock(t, nVals, activeVals, eventChans, css) + waitForAndValidateBlockWithTx(t, nVals, activeVals, eventChans, css, updateValidatorTx) waitForAndValidateBlock(t, nVals, activeVals, eventChans, css) waitForAndValidateBlock(t, nVals, activeVals, eventChans, css) @@ -176,7 +184,7 @@ func TestVotingPowerChange(t *testing.T) { previousTotalVotingPower = css[0].GetRoundState().LastValidators.TotalVotingPower() waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx) - waitForAndValidateBlock(t, nVals, activeVals, eventChans, css) + waitForAndValidateBlockWithTx(t, nVals, activeVals, eventChans, css, updateValidatorTx) waitForAndValidateBlock(t, nVals, activeVals, eventChans, css) waitForAndValidateBlock(t, nVals, activeVals, eventChans, css) @@ -185,13 +193,15 @@ func TestVotingPowerChange(t *testing.T) { } } -func TestValidatorSetChanges(t *testing.T) { +func TestReactorValidatorSetChanges(t *testing.T) { nPeers := 7 nVals := 4 css := randConsensusNetWithPeers(nVals, nPeers, "consensus_val_set_changes_test", newMockTickerFunc(true), newPersistentDummy) + logger := log.TestingLogger() + reactors, eventChans, eventBuses := startConsensusNet(t, css, nPeers) - defer stopConsensusNet(reactors, eventBuses) + defer stopConsensusNet(logger, reactors, eventBuses) // map of active validators activeVals := make(map[string]struct{}) @@ -206,7 +216,7 @@ func TestValidatorSetChanges(t *testing.T) { }, css) //--------------------------------------------------------------------------- - t.Log("---------------------------- Testing adding one validator") + logger.Info("---------------------------- Testing adding one validator") newValidatorPubKey1 := css[nVals].privValidator.GetPubKey() newValidatorTx1 := dummy.MakeValSetChangeTx(newValidatorPubKey1.Bytes(), testMinPower) @@ -218,7 +228,7 @@ func TestValidatorSetChanges(t *testing.T) { // wait till everyone makes block 3. // it includes the commit for block 2, which is by the original validator set - waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) + waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, newValidatorTx1) // wait till everyone makes block 4. // it includes the commit for block 3, which is by the original validator set @@ -232,14 +242,14 @@ func TestValidatorSetChanges(t *testing.T) { waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css) //--------------------------------------------------------------------------- - t.Log("---------------------------- Testing changing the voting power of one validator") + logger.Info("---------------------------- Testing changing the voting power of one validator") updateValidatorPubKey1 := css[nVals].privValidator.GetPubKey() updateValidatorTx1 := dummy.MakeValSetChangeTx(updateValidatorPubKey1.Bytes(), 25) previousTotalVotingPower := css[nVals].GetRoundState().LastValidators.TotalVotingPower() waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, updateValidatorTx1) - waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) + waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, updateValidatorTx1) waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css) @@ -248,7 +258,7 @@ func TestValidatorSetChanges(t *testing.T) { } //--------------------------------------------------------------------------- - t.Log("---------------------------- Testing adding two validators at once") + logger.Info("---------------------------- Testing adding two validators at once") newValidatorPubKey2 := css[nVals+1].privValidator.GetPubKey() newValidatorTx2 := dummy.MakeValSetChangeTx(newValidatorPubKey2.Bytes(), testMinPower) @@ -257,20 +267,20 @@ func TestValidatorSetChanges(t *testing.T) { newValidatorTx3 := dummy.MakeValSetChangeTx(newValidatorPubKey3.Bytes(), testMinPower) waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, newValidatorTx2, newValidatorTx3) - waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) + waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, newValidatorTx2, newValidatorTx3) waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) activeVals[string(newValidatorPubKey2.Address())] = struct{}{} activeVals[string(newValidatorPubKey3.Address())] = struct{}{} waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css) //--------------------------------------------------------------------------- - t.Log("---------------------------- Testing removing two validators at once") + logger.Info("---------------------------- Testing removing two validators at once") removeValidatorTx2 := dummy.MakeValSetChangeTx(newValidatorPubKey2.Bytes(), 0) removeValidatorTx3 := dummy.MakeValSetChangeTx(newValidatorPubKey3.Bytes(), 0) waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, removeValidatorTx2, removeValidatorTx3) - waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) + waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, removeValidatorTx2, removeValidatorTx3) waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) delete(activeVals, string(newValidatorPubKey2.Address())) delete(activeVals, string(newValidatorPubKey3.Address())) @@ -287,7 +297,7 @@ func TestReactorWithTimeoutCommit(t *testing.T) { } reactors, eventChans, eventBuses := startConsensusNet(t, css, N-1) - defer stopConsensusNet(reactors, eventBuses) + defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses) // wait till everyone makes the first new block timeoutWaitGroup(t, N-1, func(wg *sync.WaitGroup, j int) { @@ -300,21 +310,51 @@ func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{} timeoutWaitGroup(t, n, func(wg *sync.WaitGroup, j int) { defer wg.Done() + css[j].Logger.Debug("waitForAndValidateBlock") newBlockI, ok := <-eventChans[j] if !ok { return } newBlock := newBlockI.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block - t.Logf("Got block height=%v validator=%v", newBlock.Height, j) + css[j].Logger.Debug("waitForAndValidateBlock: Got block", "height", newBlock.Height) err := validateBlock(newBlock, activeVals) - if err != nil { - t.Fatal(err) - } + assert.Nil(t, err) for _, tx := range txs { - if err = css[j].mempool.CheckTx(tx, nil); err != nil { - t.Fatal(err) + css[j].mempool.CheckTx(tx, nil) + assert.Nil(t, err) + } + }, css) +} + +func waitForAndValidateBlockWithTx(t *testing.T, n int, activeVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState, txs ...[]byte) { + timeoutWaitGroup(t, n, func(wg *sync.WaitGroup, j int) { + defer wg.Done() + ntxs := 0 + BLOCK_TX_LOOP: + for { + css[j].Logger.Debug("waitForAndValidateBlockWithTx", "ntxs", ntxs) + newBlockI, ok := <-eventChans[j] + if !ok { + return + } + newBlock := newBlockI.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block + css[j].Logger.Debug("waitForAndValidateBlockWithTx: Got block", "height", newBlock.Height) + err := validateBlock(newBlock, activeVals) + assert.Nil(t, err) + + // check that txs match the txs we're waiting for. + // note they could be spread over multiple blocks, + // but they should be in order. + for _, tx := range newBlock.Data.Txs { + assert.EqualValues(t, txs[ntxs], tx) + ntxs += 1 + } + + if ntxs == len(txs) { + break BLOCK_TX_LOOP } } + }, css) } @@ -325,23 +365,22 @@ func waitForBlockWithUpdatedValsAndValidateIt(t *testing.T, n int, updatedVals m var newBlock *types.Block LOOP: for { + css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt") newBlockI, ok := <-eventChans[j] if !ok { return } newBlock = newBlockI.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block if newBlock.LastCommit.Size() == len(updatedVals) { - t.Logf("Block with new validators height=%v validator=%v", newBlock.Height, j) + css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt: Got block", "height", newBlock.Height) break LOOP } else { - t.Logf("Block with no new validators height=%v validator=%v. Skipping...", newBlock.Height, j) + css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt: Got block with no new validators. Skipping", "height", newBlock.Height) } } err := validateBlock(newBlock, updatedVals) - if err != nil { - t.Fatal(err) - } + assert.Nil(t, err) }, css) } diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 7d02ecd2c..32f7f4376 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -65,9 +65,7 @@ func startNewConsensusStateAndWaitForBlock(t *testing.T, lastBlockHeight int64, err := cs.Start() require.NoError(t, err) - defer func() { - cs.Stop() - }() + defer cs.Stop() // This is just a signal that we haven't halted; its not something contained // in the WAL itself. Assuming the consensus state is running, replay of any @@ -337,6 +335,8 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) { if err := wal.Start(); err != nil { t.Fatal(err) } + defer wal.Stop() + chain, commits, err := makeBlockchainFromWAL(wal) if err != nil { t.Fatalf(err.Error()) @@ -366,6 +366,7 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) { if err := proxyApp.Start(); err != nil { t.Fatalf("Error starting proxy app connections: %v", err) } + defer proxyApp.Stop() // get the latest app hash from the app res, err := proxyApp.Query().InfoSync(abci.RequestInfo{""}) @@ -404,13 +405,13 @@ func buildAppStateFromChain(proxyApp proxy.AppConns, if err := proxyApp.Start(); err != nil { panic(err) } + defer proxyApp.Stop() validators := types.TM2PB.Validators(state.Validators) if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{validators}); err != nil { panic(err) } - defer proxyApp.Stop() switch mode { case 0: for i := 0; i < nBlocks; i++ { diff --git a/consensus/state.go b/consensus/state.go index 8a2692a22..a0d814d2f 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -267,14 +267,18 @@ func (cs *ConsensusState) startRoutines(maxSteps int) { // OnStop implements cmn.Service. It stops all routines and waits for the WAL to finish. func (cs *ConsensusState) OnStop() { + cs.Logger.Debug("conS.OnStop") cs.BaseService.OnStop() + cs.Logger.Debug("conS.OnStop: Stopping ticker") cs.timeoutTicker.Stop() // Make BaseService.Wait() wait until cs.wal.Wait() + cs.Logger.Debug("conS.OnStop: Waiting for WAL") if cs.IsRunning() { cs.wal.Wait() } + cs.Logger.Debug("conS.OnStop: DONE") } // Wait waits for the the main routine to return. diff --git a/consensus/wal_generator.go b/consensus/wal_generator.go index 00da98f71..b4a2503fa 100644 --- a/consensus/wal_generator.go +++ b/consensus/wal_generator.go @@ -77,7 +77,7 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) { var b bytes.Buffer wr := bufio.NewWriter(&b) numBlocksWritten := make(chan struct{}) - wal := newByteBufferWAL(NewWALEncoder(wr), int64(numBlocks), numBlocksWritten) + wal := newByteBufferWAL(logger, NewWALEncoder(wr), int64(numBlocks), numBlocksWritten) // see wal.go#103 wal.Save(EndHeightMessage{0}) consensusState.wal = wal @@ -142,16 +142,18 @@ type byteBufferWAL struct { stopped bool heightToStop int64 signalWhenStopsTo chan struct{} + logger log.Logger } // needed for determinism var fixedTime, _ = time.Parse(time.RFC3339, "2017-01-02T15:04:05Z") -func newByteBufferWAL(enc *WALEncoder, nBlocks int64, signalStop chan struct{}) *byteBufferWAL { +func newByteBufferWAL(logger log.Logger, enc *WALEncoder, nBlocks int64, signalStop chan struct{}) *byteBufferWAL { return &byteBufferWAL{ enc: enc, heightToStop: nBlocks, signalWhenStopsTo: signalStop, + logger: logger, } } @@ -160,17 +162,21 @@ func newByteBufferWAL(enc *WALEncoder, nBlocks int64, signalStop chan struct{}) // skip writing. func (w *byteBufferWAL) Save(m WALMessage) { if w.stopped { + w.logger.Debug("WAL already stopped. Not writing message", "msg", m) return } if endMsg, ok := m.(EndHeightMessage); ok { + w.logger.Debug("WAL write end height message", "height", endMsg.Height, "stopHeight", w.heightToStop) if endMsg.Height == w.heightToStop { + w.logger.Debug("Stopping WAL at height", "height", endMsg.Height) w.signalWhenStopsTo <- struct{}{} w.stopped = true return } } + w.logger.Debug("WAL Write Message", "msg", m) err := w.enc.Encode(&TimedWALMessage{fixedTime, m}) if err != nil { panic(fmt.Sprintf("failed to encode the msg %v", m)) diff --git a/glide.lock b/glide.lock index d18ccf6e2..05e74faa8 100644 --- a/glide.lock +++ b/glide.lock @@ -129,7 +129,7 @@ imports: subpackages: - iavl - name: github.com/tendermint/tmlibs - version: e4ef2835f0081c2ece83b9c1f777cf071f956e81 + version: a483e1ff486b577ba94e6a20f08bf52fbb7bff14 subpackages: - autofile - cli diff --git a/p2p/connection.go b/p2p/connection.go index 53001fcc4..3322c4e9f 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -185,13 +185,19 @@ func (c *MConnection) OnStart() error { // OnStop implements BaseService func (c *MConnection) OnStop() { + c.Logger.Debug("MConn.OnStop") c.BaseService.OnStop() + c.Logger.Debug("MConn.flushTimer.Stop") c.flushTimer.Stop() + c.Logger.Debug("MConn.pingTimer.Stop") c.pingTimer.Stop() + c.Logger.Debug("MConn.chStatsTimer.Stop") c.chStatsTimer.Stop() if c.quit != nil { + c.Logger.Debug("MConn: Close Quit") close(c.quit) } + c.Logger.Debug("MConn.conn.Close()") c.conn.Close() // nolint: errcheck // We can't close pong safely here because // recvRoutine may write to it after we've stopped. diff --git a/p2p/peer.go b/p2p/peer.go index cc7f4927a..7a3c86097 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -237,7 +237,9 @@ func (p *peer) OnStart() error { // OnStop implements BaseService. func (p *peer) OnStop() { + p.Logger.Debug("Peer.OnStop") p.BaseService.OnStop() + p.Logger.Debug("Peer.mconn.Stop") p.mconn.Stop() } diff --git a/p2p/switch.go b/p2p/switch.go index 33ad28ea7..255f14da7 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -210,17 +210,23 @@ func (sw *Switch) OnStart() error { // OnStop implements BaseService. It stops all listeners, peers, and reactors. func (sw *Switch) OnStop() { // Stop listeners + sw.Logger.Debug("Switch: Stopping listeners") for _, listener := range sw.listeners { listener.Stop() } sw.listeners = nil // Stop peers - for _, peer := range sw.peers.List() { + sw.Logger.Debug("Switch: Stopping Peers") + for i, peer := range sw.peers.List() { + sw.Logger.Debug("Switch: Stopping peer", "i", i, "peer", peer) peer.Stop() + sw.Logger.Debug("Switch: Removing peer", "i", i, "peer", peer) sw.peers.Remove(peer) } // Stop reactors - for _, reactor := range sw.reactors { + sw.Logger.Debug("Switch: Stopping reactors") + for name, reactor := range sw.reactors { + sw.Logger.Debug("Switch: Stopping reactor", "name", name) reactor.Stop() } } From a86df17cebe9f1e89774d3cdcaf10915982e8a56 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sat, 16 Dec 2017 19:55:04 -0500 Subject: [PATCH 7/8] crank city --- consensus/reactor_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index dda33c249..7383c790c 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -413,7 +413,7 @@ func timeoutWaitGroup(t *testing.T, n int, f func(*sync.WaitGroup, int), css []* // we're running many nodes in-process, possibly in in a virtual machine, // and spewing debug messages - making a block could take a while, - timeout := time.Second * 60 + timeout := time.Second * 300 select { case <-done: From cb3ac6987edbbeac41f705759f24e8229d506060 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 19 Dec 2017 00:57:28 -0500 Subject: [PATCH 8/8] remove some debugs --- consensus/reactor.go | 5 ----- consensus/state.go | 4 ---- p2p/connection.go | 24 ------------------------ p2p/peer.go | 2 -- p2p/switch.go | 9 ++------- 5 files changed, 2 insertions(+), 42 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index 2051ecd81..439ccd994 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -76,11 +76,8 @@ func (conR *ConsensusReactor) OnStart() error { // OnStop implements BaseService func (conR *ConsensusReactor) OnStop() { - conR.Logger.Debug("conR.OnStop") conR.BaseReactor.OnStop() - conR.Logger.Debug("conR.OnStop: Stopping ConsensusState") conR.conS.Stop() - conR.Logger.Debug("conR.OnStop: DONE") } // SwitchToConsensus switches from fast_sync mode to consensus mode. @@ -921,8 +918,6 @@ func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool { msg := &VoteMessage{vote} ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote) return ps.Peer.Send(VoteChannel, struct{ ConsensusMessage }{msg}) - } else { - ps.logger.Debug("No vote message to send", "ps", ps) } return false } diff --git a/consensus/state.go b/consensus/state.go index a0d814d2f..8a2692a22 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -267,18 +267,14 @@ func (cs *ConsensusState) startRoutines(maxSteps int) { // OnStop implements cmn.Service. It stops all routines and waits for the WAL to finish. func (cs *ConsensusState) OnStop() { - cs.Logger.Debug("conS.OnStop") cs.BaseService.OnStop() - cs.Logger.Debug("conS.OnStop: Stopping ticker") cs.timeoutTicker.Stop() // Make BaseService.Wait() wait until cs.wal.Wait() - cs.Logger.Debug("conS.OnStop: Waiting for WAL") if cs.IsRunning() { cs.wal.Wait() } - cs.Logger.Debug("conS.OnStop: DONE") } // Wait waits for the the main routine to return. diff --git a/p2p/connection.go b/p2p/connection.go index 3322c4e9f..b0a407229 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -185,19 +185,13 @@ func (c *MConnection) OnStart() error { // OnStop implements BaseService func (c *MConnection) OnStop() { - c.Logger.Debug("MConn.OnStop") c.BaseService.OnStop() - c.Logger.Debug("MConn.flushTimer.Stop") c.flushTimer.Stop() - c.Logger.Debug("MConn.pingTimer.Stop") c.pingTimer.Stop() - c.Logger.Debug("MConn.chStatsTimer.Stop") c.chStatsTimer.Stop() if c.quit != nil { - c.Logger.Debug("MConn: Close Quit") close(c.quit) } - c.Logger.Debug("MConn.conn.Close()") c.conn.Close() // nolint: errcheck // We can't close pong safely here because // recvRoutine may write to it after we've stopped. @@ -339,7 +333,6 @@ FOR_LOOP: case <-c.send: // Send some msgPackets eof := c.sendSomeMsgPackets() - c.Logger.Debug("finished sendSomeMsgPackets", "eof", eof) if !eof { // Keep sendRoutine awake. select { @@ -359,7 +352,6 @@ FOR_LOOP: } } - c.Logger.Debug("sendRoutine: End") // Cleanup } @@ -369,12 +361,10 @@ func (c *MConnection) sendSomeMsgPackets() bool { // Block until .sendMonitor says we can write. // Once we're ready we send more than we asked for, // but amortized it should even out. - c.Logger.Debug("sendMonitor.Limit") c.sendMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.SendRate), true) // Now send some msgPackets. for i := 0; i < numBatchMsgPackets; i++ { - c.Logger.Debug("sendMsgPacket", "i", i) if c.sendMsgPacket() { return true } @@ -403,7 +393,6 @@ func (c *MConnection) sendMsgPacket() bool { // Nothing to send? if leastChannel == nil { - c.Logger.Debug("Least channel == nil") return true } else { // c.Logger.Info("Found a msgPacket to send") @@ -416,7 +405,6 @@ func (c *MConnection) sendMsgPacket() bool { c.stopForError(err) return true } - c.Logger.Debug("sendMonitor.Update") c.sendMonitor.Update(int(n)) c.flushTimer.Set() return false @@ -430,7 +418,6 @@ func (c *MConnection) recvRoutine() { FOR_LOOP: for { - c.Logger.Debug("recvRoutine: recvMonitor.Limit") // Block until .recvMonitor says we can read. c.recvMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.RecvRate), true) @@ -452,9 +439,7 @@ FOR_LOOP: // Read packet type var n int var err error - c.Logger.Debug("recvRoutine: ReadByte") pktType := wire.ReadByte(c.bufReader, &n, &err) - c.Logger.Debug("recvRoutine: recvMonitor.Update") c.recvMonitor.Update(int(n)) if err != nil { if c.IsRunning() { @@ -470,15 +455,12 @@ FOR_LOOP: // TODO: prevent abuse, as they cause flush()'s. c.Logger.Debug("Receive Ping") c.pong <- struct{}{} - c.Logger.Debug("recvRoutine: trigger pong") case packetTypePong: // do nothing c.Logger.Debug("Receive Pong") case packetTypeMsg: pkt, n, err := msgPacket{}, int(0), error(nil) - c.Logger.Debug("recvRoutine: ReadBinaryPtr") wire.ReadBinaryPtr(&pkt, c.bufReader, c.config.maxMsgPacketTotalSize(), &n, &err) - c.Logger.Debug("recvRoutine: recvMonitor.Update") c.recvMonitor.Update(int(n)) if err != nil { if c.IsRunning() { @@ -494,9 +476,7 @@ FOR_LOOP: c.stopForError(err) } - c.Logger.Debug("recvRoutine: recvMsgPacket") msgBytes, err := channel.recvMsgPacket(pkt) - c.Logger.Debug("recvRoutine: msgBytes", "msgBytes", msgBytes, "err", err) if err != nil { if c.IsRunning() { c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err) @@ -508,7 +488,6 @@ FOR_LOOP: c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", msgBytes) // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine c.onReceive(pkt.ChannelID, msgBytes) - c.Logger.Debug("done onReceive") } default: err := fmt.Errorf("Unknown message type %X", pktType) @@ -518,11 +497,8 @@ FOR_LOOP: // TODO: shouldn't this go in the sendRoutine? // Better to send a ping packet when *we* haven't sent anything for a while. - c.Logger.Debug("pingTimer.Reset()") c.pingTimer.Reset() - c.Logger.Debug("done pingTimer.Reset()") } - c.Logger.Debug("recvRoutine: End") // Cleanup close(c.pong) diff --git a/p2p/peer.go b/p2p/peer.go index 7a3c86097..cc7f4927a 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -237,9 +237,7 @@ func (p *peer) OnStart() error { // OnStop implements BaseService. func (p *peer) OnStop() { - p.Logger.Debug("Peer.OnStop") p.BaseService.OnStop() - p.Logger.Debug("Peer.mconn.Stop") p.mconn.Stop() } diff --git a/p2p/switch.go b/p2p/switch.go index 255f14da7..76b019806 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -210,23 +210,18 @@ func (sw *Switch) OnStart() error { // OnStop implements BaseService. It stops all listeners, peers, and reactors. func (sw *Switch) OnStop() { // Stop listeners - sw.Logger.Debug("Switch: Stopping listeners") for _, listener := range sw.listeners { listener.Stop() } sw.listeners = nil // Stop peers - sw.Logger.Debug("Switch: Stopping Peers") - for i, peer := range sw.peers.List() { - sw.Logger.Debug("Switch: Stopping peer", "i", i, "peer", peer) + for _, peer := range sw.peers.List() { peer.Stop() - sw.Logger.Debug("Switch: Removing peer", "i", i, "peer", peer) sw.peers.Remove(peer) } // Stop reactors sw.Logger.Debug("Switch: Stopping reactors") - for name, reactor := range sw.reactors { - sw.Logger.Debug("Switch: Stopping reactor", "name", name) + for _, reactor := range sw.reactors { reactor.Stop() } }