You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

362 lines
10 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package blockchain
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "sync/atomic"
  8. "time"
  9. "github.com/tendermint/tendermint/binary"
  10. . "github.com/tendermint/tendermint/common"
  11. dbm "github.com/tendermint/tendermint/db"
  12. "github.com/tendermint/tendermint/events"
  13. "github.com/tendermint/tendermint/p2p"
  14. sm "github.com/tendermint/tendermint/state"
  15. "github.com/tendermint/tendermint/types"
  16. )
  // Channel identifier and timing parameters used by the blockchain reactor.
  const (
  	// BlockchainChannel is the p2p channel id on which every blockchain
  	// message in this file is sent and received.
  	BlockchainChannel byte = 0x40

  	// Buffer size of the request and timeout channels shared with the pool.
  	defaultChannelCapacity = 100
  	// Milliseconds to back off after a block request could not be queued.
  	defaultSleepIntervalMS = 500
  	// Milliseconds between attempts to verify and apply downloaded blocks.
  	trySyncIntervalMS = 100

  	// stop syncing when last block's time is
  	// within this much of the system time.
  	// stopSyncingDurationMinutes = 10

  	// ask for best height every 10s
  	statusUpdateIntervalSeconds = 10
  	// check if we should switch to consensus reactor
  	switchToConsensusIntervalSeconds = 10
  )
  30. type consensusReactor interface {
  31. SetSyncing(bool)
  32. ResetToState(*sm.State)
  33. }
  34. // BlockchainReactor handles long-term catchup syncing.
  35. type BlockchainReactor struct {
  36. sw *p2p.Switch
  37. state *sm.State
  38. store *BlockStore
  39. pool *BlockPool
  40. sync bool
  41. requestsCh chan BlockRequest
  42. timeoutsCh chan string
  43. lastBlock *types.Block
  44. quit chan struct{}
  45. running uint32
  46. evsw events.Fireable
  47. }
  48. func NewBlockchainReactor(state *sm.State, store *BlockStore, sync bool) *BlockchainReactor {
  49. if state.LastBlockHeight != store.Height() &&
  50. state.LastBlockHeight != store.Height()-1 { // XXX double check this logic.
  51. panic(Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
  52. }
  53. requestsCh := make(chan BlockRequest, defaultChannelCapacity)
  54. timeoutsCh := make(chan string, defaultChannelCapacity)
  55. pool := NewBlockPool(
  56. store.Height()+1,
  57. requestsCh,
  58. timeoutsCh,
  59. )
  60. bcR := &BlockchainReactor{
  61. state: state,
  62. store: store,
  63. pool: pool,
  64. sync: sync,
  65. requestsCh: requestsCh,
  66. timeoutsCh: timeoutsCh,
  67. quit: make(chan struct{}),
  68. running: uint32(0),
  69. }
  70. return bcR
  71. }
  72. // Implements Reactor
  73. func (bcR *BlockchainReactor) Start(sw *p2p.Switch) {
  74. if atomic.CompareAndSwapUint32(&bcR.running, 0, 1) {
  75. log.Info("Starting BlockchainReactor")
  76. bcR.sw = sw
  77. if bcR.sync {
  78. bcR.pool.Start()
  79. go bcR.poolRoutine()
  80. }
  81. }
  82. }
  83. // Implements Reactor
  84. func (bcR *BlockchainReactor) Stop() {
  85. if atomic.CompareAndSwapUint32(&bcR.running, 1, 0) {
  86. log.Info("Stopping BlockchainReactor")
  87. close(bcR.quit)
  88. bcR.pool.Stop()
  89. }
  90. }
  91. // Implements Reactor
  92. func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
  93. return []*p2p.ChannelDescriptor{
  94. &p2p.ChannelDescriptor{
  95. Id: BlockchainChannel,
  96. Priority: 5,
  97. SendQueueCapacity: 100,
  98. },
  99. }
  100. }
  101. // Implements Reactor
  102. func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
  103. // Send peer our state.
  104. peer.Send(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()})
  105. }
  106. // Implements Reactor
  107. func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  108. // Remove peer from the pool.
  109. bcR.pool.RemovePeer(peer.Key)
  110. }
  111. // Implements Reactor
  112. func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) {
  113. _, msg, err := DecodeMessage(msgBytes)
  114. if err != nil {
  115. log.Warn("Error decoding message", "error", err)
  116. return
  117. }
  118. log.Info("Received message", "msg", msg)
  119. switch msg := msg.(type) {
  120. case *bcBlockRequestMessage:
  121. // Got a request for a block. Respond with block if we have it.
  122. block := bcR.store.LoadBlock(msg.Height)
  123. if block != nil {
  124. msg := &bcBlockResponseMessage{Block: block}
  125. queued := src.TrySend(BlockchainChannel, msg)
  126. if !queued {
  127. // queue is full, just ignore.
  128. }
  129. } else {
  130. // TODO peer is asking for things we don't have.
  131. }
  132. case *bcBlockResponseMessage:
  133. // Got a block.
  134. bcR.pool.AddBlock(msg.Block, src.Key)
  135. case *bcStatusRequestMessage:
  136. // Send peer our state.
  137. queued := src.TrySend(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()})
  138. if !queued {
  139. // sorry
  140. }
  141. case *bcStatusResponseMessage:
  142. // Got a peer status. Unverified.
  143. bcR.pool.SetPeerHeight(src.Key, msg.Height)
  144. default:
  145. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  146. }
  147. }
  148. // Handle messages from the poolReactor telling the reactor what to do.
  149. func (bcR *BlockchainReactor) poolRoutine() {
  150. trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
  151. statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
  152. switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
  153. FOR_LOOP:
  154. for {
  155. select {
  156. case request := <-bcR.requestsCh: // chan BlockRequest
  157. peer := bcR.sw.Peers().Get(request.PeerId)
  158. if peer == nil {
  159. // We can't fulfill the request.
  160. continue FOR_LOOP
  161. }
  162. msg := &bcBlockRequestMessage{request.Height}
  163. queued := peer.TrySend(BlockchainChannel, msg)
  164. if !queued {
  165. // We couldn't queue the request.
  166. time.Sleep(defaultSleepIntervalMS * time.Millisecond)
  167. continue FOR_LOOP
  168. }
  169. case peerId := <-bcR.timeoutsCh: // chan string
  170. // Peer timed out.
  171. peer := bcR.sw.Peers().Get(peerId)
  172. if peer != nil {
  173. bcR.sw.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
  174. }
  175. case _ = <-statusUpdateTicker.C:
  176. // ask for status updates
  177. go bcR.BroadcastStatusRequest()
  178. case _ = <-switchToConsensusTicker.C:
  179. // not thread safe access for peerless and numPending but should be fine
  180. log.Debug("Consensus ticker", "peerless", bcR.pool.peerless, "pending", bcR.pool.numPending, "total", bcR.pool.numTotal)
  181. // NOTE: this condition is very strict right now. may need to weaken
  182. // if the max amount of requests are pending and peerless
  183. // and we have some peers (say > 5), then we're caught up
  184. maxPending := bcR.pool.numPending == maxPendingRequests
  185. maxPeerless := bcR.pool.peerless == bcR.pool.numPending
  186. o, i, _ := bcR.sw.NumPeers()
  187. enoughPeers := o+i > 5
  188. if maxPending && maxPeerless && enoughPeers {
  189. log.Warn("Time to switch to consensus reactor!", "height", bcR.pool.height)
  190. bcR.pool.Stop()
  191. stateDB := dbm.GetDB("state")
  192. state := sm.LoadState(stateDB)
  193. bcR.sw.Reactor("CONSENSUS").(consensusReactor).ResetToState(state)
  194. bcR.sw.Reactor("CONSENSUS").(consensusReactor).SetSyncing(false)
  195. break FOR_LOOP
  196. }
  197. case _ = <-trySyncTicker.C: // chan time
  198. //var lastValidatedBlock *types.Block
  199. SYNC_LOOP:
  200. for i := 0; i < 10; i++ {
  201. // See if there are any blocks to sync.
  202. first, second := bcR.pool.PeekTwoBlocks()
  203. //log.Debug("TrySync peeked", "first", first, "second", second)
  204. if first == nil || second == nil {
  205. // We need both to sync the first block.
  206. break SYNC_LOOP
  207. }
  208. firstParts := first.MakePartSet()
  209. firstPartsHeader := firstParts.Header()
  210. // Finally, verify the first block using the second's validation.
  211. err := bcR.state.BondedValidators.VerifyValidation(
  212. first.Hash(), firstPartsHeader, first.Height, second.Validation)
  213. if err != nil {
  214. log.Debug("error in validation", "error", err)
  215. bcR.pool.RedoRequest(first.Height)
  216. break SYNC_LOOP
  217. } else {
  218. bcR.pool.PopRequest()
  219. err := sm.ExecBlock(bcR.state, first, firstPartsHeader)
  220. if err != nil {
  221. // TODO This is bad, are we zombie?
  222. panic(Fmt("Failed to process committed block: %v", err))
  223. }
  224. bcR.store.SaveBlock(first, firstParts, second.Validation)
  225. bcR.state.Save()
  226. //lastValidatedBlock = first
  227. }
  228. }
  229. /*
  230. // We're done syncing for now (will do again shortly)
  231. // See if we want to stop syncing and turn on the
  232. // consensus reactor.
  233. // TODO: use other heuristics too besides blocktime.
  234. // It's not a security concern, as it only needs to happen
  235. // upon node sync, and there's also a second (slower)
  236. // this peer failed us
  237. // method of syncing in the consensus reactor.
  238. if lastValidatedBlock != nil && time.Now().Sub(lastValidatedBlock.Time) < stopSyncingDurationMinutes*time.Minute {
  239. go func() {
  240. log.Info("Stopping blockpool syncing, turning on consensus...")
  241. trySyncTicker.Stop() // Just stop the block requests. Still serve blocks to others.
  242. conR := bcR.sw.Reactor("CONSENSUS")
  243. conR.(stateResetter).ResetToState(bcR.state)
  244. conR.Start(bcR.sw)
  245. for _, peer := range bcR.sw.Peers().List() {
  246. conR.AddPeer(peer)
  247. }
  248. }()
  249. break FOR_LOOP
  250. }
  251. */
  252. continue FOR_LOOP
  253. case <-bcR.quit:
  254. break FOR_LOOP
  255. }
  256. }
  257. }
  258. func (bcR *BlockchainReactor) BroadcastStatusResponse() error {
  259. bcR.sw.Broadcast(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()})
  260. return nil
  261. }
  262. func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
  263. bcR.sw.Broadcast(BlockchainChannel, &bcStatusRequestMessage{bcR.store.Height()})
  264. return nil
  265. }
  266. // implements events.Eventable
  267. func (bcR *BlockchainReactor) SetFireable(evsw events.Fireable) {
  268. bcR.evsw = evsw
  269. }
  270. //-----------------------------------------------------------------------------
  271. // Messages
  // Type bytes under which the concrete blockchain message types are
  // registered with the binary codec.
  const (
  	msgTypeBlockRequest   byte = 0x10
  	msgTypeBlockResponse  byte = 0x11
  	msgTypeStatusResponse byte = 0x20
  	msgTypeStatusRequest  byte = 0x21
  )
  278. type BlockchainMessage interface{}
  279. var _ = binary.RegisterInterface(
  280. struct{ BlockchainMessage }{},
  281. binary.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest},
  282. binary.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse},
  283. binary.ConcreteType{&bcStatusResponseMessage{}, msgTypeStatusResponse},
  284. binary.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest},
  285. )
  286. func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
  287. msgType = bz[0]
  288. n := new(int64)
  289. r := bytes.NewReader(bz)
  290. msg = binary.ReadBinary(struct{ BlockchainMessage }{}, r, n, &err).(struct{ BlockchainMessage }).BlockchainMessage
  291. return
  292. }
  293. //-------------------------------------
  // bcBlockRequestMessage asks a peer for the block at Height.
  type bcBlockRequestMessage struct {
  	Height uint
  }

  // String renders the message as "[bcBlockRequestMessage <height>]".
  func (m *bcBlockRequestMessage) String() string {
  	height := m.Height
  	return fmt.Sprintf("[bcBlockRequestMessage %v]", height)
  }
  300. //-------------------------------------
  301. type bcBlockResponseMessage struct {
  302. Block *types.Block
  303. }
  304. func (m *bcBlockResponseMessage) String() string {
  305. return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height)
  306. }
  307. //-------------------------------------
  // bcStatusRequestMessage asks a peer for its status; Height is the
  // sender's own store height.
  type bcStatusRequestMessage struct {
  	Height uint
  }

  // String renders the message as "[bcStatusRequestMessage <height>]".
  func (m *bcStatusRequestMessage) String() string {
  	height := m.Height
  	return fmt.Sprintf("[bcStatusRequestMessage %v]", height)
  }
  314. //-------------------------------------
  // bcStatusResponseMessage reports the sender's store height to a peer.
  type bcStatusResponseMessage struct {
  	Height uint
  }

  // String renders the message as "[bcStatusResponseMessage <height>]".
  func (m *bcStatusResponseMessage) String() string {
  	height := m.Height
  	return fmt.Sprintf("[bcStatusResponseMessage %v]", height)
  }