Browse Source

fixes from review

pull/559/head
Ethan Buchman 7 years ago
parent
commit
0d1fa8e884
6 changed files with 23 additions and 19 deletions
  1. +1
    -1
      blockchain/pool.go
  2. +6
    -0
      consensus/common.go
  3. +1
    -1
      consensus/state.go
  4. +4
    -11
      consensus/state_test.go
  5. +1
    -1
      p2p/switch.go
  6. +10
    -5
      rpc/lib/server/handlers.go

+ 1
- 1
blockchain/pool.go View File

@ -28,7 +28,7 @@ var peerTimeoutSeconds = time.Duration(15) // not const so we can override with
Every so often we ask peers what height they're on so we can keep going.
Requests are continuously made for blocks of higher heights until
we reach the limits. If most of the requests have no available peers, and we
the limit is reached. If most of the requests have no available peers, and we
are not at peer limits, we can probably switch to consensus reactor
*/


+ 6
- 0
consensus/common.go View File

@ -27,3 +27,9 @@ func subscribeToEventRespond(evsw types.EventSwitch, receiver, eventID string) c
})
return ch
}
func discardFromChan(ch chan interface{}, n int) {
for i := 0; i < n; i++ {
<-ch
}
}

+ 1
- 1
consensus/state.go View File

@ -1357,7 +1357,7 @@ func (cs *ConsensusState) tryAddVote(vote *types.Vote, peerKey string) error {
//-----------------------------------------------------------------------------
func (cs *ConsensusState) addVote(vote *types.Vote, peerKey string) (added bool, err error) {
cs.Logger.Debug("addVote", "voteHeight", vote.Height, "voteType", vote.Type, "csHeight", cs.Height)
cs.Logger.Debug("addVote", "voteHeight", vote.Height, "voteType", vote.Type, "valIndex", vote.ValidatorIndex, "csHeight", cs.Height)
// A precommit for the previous height?
// These come in while we wait timeoutCommit


+ 4
- 11
consensus/state_test.go View File

@ -524,9 +524,7 @@ func TestLockPOLRelock(t *testing.T) {
signAddVotes(cs1, types.VoteTypePrevote, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header(), vs2, vs3, vs4)
// prevotes
<-voteCh
<-voteCh
<-voteCh
discardFromChan(voteCh, 3)
<-voteCh // our precommit
// the proposed block should now be locked and our precommit added
@ -536,9 +534,7 @@ func TestLockPOLRelock(t *testing.T) {
signAddVotes(cs1, types.VoteTypePrecommit, nil, types.PartSetHeader{}, vs2, vs4)
signAddVotes(cs1, types.VoteTypePrecommit, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header(), vs3)
// precommites
<-voteCh
<-voteCh
<-voteCh
discardFromChan(voteCh, 3)
// before we timeout to the new round set the new proposal
prop, propBlock := decideProposal(cs1, vs2, vs2.Height, vs2.Round+1)
@ -577,9 +573,7 @@ func TestLockPOLRelock(t *testing.T) {
// now lets add prevotes from everyone else for the new block
signAddVotes(cs1, types.VoteTypePrevote, propBlockHash, propBlockParts.Header(), vs2, vs3, vs4)
// prevotes
<-voteCh
<-voteCh
<-voteCh
discardFromChan(voteCh, 3)
// now either we go to PrevoteWait or Precommit
select {
@ -594,8 +588,7 @@ func TestLockPOLRelock(t *testing.T) {
validatePrecommit(t, cs1, 1, 1, vss[0], propBlockHash, propBlockHash)
signAddVotes(cs1, types.VoteTypePrecommit, propBlockHash, propBlockParts.Header(), vs2, vs3)
<-voteCh
<-voteCh
discardFromChan(voteCh, 2)
be := <-newBlockCh
b := be.(types.TMEventData).Unwrap().(types.EventDataNewBlockHeader)


+ 1
- 1
p2p/switch.go View File

@ -381,7 +381,7 @@ func (sw *Switch) Peers() IPeerSet {
// TODO: make record depending on reason.
func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
addr := NewNetAddress(peer.Addr())
sw.Logger.Info("Stopping peer for error", "peer", peer, "err", reason)
sw.Logger.Error("Stopping peer for error", "peer", peer, "err", reason)
sw.stopAndRemovePeer(peer, reason)
if peer.IsPersistent() {


+ 10
- 5
rpc/lib/server/handlers.go View File

@ -338,7 +338,7 @@ func nonJsonToArg(ty reflect.Type, arg string) (reflect.Value, error, bool) {
const (
writeChanCapacity = 1000
wsWriteTimeoutSeconds = 30 // each write times out after this
wsWriteTimeoutSeconds = 30 // each write times out after this.
wsReadTimeoutSeconds = 30 // connection times out if we haven't received *anything* in this long, not even pings.
wsPingTickerSeconds = 10 // send a ping every PingTickerSeconds.
)
@ -535,8 +535,7 @@ func (wsc *wsConnection) writeRoutine() {
case <-wsc.Quit:
return
case <-wsc.pingTicker.C:
wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds))
err := wsc.baseConn.WriteMessage(websocket.PingMessage, []byte{})
err := wsc.writeMessageWithDeadline(websocket.PingMessage, []byte{})
if err != nil {
wsc.Logger.Error("Failed to write ping message on websocket", "err", err)
wsc.Stop()
@ -547,8 +546,7 @@ func (wsc *wsConnection) writeRoutine() {
if err != nil {
wsc.Logger.Error("Failed to marshal RPCResponse to JSON", "err", err)
} else {
wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds))
if err = wsc.baseConn.WriteMessage(websocket.TextMessage, jsonBytes); err != nil {
if err = wsc.writeMessageWithDeadline(websocket.TextMessage, jsonBytes); err != nil {
wsc.Logger.Error("Failed to write response on websocket", "err", err)
wsc.Stop()
return
@ -558,6 +556,13 @@ func (wsc *wsConnection) writeRoutine() {
}
}
// All writes to the websocket must (re)set the write deadline.
// If some writes don't set it while others do, they may timeout incorrectly (https://github.com/tendermint/tendermint/issues/553)
func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error {
wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds))
return wsc.baseConn.WriteMessage(msgType, msg)
}
//----------------------------------------
// Main manager for all websocket connections


Loading…
Cancel
Save