|
|
@ -894,14 +894,11 @@ func (cs *State) receiveRoutine(ctx context.Context, maxSteps int) { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
rs := cs.RoundState |
|
|
|
var mi msgInfo |
|
|
|
|
|
|
|
select { |
|
|
|
case <-cs.txNotifier.TxsAvailable(): |
|
|
|
cs.handleTxsAvailable(ctx) |
|
|
|
|
|
|
|
case mi = <-cs.peerMsgQueue: |
|
|
|
case mi := <-cs.peerMsgQueue: |
|
|
|
if err := cs.wal.Write(mi); err != nil { |
|
|
|
cs.logger.Error("failed writing to WAL", "err", err) |
|
|
|
} |
|
|
@ -910,7 +907,7 @@ func (cs *State) receiveRoutine(ctx context.Context, maxSteps int) { |
|
|
|
// may generate internal events (votes, complete proposals, 2/3 majorities)
|
|
|
|
cs.handleMsg(ctx, mi) |
|
|
|
|
|
|
|
case mi = <-cs.internalMsgQueue: |
|
|
|
case mi := <-cs.internalMsgQueue: |
|
|
|
err := cs.wal.WriteSync(mi) // NOTE: fsync
|
|
|
|
if err != nil { |
|
|
|
panic(fmt.Sprintf( |
|
|
@ -929,7 +926,7 @@ func (cs *State) receiveRoutine(ctx context.Context, maxSteps int) { |
|
|
|
|
|
|
|
// if the timeout is relevant to the rs
|
|
|
|
// go to the next step
|
|
|
|
cs.handleTimeout(ctx, ti, rs) |
|
|
|
cs.handleTimeout(ctx, ti, cs.RoundState) |
|
|
|
|
|
|
|
case <-ctx.Done(): |
|
|
|
onExit(cs) |
|
|
|