  1. package blockchain
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "sync/atomic"
  8. "time"
  9. ""
  10. . ""
  11. dbm ""
  12. ""
  13. ""
  14. sm ""
  15. ""
  16. )
  17. const (
  18. BlockchainChannel = byte(0x40)
  19. defaultChannelCapacity = 100
  20. defaultSleepIntervalMS = 500
  21. trySyncIntervalMS = 100
  22. // stop syncing when last block's time is
  23. // within this much of the system time.
  24. // stopSyncingDurationMinutes = 10
  25. // ask for best height every 10s
  26. statusUpdateIntervalSeconds = 10
  27. // check if we should switch to consensus reactor
  28. switchToConsensusIntervalSeconds = 10
  29. )
  30. type consensusReactor interface {
  31. SetSyncing(bool)
  32. ResetToState(*sm.State)
  33. }
  34. // BlockchainReactor handles long-term catchup syncing.
  35. type BlockchainReactor struct {
  36. sw *p2p.Switch
  37. state *sm.State
  38. store *BlockStore
  39. pool *BlockPool
  40. sync bool
  41. requestsCh chan BlockRequest
  42. timeoutsCh chan string
  43. lastBlock *types.Block
  44. quit chan struct{}
  45. running uint32
  46. evsw events.Fireable
  47. }
  48. func NewBlockchainReactor(state *sm.State, store *BlockStore, sync bool) *BlockchainReactor {
  49. if state.LastBlockHeight != store.Height() &&
  50. state.LastBlockHeight != store.Height()-1 { // XXX double check this logic.
  51. panic(Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
  52. }
  53. requestsCh := make(chan BlockRequest, defaultChannelCapacity)
  54. timeoutsCh := make(chan string, defaultChannelCapacity)
  55. pool := NewBlockPool(
  56. store.Height()+1,
  57. requestsCh,
  58. timeoutsCh,
  59. )
  60. bcR := &BlockchainReactor{
  61. state: state,
  62. store: store,
  63. pool: pool,
  64. sync: sync,
  65. requestsCh: requestsCh,
  66. timeoutsCh: timeoutsCh,
  67. quit: make(chan struct{}),
  68. running: uint32(0),
  69. }
  70. return bcR
  71. }
  72. // Implements Reactor
  73. func (bcR *BlockchainReactor) Start(sw *p2p.Switch) {
  74. if atomic.CompareAndSwapUint32(&bcR.running, 0, 1) {
  75. log.Info("Starting BlockchainReactor")
  76. bcR.sw = sw
  77. if bcR.sync {
  78. bcR.pool.Start()
  79. go bcR.poolRoutine()
  80. }
  81. }
  82. }
  83. // Implements Reactor
  84. func (bcR *BlockchainReactor) Stop() {
  85. if atomic.CompareAndSwapUint32(&bcR.running, 1, 0) {
  86. log.Info("Stopping BlockchainReactor")
  87. close(bcR.quit)
  88. bcR.pool.Stop()
  89. }
  90. }
  91. // Implements Reactor
  92. func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
  93. return []*p2p.ChannelDescriptor{
  94. &p2p.ChannelDescriptor{
  95. Id: BlockchainChannel,
  96. Priority: 5,
  97. SendQueueCapacity: 100,
  98. },
  99. }
  100. }
  101. // Implements Reactor
  102. func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
  103. // Send peer our state.
  104. peer.Send(BlockchainChannel, &bcStatusResponseMessage{})
  105. }
  106. // Implements Reactor
  107. func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  108. // Remove peer from the pool.
  109. bcR.pool.RemovePeer(peer.Key)
  110. }
  111. // Implements Reactor
  112. func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) {
  113. _, msg, err := DecodeMessage(msgBytes)
  114. if err != nil {
  115. log.Warn("Error decoding message", "error", err)
  116. return
  117. }
  118. log.Info("Received message", "msg", msg)
  119. switch msg := msg.(type) {
  120. case *bcBlockRequestMessage:
  121. // Got a request for a block. Respond with block if we have it.
  122. block :=
  123. if block != nil {
  124. msg := &bcBlockResponseMessage{Block: block}
  125. queued := src.TrySend(BlockchainChannel, msg)
  126. if !queued {
  127. // queue is full, just ignore.
  128. }
  129. } else {
  130. // TODO peer is asking for things we don't have.
  131. }
  132. case *bcBlockResponseMessage:
  133. // Got a block.
  134. bcR.pool.AddBlock(msg.Block, src.Key)
  135. case *bcStatusRequestMessage:
  136. // Send peer our state.
  137. queued := src.TrySend(BlockchainChannel, &bcStatusResponseMessage{})
  138. if !queued {
  139. // sorry
  140. }
  141. case *bcStatusResponseMessage:
  142. // Got a peer status. Unverified.
  143. bcR.pool.SetPeerHeight(src.Key, msg.Height)
  144. default:
  145. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  146. }
  147. }
  148. // Handle messages from the poolReactor telling the reactor what to do.
  149. func (bcR *BlockchainReactor) poolRoutine() {
  150. trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
  151. statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
  152. switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
  153. FOR_LOOP:
  154. for {
  155. select {
  156. case request := <-bcR.requestsCh: // chan BlockRequest
  157. peer := bcR.sw.Peers().Get(request.PeerId)
  158. if peer == nil {
  159. // We can't fulfill the request.
  160. continue FOR_LOOP
  161. }
  162. msg := &bcBlockRequestMessage{request.Height}
  163. queued := peer.TrySend(BlockchainChannel, msg)
  164. if !queued {
  165. // We couldn't queue the request.
  166. time.Sleep(defaultSleepIntervalMS * time.Millisecond)
  167. continue FOR_LOOP
  168. }
  169. case peerId := <-bcR.timeoutsCh: // chan string
  170. // Peer timed out.
  171. peer := bcR.sw.Peers().Get(peerId)
  172. if peer != nil {
  173. bcR.sw.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
  174. }
  175. case _ = <-statusUpdateTicker.C:
  176. // ask for status updates
  177. go bcR.BroadcastStatusRequest()
  178. case _ = <-switchToConsensusTicker.C:
  179. // not thread safe access for peerless and numPending but should be fine
  180. log.Debug("Consensus ticker", "peerless", bcR.pool.peerless, "pending", bcR.pool.numPending, "total", bcR.pool.numTotal)
  181. // NOTE: this condition is very strict right now. may need to weaken
  182. // if the max amount of requests are pending and peerless
  183. // and we have some peers (say > 5), then we're caught up
  184. maxPending := bcR.pool.numPending == maxPendingRequests
  185. maxPeerless := bcR.pool.peerless == bcR.pool.numPending
  186. o, i, _ := bcR.sw.NumPeers()
  187. enoughPeers := o+i > 5
  188. if maxPending && maxPeerless && enoughPeers {
  189. log.Warn("Time to switch to consensus reactor!", "height", bcR.pool.height)
  190. bcR.pool.Stop()
  191. stateDB := dbm.GetDB("state")
  192. state := sm.LoadState(stateDB)
  193. bcR.sw.Reactor("CONSENSUS").(consensusReactor).ResetToState(state)
  194. bcR.sw.Reactor("CONSENSUS").(consensusReactor).SetSyncing(false)
  195. break FOR_LOOP
  196. }
  197. case _ = <-trySyncTicker.C: // chan time
  198. //var lastValidatedBlock *types.Block
  199. SYNC_LOOP:
  200. for i := 0; i < 10; i++ {
  201. // See if there are any blocks to sync.
  202. first, second := bcR.pool.PeekTwoBlocks()
  203. //log.Debug("TrySync peeked", "first", first, "second", second)
  204. if first == nil || second == nil {
  205. // We need both to sync the first block.
  206. break SYNC_LOOP
  207. }
  208. firstParts := first.MakePartSet()
  209. firstPartsHeader := firstParts.Header()
  210. // Finally, verify the first block using the second's validation.
  211. err := bcR.state.BondedValidators.VerifyValidation(
  212. first.Hash(), firstPartsHeader, first.Height, second.Validation)
  213. if err != nil {
  214. log.Debug("error in validation", "error", err)
  215. bcR.pool.RedoRequest(first.Height)
  216. break SYNC_LOOP
  217. } else {
  218. bcR.pool.PopRequest()
  219. err := sm.ExecBlock(bcR.state, first, firstPartsHeader)
  220. if err != nil {
  221. // TODO This is bad, are we zombie?
  222. panic(Fmt("Failed to process committed block: %v", err))
  223. }
  224., firstParts, second.Validation)
  225. bcR.state.Save()
  226. //lastValidatedBlock = first
  227. }
  228. }
  229. /*
  230. // We're done syncing for now (will do again shortly)
  231. // See if we want to stop syncing and turn on the
  232. // consensus reactor.
  233. // TODO: use other heuristics too besides blocktime.
  234. // It's not a security concern, as it only needs to happen
  235. // upon node sync, and there's also a second (slower)
  236. // this peer failed us
  237. // method of syncing in the consensus reactor.
  238. if lastValidatedBlock != nil && time.Now().Sub(lastValidatedBlock.Time) < stopSyncingDurationMinutes*time.Minute {
  239. go func() {
  240. log.Info("Stopping blockpool syncing, turning on consensus...")
  241. trySyncTicker.Stop() // Just stop the block requests. Still serve blocks to others.
  242. conR := bcR.sw.Reactor("CONSENSUS")
  243. conR.(stateResetter).ResetToState(bcR.state)
  244. conR.Start(bcR.sw)
  245. for _, peer := range bcR.sw.Peers().List() {
  246. conR.AddPeer(peer)
  247. }
  248. }()
  249. break FOR_LOOP
  250. }
  251. */
  252. continue FOR_LOOP
  253. case <-bcR.quit:
  254. break FOR_LOOP
  255. }
  256. }
  257. }
  258. func (bcR *BlockchainReactor) BroadcastStatusResponse() error {
  259. bcR.sw.Broadcast(BlockchainChannel, &bcStatusResponseMessage{})
  260. return nil
  261. }
  262. func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
  263. bcR.sw.Broadcast(BlockchainChannel, &bcStatusRequestMessage{})
  264. return nil
  265. }
  266. // implements events.Eventable
  267. func (bcR *BlockchainReactor) SetFireable(evsw events.Fireable) {
  268. bcR.evsw = evsw
  269. }
  270. //-----------------------------------------------------------------------------
  271. // Messages
  272. const (
  273. msgTypeBlockRequest = byte(0x10)
  274. msgTypeBlockResponse = byte(0x11)
  275. msgTypeStatusResponse = byte(0x20)
  276. msgTypeStatusRequest = byte(0x21)
  277. )
