|
|
- package blockchain
-
- import (
- "bytes"
- "errors"
- "fmt"
- "reflect"
- "sync/atomic"
- "time"
-
- "github.com/tendermint/tendermint/binary"
- . "github.com/tendermint/tendermint/common"
- dbm "github.com/tendermint/tendermint/db"
- "github.com/tendermint/tendermint/events"
- "github.com/tendermint/tendermint/p2p"
- sm "github.com/tendermint/tendermint/state"
- "github.com/tendermint/tendermint/types"
- )
-
- const (
- BlockchainChannel = byte(0x40)
- defaultChannelCapacity = 100
- defaultSleepIntervalMS = 500
- trySyncIntervalMS = 100
- // stop syncing when last block's time is
- // within this much of the system time.
- // stopSyncingDurationMinutes = 10
-
- // ask for best height every 10s
- statusUpdateIntervalSeconds = 10
- // check if we should switch to consensus reactor
- switchToConsensusIntervalSeconds = 10
- )
-
- type consensusReactor interface {
- SetSyncing(bool)
- ResetToState(*sm.State)
- }
-
- // BlockchainReactor handles long-term catchup syncing.
- type BlockchainReactor struct {
- sw *p2p.Switch
- state *sm.State
- store *BlockStore
- pool *BlockPool
- sync bool
- requestsCh chan BlockRequest
- timeoutsCh chan string
- lastBlock *types.Block
- quit chan struct{}
- running uint32
-
- evsw events.Fireable
- }
-
- func NewBlockchainReactor(state *sm.State, store *BlockStore, sync bool) *BlockchainReactor {
- if state.LastBlockHeight != store.Height() &&
- state.LastBlockHeight != store.Height()-1 { // XXX double check this logic.
- panic(Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
- }
- requestsCh := make(chan BlockRequest, defaultChannelCapacity)
- timeoutsCh := make(chan string, defaultChannelCapacity)
- pool := NewBlockPool(
- store.Height()+1,
- requestsCh,
- timeoutsCh,
- )
- bcR := &BlockchainReactor{
- state: state,
- store: store,
- pool: pool,
- sync: sync,
- requestsCh: requestsCh,
- timeoutsCh: timeoutsCh,
- quit: make(chan struct{}),
- running: uint32(0),
- }
- return bcR
- }
-
- // Implements Reactor
- func (bcR *BlockchainReactor) Start(sw *p2p.Switch) {
- if atomic.CompareAndSwapUint32(&bcR.running, 0, 1) {
- log.Info("Starting BlockchainReactor")
- bcR.sw = sw
- if bcR.sync {
- bcR.pool.Start()
- go bcR.poolRoutine()
- }
- }
- }
-
- // Implements Reactor
- func (bcR *BlockchainReactor) Stop() {
- if atomic.CompareAndSwapUint32(&bcR.running, 1, 0) {
- log.Info("Stopping BlockchainReactor")
- close(bcR.quit)
- bcR.pool.Stop()
- }
- }
-
- // Implements Reactor
- func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
- return []*p2p.ChannelDescriptor{
- &p2p.ChannelDescriptor{
- Id: BlockchainChannel,
- Priority: 5,
- SendQueueCapacity: 100,
- },
- }
- }
-
- // Implements Reactor
- func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
- // Send peer our state.
- peer.Send(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()})
- }
-
- // Implements Reactor
- func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
- // Remove peer from the pool.
- bcR.pool.RemovePeer(peer.Key)
- }
-
- // Implements Reactor
- func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) {
- _, msg, err := DecodeMessage(msgBytes)
- if err != nil {
- log.Warn("Error decoding message", "error", err)
- return
- }
-
- log.Info("Received message", "msg", msg)
-
- switch msg := msg.(type) {
- case *bcBlockRequestMessage:
- // Got a request for a block. Respond with block if we have it.
- block := bcR.store.LoadBlock(msg.Height)
- if block != nil {
- msg := &bcBlockResponseMessage{Block: block}
- queued := src.TrySend(BlockchainChannel, msg)
- if !queued {
- // queue is full, just ignore.
- }
- } else {
- // TODO peer is asking for things we don't have.
- }
- case *bcBlockResponseMessage:
- // Got a block.
- bcR.pool.AddBlock(msg.Block, src.Key)
- case *bcStatusRequestMessage:
- // Send peer our state.
- queued := src.TrySend(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()})
- if !queued {
- // sorry
- }
- case *bcStatusResponseMessage:
- // Got a peer status. Unverified.
- bcR.pool.SetPeerHeight(src.Key, msg.Height)
- default:
- log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
- }
- }
-
- // Handle messages from the poolReactor telling the reactor what to do.
- func (bcR *BlockchainReactor) poolRoutine() {
-
- trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
- statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
- switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
-
- FOR_LOOP:
- for {
- select {
- case request := <-bcR.requestsCh: // chan BlockRequest
- peer := bcR.sw.Peers().Get(request.PeerId)
- if peer == nil {
- // We can't fulfill the request.
- continue FOR_LOOP
- }
- msg := &bcBlockRequestMessage{request.Height}
- queued := peer.TrySend(BlockchainChannel, msg)
- if !queued {
- // We couldn't queue the request.
- time.Sleep(defaultSleepIntervalMS * time.Millisecond)
- continue FOR_LOOP
- }
- case peerId := <-bcR.timeoutsCh: // chan string
- // Peer timed out.
- peer := bcR.sw.Peers().Get(peerId)
- if peer != nil {
- bcR.sw.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
- }
- case _ = <-statusUpdateTicker.C:
- // ask for status updates
- go bcR.BroadcastStatusRequest()
- case _ = <-switchToConsensusTicker.C:
- // not thread safe access for peerless and numPending but should be fine
- log.Debug("Consensus ticker", "peerless", bcR.pool.peerless, "pending", bcR.pool.numPending, "total", bcR.pool.numTotal)
- // NOTE: this condition is very strict right now. may need to weaken
- // if the max amount of requests are pending and peerless
- // and we have some peers (say > 5), then we're caught up
- maxPending := bcR.pool.numPending == maxPendingRequests
- maxPeerless := bcR.pool.peerless == bcR.pool.numPending
- o, i, _ := bcR.sw.NumPeers()
- enoughPeers := o+i >= 5
- if maxPending && maxPeerless && enoughPeers {
- log.Warn("Time to switch to consensus reactor!", "height", bcR.pool.height)
- bcR.pool.Stop()
- stateDB := dbm.GetDB("state")
- state := sm.LoadState(stateDB)
-
- bcR.sw.Reactor("CONSENSUS").(consensusReactor).ResetToState(state)
- bcR.sw.Reactor("CONSENSUS").(consensusReactor).SetSyncing(false)
-
- break FOR_LOOP
- }
- case _ = <-trySyncTicker.C: // chan time
- //var lastValidatedBlock *types.Block
- SYNC_LOOP:
- for i := 0; i < 10; i++ {
- // See if there are any blocks to sync.
- first, second := bcR.pool.PeekTwoBlocks()
- //log.Debug("TrySync peeked", "first", first, "second", second)
- if first == nil || second == nil {
- // We need both to sync the first block.
- break SYNC_LOOP
- }
- firstParts := first.MakePartSet()
- firstPartsHeader := firstParts.Header()
- // Finally, verify the first block using the second's validation.
- err := bcR.state.BondedValidators.VerifyValidation(
- first.Hash(), firstPartsHeader, first.Height, second.Validation)
- if err != nil {
- log.Debug("error in validation", "error", err)
- bcR.pool.RedoRequest(first.Height)
- break SYNC_LOOP
- } else {
- bcR.pool.PopRequest()
- err := sm.ExecBlock(bcR.state, first, firstPartsHeader)
- if err != nil {
- // TODO This is bad, are we zombie?
- panic(Fmt("Failed to process committed block: %v", err))
- }
- bcR.store.SaveBlock(first, firstParts, second.Validation)
- bcR.state.Save()
- //lastValidatedBlock = first
- }
- }
- /*
- // We're done syncing for now (will do again shortly)
- // See if we want to stop syncing and turn on the
- // consensus reactor.
- // TODO: use other heuristics too besides blocktime.
- // It's not a security concern, as it only needs to happen
- // upon node sync, and there's also a second (slower)
- // this peer failed us
- // method of syncing in the consensus reactor.
-
- if lastValidatedBlock != nil && time.Now().Sub(lastValidatedBlock.Time) < stopSyncingDurationMinutes*time.Minute {
- go func() {
- log.Info("Stopping blockpool syncing, turning on consensus...")
- trySyncTicker.Stop() // Just stop the block requests. Still serve blocks to others.
- conR := bcR.sw.Reactor("CONSENSUS")
- conR.(stateResetter).ResetToState(bcR.state)
- conR.Start(bcR.sw)
- for _, peer := range bcR.sw.Peers().List() {
- conR.AddPeer(peer)
- }
- }()
- break FOR_LOOP
- }
- */
- continue FOR_LOOP
- case <-bcR.quit:
- break FOR_LOOP
- }
- }
- }
-
- func (bcR *BlockchainReactor) BroadcastStatusResponse() error {
- bcR.sw.Broadcast(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()})
- return nil
- }
-
- func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
- bcR.sw.Broadcast(BlockchainChannel, &bcStatusRequestMessage{bcR.store.Height()})
- return nil
- }
-
- // implements events.Eventable
- func (bcR *BlockchainReactor) SetFireable(evsw events.Fireable) {
- bcR.evsw = evsw
- }
-
- //-----------------------------------------------------------------------------
- // Messages
-
- const (
- msgTypeBlockRequest = byte(0x10)
- msgTypeBlockResponse = byte(0x11)
- msgTypeStatusResponse = byte(0x20)
- msgTypeStatusRequest = byte(0x21)
- )
-
- type BlockchainMessage interface{}
-
- var _ = binary.RegisterInterface(
- struct{ BlockchainMessage }{},
- binary.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest},
- binary.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse},
- binary.ConcreteType{&bcStatusResponseMessage{}, msgTypeStatusResponse},
- binary.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest},
- )
-
- func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
- msgType = bz[0]
- n := new(int64)
- r := bytes.NewReader(bz)
- msg = binary.ReadBinary(struct{ BlockchainMessage }{}, r, n, &err).(struct{ BlockchainMessage }).BlockchainMessage
- return
- }
-
- //-------------------------------------
-
- type bcBlockRequestMessage struct {
- Height uint
- }
-
- func (m *bcBlockRequestMessage) String() string {
- return fmt.Sprintf("[bcBlockRequestMessage %v]", m.Height)
- }
-
- //-------------------------------------
-
- type bcBlockResponseMessage struct {
- Block *types.Block
- }
-
- func (m *bcBlockResponseMessage) String() string {
- return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height)
- }
-
- //-------------------------------------
-
- type bcStatusRequestMessage struct {
- Height uint
- }
-
- func (m *bcStatusRequestMessage) String() string {
- return fmt.Sprintf("[bcStatusRequestMessage %v]", m.Height)
- }
-
- //-------------------------------------
-
- type bcStatusResponseMessage struct {
- Height uint
- }
-
- func (m *bcStatusResponseMessage) String() string {
- return fmt.Sprintf("[bcStatusResponseMessage %v]", m.Height)
- }
|