// discardFromChan receives and drops exactly n values from ch.
//
// It is used to drain events whose contents are irrelevant to the caller.
// Each receive blocks until a value is available, so the call does not return
// until n values have been delivered (or ch has been closed, in which case the
// remaining receives complete immediately). A non-positive n is a no-op.
//
// ch is declared receive-only because this helper never sends; a
// bidirectional channel can still be passed unchanged.
func discardFromChan(ch <-chan interface{}, n int) {
	for i := 0; i < n; i++ {
		<-ch
	}
}
// These come in while we wait timeoutCommit diff --git a/consensus/state_test.go b/consensus/state_test.go index ee88ac258..81ef016be 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -524,9 +524,7 @@ func TestLockPOLRelock(t *testing.T) { signAddVotes(cs1, types.VoteTypePrevote, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header(), vs2, vs3, vs4) // prevotes - <-voteCh - <-voteCh - <-voteCh + discardFromChan(voteCh, 3) <-voteCh // our precommit // the proposed block should now be locked and our precommit added @@ -536,9 +534,7 @@ func TestLockPOLRelock(t *testing.T) { signAddVotes(cs1, types.VoteTypePrecommit, nil, types.PartSetHeader{}, vs2, vs4) signAddVotes(cs1, types.VoteTypePrecommit, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header(), vs3) // precommites - <-voteCh - <-voteCh - <-voteCh + discardFromChan(voteCh, 3) // before we timeout to the new round set the new proposal prop, propBlock := decideProposal(cs1, vs2, vs2.Height, vs2.Round+1) @@ -577,9 +573,7 @@ func TestLockPOLRelock(t *testing.T) { // now lets add prevotes from everyone else for the new block signAddVotes(cs1, types.VoteTypePrevote, propBlockHash, propBlockParts.Header(), vs2, vs3, vs4) // prevotes - <-voteCh - <-voteCh - <-voteCh + discardFromChan(voteCh, 3) // now either we go to PrevoteWait or Precommit select { @@ -594,8 +588,7 @@ func TestLockPOLRelock(t *testing.T) { validatePrecommit(t, cs1, 1, 1, vss[0], propBlockHash, propBlockHash) signAddVotes(cs1, types.VoteTypePrecommit, propBlockHash, propBlockParts.Header(), vs2, vs3) - <-voteCh - <-voteCh + discardFromChan(voteCh, 2) be := <-newBlockCh b := be.(types.TMEventData).Unwrap().(types.EventDataNewBlockHeader) diff --git a/p2p/switch.go b/p2p/switch.go index 2e9d213df..2d8d34357 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -381,7 +381,7 @@ func (sw *Switch) Peers() IPeerSet { // TODO: make record depending on reason. 
func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { addr := NewNetAddress(peer.Addr()) - sw.Logger.Info("Stopping peer for error", "peer", peer, "err", reason) + sw.Logger.Error("Stopping peer for error", "peer", peer, "err", reason) sw.stopAndRemovePeer(peer, reason) if peer.IsPersistent() { diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go index 1538f08a7..b6431a1ec 100644 --- a/rpc/lib/server/handlers.go +++ b/rpc/lib/server/handlers.go @@ -338,7 +338,7 @@ func nonJsonToArg(ty reflect.Type, arg string) (reflect.Value, error, bool) { const ( writeChanCapacity = 1000 - wsWriteTimeoutSeconds = 30 // each write times out after this + wsWriteTimeoutSeconds = 30 // each write times out after this. wsReadTimeoutSeconds = 30 // connection times out if we haven't received *anything* in this long, not even pings. wsPingTickerSeconds = 10 // send a ping every PingTickerSeconds. ) @@ -535,8 +535,7 @@ func (wsc *wsConnection) writeRoutine() { case <-wsc.Quit: return case <-wsc.pingTicker.C: - wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds)) - err := wsc.baseConn.WriteMessage(websocket.PingMessage, []byte{}) + err := wsc.writeMessageWithDeadline(websocket.PingMessage, []byte{}) if err != nil { wsc.Logger.Error("Failed to write ping message on websocket", "err", err) wsc.Stop() @@ -547,8 +546,7 @@ func (wsc *wsConnection) writeRoutine() { if err != nil { wsc.Logger.Error("Failed to marshal RPCResponse to JSON", "err", err) } else { - wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds)) - if err = wsc.baseConn.WriteMessage(websocket.TextMessage, jsonBytes); err != nil { + if err = wsc.writeMessageWithDeadline(websocket.TextMessage, jsonBytes); err != nil { wsc.Logger.Error("Failed to write response on websocket", "err", err) wsc.Stop() return @@ -558,6 +556,13 @@ func (wsc *wsConnection) writeRoutine() { } } +// All writes to the websocket must (re)set the write deadline. 
+// If some writes don't set it while others do, they may timeout incorrectly (https://github.com/tendermint/tendermint/issues/553) +func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error { + wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds)) + return wsc.baseConn.WriteMessage(msgType, msg) +} + //---------------------------------------- // Main manager for all websocket connections