diff --git a/docs/spec/reactors/block_sync/img/bc-reactor.png b/docs/spec/reactors/block_sync/img/bc-reactor.png new file mode 100644 index 000000000..f7fe0f819 Binary files /dev/null and b/docs/spec/reactors/block_sync/img/bc-reactor.png differ diff --git a/docs/spec/reactors/block_sync/reactor.md b/docs/spec/reactors/block_sync/reactor.md index 9a814bead..97104eeeb 100644 --- a/docs/spec/reactors/block_sync/reactor.md +++ b/docs/spec/reactors/block_sync/reactor.md @@ -44,10 +44,258 @@ type bcStatusResponseMessage struct { } ``` -## Protocol +## Architecture and algorithm -TODO +The Blockchain reactor is organised as a set of concurrent tasks: + - Receive routine of Blockchain Reactor + - Task for creating Requesters + - Set of Requesters tasks and + - Controller task. +![Blockchain Reactor Architecture Diagram](img/bc-reactor.png) + +### Data structures + +These are the core data structures necessarily to provide the Blockchain Reactor logic. + +Requester data structure is used to track assignment of request for `block` at position `height` to a +peer with id equals to `peerID`. + +```go +type Requester { + mtx Mutex + block Block + height int64 + 
 peerID p2p.ID + redoChannel chan struct{} +} +``` +Pool is core data structure that stores last executed block (`height`), assignment of requests to peers (`requesters`), +current height for each peer and number of pending requests for each peer (`peers`), maximum peer height, etc. + +```go +type Pool { + mtx Mutex + requesters map[int64]*Requester + 
height int64 + peers map[p2p.ID]*Peer + 
maxPeerHeight int64 

 + 
numPending int32 + store BlockStore + 
requestsChannel chan<- BlockRequest + 
errorsChannel chan<- peerError +} +``` + +Peer data structure stores for each peer current `height` and number of pending requests sent to +the peer (`numPending`), etc. + +```go +type Peer struct { + id p2p.ID + height int64 + numPending int32 + timeout *time.Timer + didTimeout bool +} +``` + +BlockRequest is internal data structure used to denote current mapping of request for a block at some `height` to +a peer (`PeerID`). + +```go +type BlockRequest { + Height int64 + PeerID p2p.ID +} +``` + +### Receive routine of Blockchain Reactor + +It is executed upon message reception on the BlockchainChannel inside p2p receive routine. There is a separate p2p +receive routine (and therefore receive routine of the Blockchain Reactor) executed for each peer. Note that +try to send will not block (returns immediately) if outgoing buffer is full. + +```go +handleMsg(pool, m): + upon receiving bcBlockRequestMessage m from peer p: + block = load block for height m.Height from pool.store + if block != nil then + try to send BlockResponseMessage(block) to p + else + try to send bcNoBlockResponseMessage(m.Height) to p + + upon receiving bcBlockResponseMessage m from peer p: + pool.mtx.Lock() + requester = pool.requesters[m.Height] + if requester == nil then + error("peer sent us a block we didn't expect") + continue + + if requester.block == nil and requester.peerID == p then + requester.block = m + pool.numPending -= 1 // atomic decrement + peer = pool.peers[p] + if peer != nil then + peer.numPending-- + if peer.numPending == 0 then + peer.timeout.Stop() + // NOTE: we don't send Quit signal to the corresponding requester task! + else + trigger peer timeout to expire after peerTimeout + pool.mtx.Unlock() + + + upon receiving bcStatusRequestMessage m from peer p: + try to send bcStatusResponseMessage(pool.store.Height) + + upon receiving bcStatusResponseMessage m from peer p: + pool.mtx.Lock() + peer = pool.peers[p] + if peer != nil then + peer.height = m.height + else + peer = create new Peer data structure with id = p and height = m.Height + pool.peers[p] = peer + + if m.Height > pool.maxPeerHeight then + pool.maxPeerHeight = m.Height + pool.mtx.Unlock() + +onTimeout(p): + send error message to pool error channel + peer = pool.peers[p] + peer.didTimeout = true +``` + +### Requester tasks + +Requester task is responsible for fetching a single block at position `height`. + +```go +fetchBlock(height, pool): + while true do + peerID = nil + block = nil + peer = pickAvailablePeer(height) + peerId = peer.id + + enqueue BlockRequest(height, peerID) to pool.requestsChannel + redo = false + while !redo do + select { + upon receiving Quit message do + return + upon receiving message on redoChannel do + mtx.Lock() + pool.numPending++ + redo = true + mtx.UnLock() + } + +pickAvailablePeer(height): + selectedPeer = nil + while selectedPeer = nil do + pool.mtx.Lock() + for each peer in pool.peers do + if !peer.didTimeout and peer.numPending < maxPendingRequestsPerPeer and peer.height >= height then + peer.numPending++ + selectedPeer = peer + break + pool.mtx.Unlock() + + if selectedPeer = nil then + sleep requestIntervalMS + + return selectedPeer +``` +sleep for requestIntervalMS +### Task for creating Requesters + +This task is responsible for continuously creating and starting Requester tasks. +```go +createRequesters(pool): + while true do + if !pool.isRunning then break + if pool.numPending < maxPendingRequests or size(pool.requesters) < maxTotalRequesters then + pool.mtx.Lock() + nextHeight = pool.height + size(pool.requesters) + requester = create new requester for height nextHeight + pool.requesters[nextHeight] = requester + pool.numPending += 1 // atomic increment + start requester task + pool.mtx.Unlock() + else + sleep requestIntervalMS + pool.mtx.Lock() + for each peer in pool.peers do + if !peer.didTimeout && peer.numPending > 0 && peer.curRate < minRecvRate then + send error on pool error channel + peer.didTimeout = true + if peer.didTimeout then + for each requester in pool.requesters do + if requester.getPeerID() == peer then + enqueue msg on requestor's redoChannel + delete(pool.peers, peerID) + pool.mtx.Unlock() +``` + + +### Main blockchain reactor controller task +```go +main(pool): + create trySyncTicker with interval trySyncIntervalMS + create statusUpdateTicker with interval statusUpdateIntervalSeconds + create switchToConsensusTicker with interbal switchToConsensusIntervalSeconds + + while true do + select { + upon receiving BlockRequest(Height, Peer) on pool.requestsChannel: + try to send bcBlockRequestMessage(Height) to Peer + + upon receiving error(peer) on errorsChannel: + stop peer for error + + upon receiving message on statusUpdateTickerChannel: + broadcast bcStatusRequestMessage(bcR.store.Height) // message sent in a separate routine + + upon receiving message on switchToConsensusTickerChannel: + pool.mtx.Lock() + receivedBlockOrTimedOut = pool.height > 0 || (time.Now() - pool.startTime) > 5 Seconds + ourChainIsLongestAmongPeers = pool.maxPeerHeight == 0 || pool.height >= pool.maxPeerHeight + haveSomePeers = size of pool.peers > 0 + pool.mtx.Unlock() + if haveSomePeers && receivedBlockOrTimedOut && ourChainIsLongestAmongPeers then + switch to consensus mode + + upon receiving message on trySyncTickerChannel: + for i = 0; i < 10; i++ do + pool.mtx.Lock() + firstBlock = pool.requesters[pool.height].block + secondBlock = pool.requesters[pool.height].block + if firstBlock == nil or secondBlock == nil then continue + pool.mtx.Unlock() + verify firstBlock using LastCommit from secondBlock + if verification failed + pool.mtx.Lock() + peerID = pool.requesters[pool.height].peerID + redoRequestsForPeer(peerId) + delete(pool.peers, peerID) + stop peer peerID for error + pool.mtx.Unlock() + else + delete(pool.requesters, pool.height) + save firstBlock to store + pool.height++ + execute firstBlock + } + +redoRequestsForPeer(pool, peerId): + for each requester in pool.requesters do + if requester.getPeerID() == peerID + enqueue msg on redoChannel for requester +``` + ## Channels Defines `maxMsgSize` for the maximum size of incoming messages,