From d54bf6bcd547b978565368a799e5e5e54e955f88 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 21 Apr 2015 19:51:23 -0700 Subject: [PATCH] blockchain reactor to consensus reactor transition on catchup --- Makefile | 5 +++ blockchain/pool.go | 47 +++++++++++++++++++++++--- blockchain/reactor.go | 78 +++++++++++++++++++++++++++++++++++-------- consensus/reactor.go | 10 +++++- node/node.go | 9 ++--- 5 files changed, 126 insertions(+), 23 deletions(-) diff --git a/Makefile b/Makefile index f79cfeed4..4ede2e46b 100644 --- a/Makefile +++ b/Makefile @@ -7,6 +7,11 @@ build: get_deps go build -o build/barak github.com/tendermint/tendermint/cmd/barak go build -o build/debora github.com/tendermint/tendermint/cmd/debora +no_get: + go build -o build/tendermint github.com/tendermint/tendermint/cmd/tendermint + go build -o build/barak github.com/tendermint/tendermint/cmd/barak + go build -o build/debora github.com/tendermint/tendermint/cmd/debora + build_race: get_deps go build -race -o build/tendermint github.com/tendermint/tendermint/cmd/tendermint go build -race -o build/barak github.com/tendermint/tendermint/cmd/barak diff --git a/blockchain/pool.go b/blockchain/pool.go index 0bd620463..07d4a7705 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -18,15 +18,29 @@ const ( maxRequestsPerPeer = 300 ) +// numTotal = numPending + blocks in the pool we haven't synced yet + var ( requestTimeoutSeconds = time.Duration(1) ) +/* + Peers self report their heights when a new peer joins the block pool. + Starting from whatever we've got (pool.height), we request blocks + in sequence from peers that reported higher heights than ours. + Every so often we ask peers what height they're on so we can keep going. + + Requests are continuously made for blocks of higher heights until + the limits. 
If most of the requests have no available peers, and we + are not at peer limits, we can probably switch to consensus reactor +*/ + type BlockPool struct { // block requests requestsMtx sync.Mutex requests map[uint]*bpRequest - height uint // the lowest key in requests. + peerless int32 // number of requests without peers + height uint // the lowest key in requests. numPending int32 numTotal int32 @@ -145,10 +159,13 @@ func (pool *BlockPool) RedoRequest(height uint) { if request.block == nil { panic("Expected block to be non-nil") } + // TODO: record this malfeasance + // maybe punish peer on switch (an invalid block!) pool.RemovePeer(request.peerId) // Lock on peersMtx. request.block = nil request.peerId = "" pool.numPending++ + pool.peerless++ go requestRoutine(pool, height) } @@ -169,9 +186,22 @@ func (pool *BlockPool) setPeerForRequest(height uint, peerId string) { if request == nil { return } + pool.peerless-- request.peerId = peerId } +func (pool *BlockPool) removePeerForRequest(height uint, peerId string) { + pool.requestsMtx.Lock() // Lock + defer pool.requestsMtx.Unlock() + + request := pool.requests[height] + if request == nil { + return + } + pool.peerless++ + request.peerId = "" +} + func (pool *BlockPool) AddBlock(block *types.Block, peerId string) { pool.requestsMtx.Lock() // Lock defer pool.requestsMtx.Unlock() @@ -198,7 +228,7 @@ func (pool *BlockPool) getPeer(peerId string) *bpPeer { return peer } -// Sets the peer's blockchain height. +// Sets the peer's alleged blockchain height. func (pool *BlockPool) SetPeerHeight(peerId string, height uint) { pool.peersMtx.Lock() // Lock defer pool.peersMtx.Unlock() @@ -239,7 +269,6 @@ func (pool *BlockPool) pickIncrAvailablePeer(minHeight uint) *bpPeer { peer.numRequests++ return peer } - return nil } @@ -258,6 +287,7 @@ func (pool *BlockPool) nextHeight() uint { pool.requestsMtx.Lock() // Lock defer pool.requestsMtx.Unlock() + // we make one request per height. 
return pool.height + uint(pool.numTotal) } @@ -272,6 +302,8 @@ func (pool *BlockPool) makeRequest(height uint) { } pool.requests[height] = request + pool.peerless++ + nextHeight := pool.height + uint(pool.numTotal) if nextHeight == height { pool.numTotal++ @@ -328,7 +360,7 @@ type bpRequest struct { //------------------------------------- // Responsible for making more requests as necessary -// Returns when a block is found (e.g. AddBlock() is called) +// Returns only when a block is found (e.g. AddBlock() is called) func requestRoutine(pool *BlockPool, height uint) { for { var peer *bpPeer = nil @@ -347,15 +379,18 @@ func requestRoutine(pool *BlockPool, height uint) { break PICK_LOOP } + // set the peer, decrement peerless pool.setPeerForRequest(height, peer.id) for try := 0; try < maxTries; try++ { pool.sendRequest(height, peer.id) time.Sleep(requestTimeoutSeconds * time.Second) + // if successful the block is either in the pool, if pool.hasBlock(height) { pool.decrPeer(peer.id) return } + // or already processed and we've moved past it bpHeight, _, _ := pool.GetStatus() if height < bpHeight { pool.decrPeer(peer.id) @@ -363,6 +398,10 @@ func requestRoutine(pool *BlockPool, height uint) { } } + // unset the peer, increment peerless + pool.removePeerForRequest(height, peer.id) + + // this peer failed us, try again pool.RemovePeer(peer.id) pool.sendTimeout(peer.id) } diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 9bdfe27cf..c5947b0f5 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -10,6 +10,7 @@ import ( "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" + dbm "github.com/tendermint/tendermint/db" "github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/p2p" sm "github.com/tendermint/tendermint/state" @@ -24,9 +25,14 @@ const ( // stop syncing when last block's time is // within this much of the system time. 
stopSyncingDurationMinutes = 10 + // ask for best height every 10s + statusUpdateIntervalSeconds = 10 + // check if we should switch to consensus reactor + switchToConsensusIntervalSeconds = 10 ) -type stateResetter interface { +type consensusReactor interface { + SetSyncing(bool) ResetToState(*sm.State) } @@ -76,8 +82,8 @@ func (bcR *BlockchainReactor) Start(sw *p2p.Switch) { if atomic.CompareAndSwapUint32(&bcR.running, 0, 1) { log.Info("Starting BlockchainReactor") bcR.sw = sw - bcR.pool.Start() if bcR.sync { + bcR.pool.Start() go bcR.poolRoutine() } } @@ -106,7 +112,7 @@ func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { // Implements Reactor func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) { // Send peer our state. - peer.Send(BlockchainChannel, &bcPeerStatusMessage{bcR.store.Height()}) + peer.Send(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()}) } // Implements Reactor @@ -141,8 +147,14 @@ func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) case *bcBlockResponseMessage: // Got a block. bcR.pool.AddBlock(msg.Block, src.Key) - case *bcPeerStatusMessage: - // Got a peer status. + case *bcStatusRequestMessage: + // Send peer our state. + queued := src.TrySend(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()}) + if !queued { + // sorry + } + case *bcStatusResponseMessage: + // Got a peer status. Unverified. 
bcR.pool.SetPeerHeight(src.Key, msg.Height) default: log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) @@ -153,6 +165,8 @@ func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) func (bcR *BlockchainReactor) poolRoutine() { trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond) + statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second) + switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second) FOR_LOOP: for { @@ -176,6 +190,24 @@ FOR_LOOP: if peer != nil { bcR.sw.StopPeerForError(peer, errors.New("BlockchainReactor Timeout")) } + case _ = <-statusUpdateTicker.C: + // ask for status updates + go bcR.BroadcastStatusRequest() + case _ = <-switchToConsensusTicker.C: + // not thread safe access for peerless and numPending but should be fine + log.Debug("Consensus ticker", "peerless", bcR.pool.peerless, "pending", bcR.pool.numPending, "total", bcR.pool.numTotal) + // NOTE: this condition is very strict right now. may need to weaken + if bcR.pool.numPending == maxPendingRequests && bcR.pool.peerless == bcR.pool.numPending { + log.Warn("Time to switch to consensus reactor!", "height", bcR.pool.height) + bcR.pool.Stop() + stateDB := dbm.GetDB("state") + state := sm.LoadState(stateDB) + + bcR.sw.Reactor("CONSENSUS").(consensusReactor).ResetToState(state) + bcR.sw.Reactor("CONSENSUS").(consensusReactor).SetSyncing(false) + + break FOR_LOOP + } case _ = <-trySyncTicker.C: // chan time //var lastValidatedBlock *types.Block SYNC_LOOP: @@ -215,6 +247,7 @@ FOR_LOOP: // TODO: use other heuristics too besides blocktime. // It's not a security concern, as it only needs to happen // upon node sync, and there's also a second (slower) + // this peer failed us // method of syncing in the consensus reactor. 
if lastValidatedBlock != nil && time.Now().Sub(lastValidatedBlock.Time) < stopSyncingDurationMinutes*time.Minute { @@ -238,8 +271,13 @@ FOR_LOOP: } } -func (bcR *BlockchainReactor) BroadcastStatus() error { - bcR.sw.Broadcast(BlockchainChannel, &bcPeerStatusMessage{bcR.store.Height()}) +func (bcR *BlockchainReactor) BroadcastStatusResponse() error { + bcR.sw.Broadcast(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()}) + return nil +} + +func (bcR *BlockchainReactor) BroadcastStatusRequest() error { + bcR.sw.Broadcast(BlockchainChannel, &bcStatusRequestMessage{bcR.store.Height()}) return nil } @@ -252,9 +290,10 @@ func (bcR *BlockchainReactor) SetFireable(evsw events.Fireable) { // Messages const ( - msgTypeBlockRequest = byte(0x10) - msgTypeBlockResponse = byte(0x11) - msgTypePeerStatus = byte(0x20) + msgTypeBlockRequest = byte(0x10) + msgTypeBlockResponse = byte(0x11) + msgTypeStatusResponse = byte(0x20) + msgTypeStatusRequest = byte(0x21) ) type BlockchainMessage interface{} @@ -263,7 +302,8 @@ var _ = binary.RegisterInterface( struct{ BlockchainMessage }{}, binary.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest}, binary.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse}, - binary.ConcreteType{&bcPeerStatusMessage{}, msgTypePeerStatus}, + binary.ConcreteType{&bcStatusResponseMessage{}, msgTypeStatusResponse}, + binary.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest}, ) func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) { @@ -296,10 +336,20 @@ func (m *bcBlockResponseMessage) String() string { //------------------------------------- -type bcPeerStatusMessage struct { +type bcStatusRequestMessage struct { + Height uint +} + +func (m *bcStatusRequestMessage) String() string { + return fmt.Sprintf("[bcStatusRequestMessage %v]", m.Height) +} + +//------------------------------------- + +type bcStatusResponseMessage struct { Height uint } -func (m *bcPeerStatusMessage) String() string { - 
return fmt.Sprintf("[bcPeerStatusMessage %v]", m.Height) +func (m *bcStatusResponseMessage) String() string { + return fmt.Sprintf("[bcStatusResponseMessage %v]", m.Height) } diff --git a/consensus/reactor.go b/consensus/reactor.go index ffc1c9219..2b6af88b2 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -41,6 +41,9 @@ type ConsensusReactor struct { blockStore *bc.BlockStore conS *ConsensusState + // if fast sync is running we don't really do anything + syncing bool + evsw events.Fireable } @@ -124,7 +127,7 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { // Implements Reactor func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) { - if !conR.IsRunning() { + if conR.syncing || !conR.IsRunning() { return } @@ -224,6 +227,11 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte } } +// Sets whether or not we're using the blockchain reactor for syncing +func (conR *ConsensusReactor) SetSyncing(syncing bool) { + conR.syncing = syncing +} + // Sets our private validator account for signing votes. func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) { conR.conS.SetPrivValidator(priv) diff --git a/node/node.go b/node/node.go index cab2c397a..415796367 100644 --- a/node/node.go +++ b/node/node.go @@ -81,6 +81,11 @@ func NewNode() *Node { consensusReactor.SetPrivValidator(privValidator) } + // so the consensus reactor won't do anything until we're synced + if config.App().GetBool("FastSync") { + consensusReactor.SetSyncing(true) + } + sw := p2p.NewSwitch() sw.AddReactor("PEX", pexReactor) sw.AddReactor("MEMPOOL", mempoolReactor) @@ -112,10 +117,6 @@ func (n *Node) Start() { nodeInfo := makeNodeInfo(n.sw) n.sw.SetNodeInfo(nodeInfo) n.sw.Start() - if config.App().GetBool("FastSync") { - // TODO: When FastSync is done, start CONSENSUS. - n.sw.Reactor("CONSENSUS").Stop() - } } func (n *Node) Stop() {