// BlockchainMessage is the union of the messages exchanged on
// BlockchainChannel. It is an empty interface; the concrete message types
// are registered below.
type BlockchainMessage interface{}

// Register each concrete message type with its type byte so that
// DecodeMessage can decode into struct{ BlockchainMessage }. Assigning the
// result to _ makes the registration run during package initialization.
var _ = binary.RegisterInterface(
	struct{ BlockchainMessage }{},
	binary.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest},
	binary.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse},
	binary.ConcreteType{&bcStatusResponseMessage{}, msgTypeStatusResponse},
	binary.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest},
)
  286. func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
  287. msgType = bz[0]
  288. n := new(int64)
  289. r := bytes.NewReader(bz)
  290. msg = binary.ReadBinary(struct{ BlockchainMessage }{}, r, n, &err).(struct{ BlockchainMessage }).BlockchainMessage
  291. return
  292. }
  293. //-------------------------------------
  294. type bcBlockRequestMessage struct {
  295. Height uint
  296. }
  297. func (m *bcBlockRequestMessage) String() string {
  298. return fmt.Sprintf("[bcBlockRequestMessage %v]", m.Height)
  299. }
  300. //-------------------------------------
  301. type bcBlockResponseMessage struct {
  302. Block *types.Block
  303. }
  304. func (m *bcBlockResponseMessage) String() string {
  305. return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height)
  306. }
  307. //-------------------------------------
  308. type bcStatusRequestMessage struct {
  309. Height uint
  310. }
  311. func (m *bcStatusRequestMessage) String() string {
  312. return fmt.Sprintf("[bcStatusRequestMessage %v]", m.Height)
  313. }
  314. //-------------------------------------
  315. type bcStatusResponseMessage struct {
  316. Height uint
  317. }
  318. func (m *bcStatusResponseMessage) String() string {
  319. return fmt.Sprintf("[bcStatusResponseMessage %v]", m.Height)
  320. }