
Merge pull request #80 from tendermint/blockchain

Blockchain
Jae Kwon, 10 years ago · commit 4ae29e0ae5
5 changed files with 66 additions and 110 deletions:

  1. blockchain/pool.go (+29, -46)
  2. blockchain/reactor.go (+22, -45)
  3. config/tendermint_test/config.go (+2, -2)
  4. consensus/reactor.go (+12, -11)
  5. node/node.go (+1, -6)

blockchain/pool.go (+29, -46)

@@ -18,15 +18,13 @@ const (
 	maxRequestsPerPeer = 300
 )
 
-// numTotal = numPending + blocks in the pool we havnt synced yet
 var (
 	requestTimeoutSeconds = time.Duration(3)
 )
 
 /*
 	Peers self report their heights when a new peer joins the block pool.
-	Starting from whatever we've got (pool.height), we request blocks
+	Starting from pool.height (inclusive), we request blocks
 	in sequence from peers that reported higher heights than ours.
 	Every so often we ask peers what height they're on so we can keep going.
@@ -37,12 +35,11 @@ var (
 type BlockPool struct {
 	// block requests
-	requestsMtx sync.Mutex
-	requests    map[uint]*bpRequest
-	peerless    int32 // number of requests without peers
-	height      uint  // the lowest key in requests.
-	numPending  int32
-	numTotal    int32
+	requestsMtx   sync.Mutex
+	requests      map[uint]*bpRequest
+	height        uint  // the lowest key in requests.
+	numUnassigned int32 // number of requests not yet assigned to a peer
+	numPending    int32 // number of requests pending assignment or block response
 
 	// peers
 	peersMtx sync.Mutex
@@ -59,10 +56,10 @@ func NewBlockPool(start uint, requestsCh chan<- BlockRequest, timeoutsCh chan<-
 	return &BlockPool{
 		peers: make(map[string]*bpPeer),
 
-		requests:   make(map[uint]*bpRequest),
-		height:     start,
-		numPending: 0,
-		numTotal:   0,
+		requests:      make(map[uint]*bpRequest),
+		height:        start,
+		numUnassigned: 0,
+		numPending:    0,
 
 		requestsCh: requestsCh,
 		timeoutsCh: timeoutsCh,
@@ -97,26 +94,25 @@ RUN_LOOP:
 		if atomic.LoadInt32(&pool.running) == 0 {
 			break RUN_LOOP
 		}
-		_, numPending, numTotal := pool.GetStatus()
+		_, numPending := pool.GetStatus()
 		if numPending >= maxPendingRequests {
 			// sleep for a bit.
 			time.Sleep(requestIntervalMS * time.Millisecond)
-		} else if numTotal >= maxTotalRequests {
+		} else if len(pool.requests) >= maxTotalRequests {
 			// sleep for a bit.
 			time.Sleep(requestIntervalMS * time.Millisecond)
 		} else {
 			// request for more blocks.
-			height := pool.nextHeight()
-			pool.makeRequest(height)
+			pool.makeNextRequest()
 		}
 	}
 }
 
-func (pool *BlockPool) GetStatus() (uint, int32, int32) {
+func (pool *BlockPool) GetStatus() (uint, int32) {
 	pool.requestsMtx.Lock() // Lock
 	defer pool.requestsMtx.Unlock()
 
-	return pool.height, pool.numPending, pool.numTotal
+	return pool.height, pool.numPending
 }
 
 // We need to see the second block's Validation to validate the first block.
@@ -146,7 +142,6 @@ func (pool *BlockPool) PopRequest() {
 	delete(pool.requests, pool.height)
 	pool.height++
-	pool.numTotal--
 }
 
 // Invalidates the block at pool.height.
@@ -165,7 +160,7 @@ func (pool *BlockPool) RedoRequest(height uint) {
 	request.block = nil
 	request.peerId = ""
 	pool.numPending++
-	pool.peerless++
+	pool.numUnassigned++
 
 	go requestRoutine(pool, height)
 }
@@ -186,7 +181,7 @@ func (pool *BlockPool) setPeerForRequest(height uint, peerId string) {
 	if request == nil {
 		return
 	}
-	pool.peerless--
+	pool.numUnassigned--
 	request.peerId = peerId
 }
@@ -198,7 +193,7 @@ func (pool *BlockPool) removePeerForRequest(height uint, peerId string) {
 	if request == nil {
 		return
 	}
-	pool.peerless++
+	pool.numUnassigned++
 	request.peerId = ""
 }
@@ -283,34 +278,22 @@ func (pool *BlockPool) decrPeer(peerId string) {
 	peer.numRequests--
 }
 
-func (pool *BlockPool) nextHeight() uint {
-	pool.requestsMtx.Lock() // Lock
-	defer pool.requestsMtx.Unlock()
-
-	// we make one request per height.
-	return pool.height + uint(pool.numTotal)
-}
-
-func (pool *BlockPool) makeRequest(height uint) {
+func (pool *BlockPool) makeNextRequest() {
 	pool.requestsMtx.Lock() // Lock
 	defer pool.requestsMtx.Unlock()
 
+	nextHeight := pool.height + uint(len(pool.requests))
 	request := &bpRequest{
-		height: height,
+		height: nextHeight,
 		peerId: "",
 		block:  nil,
 	}
 
-	pool.requests[height] = request
-	pool.peerless++
-
-	nextHeight := pool.height + uint(pool.numTotal)
-	if nextHeight == height {
-		pool.numTotal++
-		pool.numPending++
-	}
+	pool.requests[nextHeight] = request
+	pool.numUnassigned++
+	pool.numPending++
 
-	go requestRoutine(pool, height)
+	go requestRoutine(pool, nextHeight)
 }
 
 func (pool *BlockPool) sendRequest(height uint, peerId string) {
@@ -332,7 +315,7 @@ func (pool *BlockPool) debug() string {
 	defer pool.requestsMtx.Unlock()
 
 	str := ""
-	for h := pool.height; h < pool.height+uint(pool.numTotal); h++ {
+	for h := pool.height; h < pool.height+uint(len(pool.requests)); h++ {
 		if pool.requests[h] == nil {
 			str += Fmt("H(%v):X ", h)
 		} else {
@@ -379,7 +362,7 @@ func requestRoutine(pool *BlockPool, height uint) {
 			break PICK_LOOP
 		}
 
-		// set the peer, decrement peerless
+		// set the peer, decrement numUnassigned
 		pool.setPeerForRequest(height, peer.id)
 
 		for try := 0; try < maxTries; try++ {
@@ -391,14 +374,14 @@ func requestRoutine(pool *BlockPool, height uint) {
 				return
 			}
 			// or already processed and we've moved past it
-			bpHeight, _, _ := pool.GetStatus()
+			bpHeight, _ := pool.GetStatus()
 			if height < bpHeight {
 				pool.decrPeer(peer.id)
 				return
 			}
 		}
 
-		// unset the peer, increment peerless
+		// unset the peer, increment numUnassigned
 		pool.removePeerForRequest(height, peer.id)
 
 		// this peer failed us, try again
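
Note on the bookkeeping change in this file: numTotal and nextHeight() are gone because the request map already carries that information. makeNextRequest always creates a request at pool.height + len(pool.requests) and PopRequest always deletes at pool.height, so the map keys stay contiguous and len(pool.requests) is the total. A minimal, single-threaded sketch of that invariant (stand-in types for illustration, not the actual BlockPool; locking and the peer table are omitted):

package main

import "fmt"

// bpRequest is trimmed to the fields this diff shows.
type bpRequest struct {
	height uint
	peerId string
}

// pool is a stand-in for BlockPool, minus locking and peers.
type pool struct {
	height   uint                // lowest outstanding height
	requests map[uint]*bpRequest // keys stay contiguous: height .. height+len-1
}

// makeNextRequest mirrors the new BlockPool.makeNextRequest: the next
// height falls out of the map size, so no numTotal counter is needed.
func (p *pool) makeNextRequest() {
	nextHeight := p.height + uint(len(p.requests))
	p.requests[nextHeight] = &bpRequest{height: nextHeight}
}

// popRequest mirrors PopRequest: removing the lowest key keeps the
// remaining keys contiguous, so len(requests) remains the total.
func (p *pool) popRequest() {
	delete(p.requests, p.height)
	p.height++
}

func main() {
	p := &pool{height: 10, requests: make(map[uint]*bpRequest)}
	p.makeNextRequest() // height 10
	p.makeNextRequest() // height 11
	p.makeNextRequest() // height 12
	p.popRequest()      // block 10 processed
	fmt.Println(p.height, len(p.requests)) // prints: 11 2
}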


blockchain/reactor.go (+22, -45)

@@ -10,7 +10,6 @@ import (
 	"github.com/tendermint/tendermint/binary"
 	. "github.com/tendermint/tendermint/common"
-	dbm "github.com/tendermint/tendermint/db"
 	"github.com/tendermint/tendermint/events"
 	"github.com/tendermint/tendermint/p2p"
 	sm "github.com/tendermint/tendermint/state"
@@ -33,8 +32,9 @@ const (
 )
 
 type consensusReactor interface {
-	SetSyncing(bool)
-	ResetToState(*sm.State)
+	// for when we switch from blockchain reactor and fast sync to
+	// the consensus machine
+	SwitchToConsensus(*sm.State)
 }
 
 // BlockchainReactor handles long-term catchup syncing.
@@ -163,6 +163,8 @@ func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte)
 }
 
 // Handle messages from the poolReactor telling the reactor what to do.
+// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
+// (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
 func (bcR *BlockchainReactor) poolRoutine() {
 
 	trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
@@ -175,14 +177,14 @@ FOR_LOOP:
 		case request := <-bcR.requestsCh: // chan BlockRequest
 			peer := bcR.sw.Peers().Get(request.PeerId)
 			if peer == nil {
-				// We can't fulfill the request.
+				// We can't assign the request.
 				continue FOR_LOOP
 			}
 			msg := &bcBlockRequestMessage{request.Height}
 			queued := peer.TrySend(BlockchainChannel, msg)
 			if !queued {
-				// We couldn't queue the request.
-				time.Sleep(defaultSleepIntervalMS * time.Millisecond)
+				// We couldn't make the request, send-queue full.
+				// The pool handles retries, so just let it go.
 				continue FOR_LOOP
 			}
 		case peerId := <-bcR.timeoutsCh: // chan string
@@ -195,28 +197,28 @@ FOR_LOOP:
 			// ask for status updates
 			go bcR.BroadcastStatusRequest()
 		case _ = <-switchToConsensusTicker.C:
-			// not thread safe access for peerless and numPending but should be fine
-			log.Debug("Consensus ticker", "peerless", bcR.pool.peerless, "pending", bcR.pool.numPending, "total", bcR.pool.numTotal)
+			// not thread safe access for numUnassigned and numPending but should be fine
+			// TODO make threadsafe and use exposed functions
+			outbound, inbound, _ := bcR.sw.NumPeers()
+			log.Debug("Consensus ticker", "numUnassigned", bcR.pool.numUnassigned, "numPending", bcR.pool.numPending,
+				"total", len(bcR.pool.requests), "outbound", outbound, "inbound", inbound)
 			// NOTE: this condition is very strict right now. may need to weaken
-			// if the max amount of requests are pending and peerless
-			// and we have some peers (say > 5), then we're caught up
+			// If all `maxPendingRequests` requests are unassigned
+			// and we have some peers (say >= 3), then we're caught up
 			maxPending := bcR.pool.numPending == maxPendingRequests
-			maxPeerless := bcR.pool.peerless == bcR.pool.numPending
-			o, i, _ := bcR.sw.NumPeers()
-			enoughPeers := o+i >= 5
-			if maxPending && maxPeerless && enoughPeers {
-				log.Warn("Time to switch to consensus reactor!", "height", bcR.pool.height)
+			allUnassigned := bcR.pool.numPending == bcR.pool.numUnassigned
+			enoughPeers := outbound+inbound >= 3
+			if maxPending && allUnassigned && enoughPeers {
+				log.Info("Time to switch to consensus reactor!", "height", bcR.pool.height)
 				bcR.pool.Stop()
-				stateDB := dbm.GetDB("state")
-				state := sm.LoadState(stateDB)
-				bcR.sw.Reactor("CONSENSUS").(consensusReactor).ResetToState(state)
-				bcR.sw.Reactor("CONSENSUS").(consensusReactor).SetSyncing(false)
+				conR := bcR.sw.Reactor("CONSENSUS").(consensusReactor)
+				conR.SwitchToConsensus(bcR.state)
 
 				break FOR_LOOP
 			}
 		case _ = <-trySyncTicker.C: // chan time
-			//var lastValidatedBlock *types.Block
+			// This loop can be slow as long as it's doing syncing work.
 		SYNC_LOOP:
 			for i := 0; i < 10; i++ {
 				// See if there are any blocks to sync.
@@ -244,33 +246,8 @@ FOR_LOOP:
 				}
 				bcR.store.SaveBlock(first, firstParts, second.Validation)
 				bcR.state.Save()
-				//lastValidatedBlock = first
 			}
 		}
-		/*
-			// We're done syncing for now (will do again shortly)
-			// See if we want to stop syncing and turn on the
-			// consensus reactor.
-			// TODO: use other heuristics too besides blocktime.
-			// It's not a security concern, as it only needs to happen
-			// upon node sync, and there's also a second (slower)
-			// method of syncing in the consensus reactor.
-			if lastValidatedBlock != nil && time.Now().Sub(lastValidatedBlock.Time) < stopSyncingDurationMinutes*time.Minute {
-				go func() {
-					log.Info("Stopping blockpool syncing, turning on consensus...")
-					trySyncTicker.Stop() // Just stop the block requests. Still serve blocks to others.
-					conR := bcR.sw.Reactor("CONSENSUS")
-					conR.(stateResetter).ResetToState(bcR.state)
-					conR.Start(bcR.sw)
-					for _, peer := range bcR.sw.Peers().List() {
-						conR.AddPeer(peer)
-					}
-				}()
-				break FOR_LOOP
-			}
-		*/
 			continue FOR_LOOP
 		case <-bcR.quit:
 			break FOR_LOOP
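
The switch-over condition deserves a close read: numPending == maxPendingRequests means the pool has issued as many concurrent requests as it ever will, and numPending == numUnassigned means none of them could be handed to a peer, i.e. no connected peer reports a height above ours. A restatement as a hypothetical free function (illustration only; the PR reads the pool fields inline, and the value of maxPendingRequests is assumed here, as it is defined in blockchain/pool.go outside this diff):

package main

import "fmt"

// Illustrative value only: the real constant lives in blockchain/pool.go
// and is not shown in this diff.
const maxPendingRequests = 200

// caughtUp restates the switch-to-consensus predicate from poolRoutine.
func caughtUp(numPending, numUnassigned int32, outbound, inbound int) bool {
	maxPending := numPending == maxPendingRequests // pool is saturated with requests
	allUnassigned := numPending == numUnassigned   // none could be assigned to a peer
	enoughPeers := outbound+inbound >= 3           // and we are not simply alone
	return maxPending && allUnassigned && enoughPeers
}

func main() {
	// All outstanding requests are unassigned and we have 4 peers:
	// no peer reports a height above ours, so we are caught up.
	fmt.Println(caughtUp(200, 200, 2, 2)) // true
}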


config/tendermint_test/config.go (+2, -2)

@@ -71,7 +71,7 @@ func GetConfig(rootDir string) cfg.Config {
 	mapConfig.SetDefault("genesis_file", rootDir+"/genesis.json")
 	mapConfig.SetDefault("moniker", "anonymous")
 	mapConfig.SetDefault("node_laddr", "0.0.0.0:36656")
-	mapConfig.SetDefault("fast_sync", true)
+	mapConfig.SetDefault("fast_sync", false)
 	mapConfig.SetDefault("addrbook_file", rootDir+"/addrbook.json")
 	mapConfig.SetDefault("priv_validator_file", rootDir+"/priv_validator.json")
 	mapConfig.SetDefault("db_backend", "memdb")
@@ -94,7 +94,7 @@ network = "tendermint_test"
 moniker = "__MONIKER__"
 node_laddr = "0.0.0.0:36656"
 seeds = ""
-fast_sync = true
+fast_sync = false
 db_backend = "memdb"
 log_level = "debug"
 rpc_laddr = "0.0.0.0:36657"


consensus/reactor.go (+12, -11)

@@ -42,16 +42,17 @@ type ConsensusReactor struct {
 	conS       *ConsensusState
 
 	// if fast sync is running we don't really do anything
-	syncing bool
+	sync bool
 
 	evsw events.Fireable
 }
 
-func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore) *ConsensusReactor {
+func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, sync bool) *ConsensusReactor {
 	conR := &ConsensusReactor{
 		blockStore: blockStore,
 		quit:       make(chan struct{}),
 		conS:       consensusState,
+		sync:       sync,
 	}
 	return conR
 }
@@ -61,7 +62,9 @@ func (conR *ConsensusReactor) Start(sw *p2p.Switch) {
 	if atomic.CompareAndSwapUint32(&conR.running, 0, 1) {
 		log.Info("Starting ConsensusReactor")
 		conR.sw = sw
-		conR.conS.Start()
+		if !conR.sync {
+			conR.conS.Start()
+		}
 		go conR.broadcastNewRoundStepRoutine()
 	}
 }
@@ -129,7 +132,7 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
 
 // Implements Reactor
 func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) {
-	if conR.syncing || !conR.IsRunning() {
+	if conR.sync || !conR.IsRunning() {
 		return
 	}
@@ -235,20 +238,18 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
 	}
 }
 
-// Sets whether or not we're using the blockchain reactor for syncing
-func (conR *ConsensusReactor) SetSyncing(syncing bool) {
-	conR.syncing = syncing
-}
-
 // Sets our private validator account for signing votes.
 func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) {
 	conR.conS.SetPrivValidator(priv)
 }
 
-// Reset to some state.
-func (conR *ConsensusReactor) ResetToState(state *sm.State) {
+// Switch from the fast sync to the consensus:
+// reset the state, turn off fast sync, start the consensus-state-machine
+func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
 	conR.conS.updateToState(state, false)
 	conR.conS.newStepCh <- conR.conS.getRoundState()
+	conR.sync = false
+	conR.conS.Start()
 }
 
 // implements events.Eventable
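
With this change the sync flag has a one-way lifecycle: it is set from the fast_sync config at construction, it gates both conS.Start() and message handling, and SwitchToConsensus is the single place that clears it. A self-contained sketch of that lifecycle (stand-in types for illustration; ConsensusState and the p2p plumbing are elided):

package main

import "fmt"

// reactor is a minimal stand-in for ConsensusReactor, per this diff.
type reactor struct {
	sync    bool // true while the blockchain reactor is fast-syncing
	started bool // stands in for conS.Start() having run
}

func newReactor(fastSync bool) *reactor { return &reactor{sync: fastSync} }

// Start skips the consensus state machine while fast-syncing.
func (r *reactor) Start() {
	if !r.sync {
		r.started = true
	}
}

// Receive drops consensus messages while fast-syncing.
func (r *reactor) Receive(msg string) {
	if r.sync || !r.started {
		return
	}
	fmt.Println("handling", msg)
}

// SwitchToConsensus turns fast sync off and starts the state machine.
func (r *reactor) SwitchToConsensus() {
	r.sync = false
	r.started = true
}

func main() {
	r := newReactor(true) // fast_sync = true
	r.Start()
	r.Receive("vote") // dropped: still syncing
	r.SwitchToConsensus()
	r.Receive("vote") // handled
}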


node/node.go (+1, -6)

@@ -87,16 +87,11 @@ func NewNode() *Node {
 	// Get ConsensusReactor
 	consensusState := consensus.NewConsensusState(state, blockStore, mempoolReactor)
-	consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore)
+	consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore, config.GetBool("fast_sync"))
 	if privValidator != nil {
 		consensusReactor.SetPrivValidator(privValidator)
 	}
-	// so the consensus reactor won't do anything until we're synced
-	if config.GetBool("fast_sync") {
-		consensusReactor.SetSyncing(true)
-	}
 
 	sw := p2p.NewSwitch()
 	sw.AddReactor("PEX", pexReactor)
 	sw.AddReactor("MEMPOOL", mempoolReactor)

