diff --git a/consensus/reactor.go b/consensus/reactor.go index dcf28e56b..690ae2067 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -22,8 +22,6 @@ const ( DataChannel = byte(0x21) VoteChannel = byte(0x22) - PeerStateKey = "ConsensusReactor.peerState" - peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send. ) @@ -107,7 +105,7 @@ func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) { // Create peerState for peer peerState := NewPeerState(peer) - peer.Data.Set(PeerStateKey, peerState) + peer.Data.Set(types.PeerStateKey, peerState) // Begin gossip routines for this peer. go conR.gossipDataRoutine(peer, peerState) @@ -138,7 +136,7 @@ func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte } // Get peer states - ps := peer.Data.Get(PeerStateKey).(*PeerState) + ps := peer.Data.Get(types.PeerStateKey).(*PeerState) _, msg, err := DecodeMessage(msgBytes) if err != nil { log.Warn("Error decoding message", "channel", chID, "peer", peer, "msg", msg, "error", err, "bytes", msgBytes) @@ -588,6 +586,14 @@ func (ps *PeerState) GetRoundState() *PeerRoundState { return &prs } +// Returns an atomic snapshot of the PeerRoundState's height +// used by the mempool to ensure peers are caught up before broadcasting new txs +func (ps *PeerState) GetHeight() int { + ps.mtx.Lock() + defer ps.mtx.Unlock() + return ps.PeerRoundState.Height +} + func (ps *PeerState) SetHasProposal(proposal *types.Proposal) { ps.mtx.Lock() defer ps.mtx.Unlock() diff --git a/consensus/state.go b/consensus/state.go index 9c46a7656..4850a8d01 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -657,6 +657,10 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts return } txs := cs.mempoolReactor.Mempool.GetProposalTxs() + MaxTxsPerBlock := 100 // TODO + if len(txs) > MaxTxsPerBlock { + txs = txs[:MaxTxsPerBlock] + } block = &types.Block{ Header: &types.Header{ ChainID: cs.state.ChainID, diff --git a/mempool/mempool.go b/mempool/mempool.go index e0e09f04f..17a736e63 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -19,7 +19,9 @@ type Mempool struct { mtx sync.Mutex state *sm.State cache *sm.BlockCache - txs []types.Tx + txs []types.Tx // TODO: we need to add a map to facilitate replace-by-fee + + resetInfo ResetInfo // so broadcast routines can respond to mempool flushing } func NewMempool(state *sm.State) *Mempool { @@ -59,6 +61,18 @@ func (mem *Mempool) GetProposalTxs() []types.Tx { return mem.txs } +// We use this to inform peer routines of how the mempool has been updated +type ResetInfo struct { + Height int + Included []Range + Invalid []Range +} + +type Range struct { + Start int + Length int +} + // "block" is the new block being committed. // "state" is the result of state.AppendBlock("block"). // Txs that are present in "block" are discarded from mempool. @@ -75,33 +89,51 @@ func (mem *Mempool) ResetForBlockAndState(block *types.Block, state *sm.State) { blockTxsMap[string(types.TxID(state.ChainID, tx))] = struct{}{} } - // Next, filter all txs from mem.txs that are in blockTxsMap - txs := []types.Tx{} - for _, tx := range mem.txs { + // Now we filter all txs from mem.txs that are in blockTxsMap, + // and ExecTx on what remains. Only valid txs are kept. + // We track the ranges of txs included in the block and invalidated by it + // so we can tell peer routines + var ri = ResetInfo{Height: block.Height} + var validTxs []types.Tx + includedStart, invalidStart := -1, -1 + for i, tx := range mem.txs { txID := types.TxID(state.ChainID, tx) if _, ok := blockTxsMap[string(txID)]; ok { + startRange(&includedStart, i) // start counting included txs + endRange(&invalidStart, i, &ri.Invalid) // stop counting invalid txs log.Info("Filter out, already committed", "tx", tx, "txID", txID) - continue } else { - log.Info("Filter in, still new", "tx", tx, "txID", txID) - txs = append(txs, tx) - } - } - - // Next, filter all txs that aren't valid given new state. - validTxs := []types.Tx{} - for _, tx := range txs { - err := sm.ExecTx(mem.cache, tx, false, nil) - if err == nil { - log.Info("Filter in, valid", "tx", tx) - validTxs = append(validTxs, tx) - } else { - // tx is no longer valid. - log.Info("Filter out, no longer valid", "tx", tx, "error", err) + endRange(&includedStart, i, &ri.Included) // stop counting included txs + err := sm.ExecTx(mem.cache, tx, false, nil) + if err != nil { + startRange(&invalidStart, i) // start counting invalid txs + log.Info("Filter out, no longer valid", "tx", tx, "error", err) + } else { + endRange(&invalidStart, i, &ri.Invalid) // stop counting invalid txs + log.Info("Filter in, new, valid", "tx", tx, "txID", txID) + validTxs = append(validTxs, tx) + } } } + endRange(&includedStart, len(mem.txs)-1, &ri.Included) // stop counting included txs + endRange(&invalidStart, len(mem.txs)-1, &ri.Invalid) // stop counting invalid txs // We're done! log.Info("New txs", "txs", validTxs, "oldTxs", mem.txs) mem.txs = validTxs + mem.resetInfo = ri +} + +func startRange(start *int, i int) { + if *start < 0 { + *start = i + } +} + +func endRange(start *int, i int, ranger *[]Range) { + if *start >= 0 { + length := i - *start + *ranger = append(*ranger, Range{*start, length}) + *start = -1 + } } diff --git a/mempool/reactor.go b/mempool/reactor.go index 2e6e36ec6..bd055c716 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "reflect" + "time" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/events" @@ -14,6 +15,9 @@ import ( var ( MempoolChannel = byte(0x30) + + checkExecutedTxsMilliseconds = 1 // check for new mempool txs to send to peer + txsToSendPerCheck = 64 // send up to this many txs from the mempool per check ) // MempoolReactor handles mempool tx broadcasting amongst peers. @@ -44,11 +48,14 @@ func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor { } // Implements Reactor -func (pexR *MempoolReactor) AddPeer(peer *p2p.Peer) { +func (memR *MempoolReactor) AddPeer(peer *p2p.Peer) { + // Each peer gets a go routine on which we broadcast transactions in the same order we applied them to our state. + go memR.broadcastTxRoutine(peer) } // Implements Reactor -func (pexR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { +func (memR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { + // broadcast routine checks if peer is gone and returns } // Implements Reactor @@ -70,29 +77,116 @@ func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { } else { log.Info("Added valid tx", "tx", msg.Tx) } - // Share tx. - // We use a simple shotgun approach for now. - // TODO: improve efficiency - for _, peer := range memR.Switch.Peers().List() { - if peer.Key == src.Key { + // broadcasting happens from go routines per peer + default: + log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) + } +} + +// Just an alias for AddTx since broadcasting happens in peer routines +func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error { + return memR.Mempool.AddTx(tx) +} + +type PeerState interface { + GetHeight() int +} + +// send new mempool txs to peer, strictly in order we applied them to our state. +// new blocks take chunks out of the mempool, but we've already sent some txs to the peer. +// so we wait to hear that the peer has progressed to the new height, and then continue sending txs from where we left off +func (memR *MempoolReactor) broadcastTxRoutine(peer *p2p.Peer) { + newBlockChan := make(chan ResetInfo) + memR.evsw.(*events.EventSwitch).AddListenerForEvent("broadcastRoutine:"+peer.Key, types.EventStringNewBlock(), func(data types.EventData) { + // no lock needed because consensus is blocking on this + // and the mempool is reset before this event fires + newBlockChan <- memR.Mempool.resetInfo + }) + timer := time.NewTicker(time.Millisecond * time.Duration(checkExecutedTxsMilliseconds)) + currentHeight := memR.Mempool.state.LastBlockHeight + var nTxs, txsSent int + var txs []types.Tx + for { + select { + case <-timer.C: + if !peer.IsRunning() { + return + } + + // make sure the peer is up to date + peerState := peer.Data.Get(types.PeerStateKey).(PeerState) + if peerState.GetHeight() < currentHeight { continue } - peer.TrySend(MempoolChannel, msg) + + // check the mempool for new transactions + nTxs, txs = memR.getNewTxs(txsSent, currentHeight) + + theseTxsSent := 0 + start := time.Now() + TX_LOOP: + for _, tx := range txs { + // send tx to peer. + msg := &TxMessage{Tx: tx} + success := peer.Send(MempoolChannel, msg) + if !success { + break TX_LOOP + } else { + theseTxsSent += 1 + } + } + if theseTxsSent > 0 { + txsSent += theseTxsSent + log.Warn("Sent txs to peer", "ntxs", theseTxsSent, "took", time.Since(start), "total_sent", txsSent, "total_exec", nTxs) + } + + case ri := <-newBlockChan: + currentHeight = ri.Height + + // find out how many txs below what we've sent were included in a block and how many became invalid + included := tallyRangesUpTo(ri.Included, txsSent) + invalidated := tallyRangesUpTo(ri.Invalid, txsSent) + + txsSent -= included + invalidated } + } +} - default: - log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) +// fetch new txs from the mempool +func (memR *MempoolReactor) getNewTxs(txsSent, height int) (nTxs int, txs []types.Tx) { + memR.Mempool.mtx.Lock() + defer memR.Mempool.mtx.Unlock() + + // if the mempool got ahead of us just return empty txs + if memR.Mempool.state.LastBlockHeight != height { + return } + + nTxs = len(memR.Mempool.txs) + if txsSent < nTxs { + if nTxs > txsSent+txsToSendPerCheck { + txs = memR.Mempool.txs[txsSent : txsSent+txsToSendPerCheck] + } else { + txs = memR.Mempool.txs[txsSent:] + } + } + return } -func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error { - err := memR.Mempool.AddTx(tx) - if err != nil { - return err +// return the size of ranges less than upTo +func tallyRangesUpTo(ranger []Range, upTo int) int { + totalUpTo := 0 + for _, r := range ranger { + if r.Start >= upTo { + break + } + if r.Start+r.Length-1 > upTo { + totalUpTo += upTo - r.Start - 1 + break + } + totalUpTo += r.Length } - msg := &TxMessage{Tx: tx} - memR.Switch.Broadcast(MempoolChannel, msg) - return nil + return totalUpTo } // implements events.Eventable diff --git a/rpc/core/consensus.go b/rpc/core/consensus.go index d6abcf72f..18e0d39a4 100644 --- a/rpc/core/consensus.go +++ b/rpc/core/consensus.go @@ -31,7 +31,7 @@ func DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) { peerRoundStates := []string{} for _, peer := range p2pSwitch.Peers().List() { // TODO: clean this up? - peerState := peer.Data.Get(cm.PeerStateKey).(*cm.PeerState) + peerState := peer.Data.Get(types.PeerStateKey).(*cm.PeerState) peerRoundState := peerState.GetRoundState() peerRoundStateStr := peer.Key + ":" + string(wire.JSONBytes(peerRoundState)) peerRoundStates = append(peerRoundStates, peerRoundStateStr) diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index eaf1d5c68..841ced14f 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -30,5 +30,6 @@ func BroadcastTx(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { } func ListUnconfirmedTxs() (*ctypes.ResultListUnconfirmedTxs, error) { - return &ctypes.ResultListUnconfirmedTxs{mempoolReactor.Mempool.GetProposalTxs()}, nil + txs := mempoolReactor.Mempool.GetProposalTxs() + return &ctypes.ResultListUnconfirmedTxs{len(txs), txs}, nil } diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index e756544c7..21b3b11ad 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -98,6 +98,7 @@ type Receipt struct { } type ResultListUnconfirmedTxs struct { + N int `json:"n_txs"` Txs []types.Tx `json:"txs"` }