You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

341 lines
9.8 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package blockchain
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "sync/atomic"
  8. "time"
  9. "github.com/tendermint/tendermint/binary"
  10. . "github.com/tendermint/tendermint/common"
  11. "github.com/tendermint/tendermint/events"
  12. "github.com/tendermint/tendermint/p2p"
  13. sm "github.com/tendermint/tendermint/state"
  14. "github.com/tendermint/tendermint/types"
  15. )
  const (
  	// BlockchainChannel is the p2p channel id this reactor sends and
  	// receives all of its messages on.
  	BlockchainChannel byte = 0x40

  	// defaultChannelCapacity is the buffer size of the request and timeout
  	// channels created in NewBlockchainReactor.
  	defaultChannelCapacity = 100
  	// defaultSleepIntervalMS is not referenced in this part of the file.
  	defaultSleepIntervalMS = 500
  	// trySyncIntervalMS is the period, in milliseconds, of the ticker that
  	// triggers block-sync attempts in poolRoutine.
  	trySyncIntervalMS = 100

  	// stop syncing when last block's time is
  	// within this much of the system time.
  	// stopSyncingDurationMinutes = 10

  	// statusUpdateIntervalSeconds: ask peers for their best height every 10s.
  	statusUpdateIntervalSeconds = 10
  	// switchToConsensusIntervalSeconds: how often to check whether we
  	// should switch to the consensus reactor.
  	switchToConsensusIntervalSeconds = 10
  )
  29. type consensusReactor interface {
  30. // for when we switch from blockchain reactor and fast sync to
  31. // the consensus machine
  32. SwitchToConsensus(*sm.State)
  33. }
  34. // BlockchainReactor handles long-term catchup syncing.
  35. type BlockchainReactor struct {
  36. sw *p2p.Switch
  37. state *sm.State
  38. store *BlockStore
  39. pool *BlockPool
  40. sync bool
  41. requestsCh chan BlockRequest
  42. timeoutsCh chan string
  43. lastBlock *types.Block
  44. quit chan struct{}
  45. running uint32
  46. evsw events.Fireable
  47. }
  48. func NewBlockchainReactor(state *sm.State, store *BlockStore, sync bool) *BlockchainReactor {
  49. // SANITY CHECK
  50. if state.LastBlockHeight != store.Height() &&
  51. state.LastBlockHeight != store.Height()-1 { // XXX double check this logic.
  52. panic(Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
  53. }
  54. // SANITY CHECK END
  55. requestsCh := make(chan BlockRequest, defaultChannelCapacity)
  56. timeoutsCh := make(chan string, defaultChannelCapacity)
  57. pool := NewBlockPool(
  58. store.Height()+1,
  59. requestsCh,
  60. timeoutsCh,
  61. )
  62. bcR := &BlockchainReactor{
  63. state: state,
  64. store: store,
  65. pool: pool,
  66. sync: sync,
  67. requestsCh: requestsCh,
  68. timeoutsCh: timeoutsCh,
  69. quit: make(chan struct{}),
  70. running: uint32(0),
  71. }
  72. return bcR
  73. }
  74. // Implements Reactor
  75. func (bcR *BlockchainReactor) Start(sw *p2p.Switch) {
  76. if atomic.CompareAndSwapUint32(&bcR.running, 0, 1) {
  77. log.Info("Starting BlockchainReactor")
  78. bcR.sw = sw
  79. if bcR.sync {
  80. bcR.pool.Start()
  81. go bcR.poolRoutine()
  82. }
  83. }
  84. }
  85. // Implements Reactor
  86. func (bcR *BlockchainReactor) Stop() {
  87. if atomic.CompareAndSwapUint32(&bcR.running, 1, 0) {
  88. log.Info("Stopping BlockchainReactor")
  89. close(bcR.quit)
  90. bcR.pool.Stop()
  91. }
  92. }
  93. // Implements Reactor
  94. func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
  95. return []*p2p.ChannelDescriptor{
  96. &p2p.ChannelDescriptor{
  97. Id: BlockchainChannel,
  98. Priority: 5,
  99. SendQueueCapacity: 100,
  100. },
  101. }
  102. }
  103. // Implements Reactor
  104. func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
  105. // Send peer our state.
  106. peer.Send(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()})
  107. }
  108. // Implements Reactor
  109. func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  110. // Remove peer from the pool.
  111. bcR.pool.RemovePeer(peer.Key)
  112. }
  113. // Implements Reactor
  114. func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) {
  115. _, msg, err := DecodeMessage(msgBytes)
  116. if err != nil {
  117. log.Warn("Error decoding message", "error", err)
  118. return
  119. }
  120. log.Info("Received message", "msg", msg)
  121. switch msg := msg.(type) {
  122. case *bcBlockRequestMessage:
  123. // Got a request for a block. Respond with block if we have it.
  124. block := bcR.store.LoadBlock(msg.Height)
  125. if block != nil {
  126. msg := &bcBlockResponseMessage{Block: block}
  127. queued := src.TrySend(BlockchainChannel, msg)
  128. if !queued {
  129. // queue is full, just ignore.
  130. }
  131. } else {
  132. // TODO peer is asking for things we don't have.
  133. }
  134. case *bcBlockResponseMessage:
  135. // Got a block.
  136. bcR.pool.AddBlock(msg.Block, src.Key)
  137. case *bcStatusRequestMessage:
  138. // Send peer our state.
  139. queued := src.TrySend(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()})
  140. if !queued {
  141. // sorry
  142. }
  143. case *bcStatusResponseMessage:
  144. // Got a peer status. Unverified.
  145. bcR.pool.SetPeerHeight(src.Key, msg.Height)
  146. default:
  147. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  148. }
  149. }
  150. // Handle messages from the poolReactor telling the reactor what to do.
  151. // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
  152. // (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
  153. func (bcR *BlockchainReactor) poolRoutine() {
  154. trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
  155. statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
  156. switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
  157. FOR_LOOP:
  158. for {
  159. select {
  160. case request := <-bcR.requestsCh: // chan BlockRequest
  161. peer := bcR.sw.Peers().Get(request.PeerId)
  162. if peer == nil {
  163. // We can't assign the request.
  164. continue FOR_LOOP
  165. }
  166. msg := &bcBlockRequestMessage{request.Height}
  167. queued := peer.TrySend(BlockchainChannel, msg)
  168. if !queued {
  169. // We couldn't make the request, send-queue full.
  170. // The pool handles retries, so just let it go.
  171. continue FOR_LOOP
  172. }
  173. case peerId := <-bcR.timeoutsCh: // chan string
  174. // Peer timed out.
  175. peer := bcR.sw.Peers().Get(peerId)
  176. if peer != nil {
  177. bcR.sw.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
  178. }
  179. case _ = <-statusUpdateTicker.C:
  180. // ask for status updates
  181. go bcR.BroadcastStatusRequest()
  182. case _ = <-switchToConsensusTicker.C:
  183. // not thread safe access for numUnassigned and numPending but should be fine
  184. // TODO make threadsafe and use exposed functions
  185. outbound, inbound, _ := bcR.sw.NumPeers()
  186. log.Debug("Consensus ticker", "numUnassigned", bcR.pool.numUnassigned, "numPending", bcR.pool.numPending,
  187. "total", len(bcR.pool.requests), "outbound", outbound, "inbound", inbound)
  188. // NOTE: this condition is very strict right now. may need to weaken
  189. // If all `maxPendingRequests` requests are unassigned
  190. // and we have some peers (say >= 3), then we're caught up
  191. maxPending := bcR.pool.numPending == maxPendingRequests
  192. allUnassigned := bcR.pool.numPending == bcR.pool.numUnassigned
  193. enoughPeers := outbound+inbound >= 3
  194. if maxPending && allUnassigned && enoughPeers {
  195. log.Info("Time to switch to consensus reactor!", "height", bcR.pool.height)
  196. bcR.pool.Stop()
  197. conR := bcR.sw.Reactor("CONSENSUS").(consensusReactor)
  198. conR.SwitchToConsensus(bcR.state)
  199. break FOR_LOOP
  200. }
  201. case _ = <-trySyncTicker.C: // chan time
  202. // This loop can be slow as long as it's doing syncing work.
  203. SYNC_LOOP:
  204. for i := 0; i < 10; i++ {
  205. // See if there are any blocks to sync.
  206. first, second := bcR.pool.PeekTwoBlocks()
  207. //log.Debug("TrySync peeked", "first", first, "second", second)
  208. if first == nil || second == nil {
  209. // We need both to sync the first block.
  210. break SYNC_LOOP
  211. }
  212. firstParts := first.MakePartSet()
  213. firstPartsHeader := firstParts.Header()
  214. // Finally, verify the first block using the second's validation.
  215. err := bcR.state.BondedValidators.VerifyValidation(
  216. bcR.state.ChainID, first.Hash(), firstPartsHeader, first.Height, second.LastValidation)
  217. if err != nil {
  218. log.Debug("error in validation", "error", err)
  219. bcR.pool.RedoRequest(first.Height)
  220. break SYNC_LOOP
  221. } else {
  222. bcR.pool.PopRequest()
  223. err := sm.ExecBlock(bcR.state, first, firstPartsHeader)
  224. if err != nil {
  225. // TODO This is bad, are we zombie?
  226. panic(Fmt("Failed to process committed block: %v", err))
  227. }
  228. bcR.store.SaveBlock(first, firstParts, second.LastValidation)
  229. bcR.state.Save()
  230. }
  231. }
  232. continue FOR_LOOP
  233. case <-bcR.quit:
  234. break FOR_LOOP
  235. }
  236. }
  237. }
  238. func (bcR *BlockchainReactor) BroadcastStatusResponse() error {
  239. bcR.sw.Broadcast(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()})
  240. return nil
  241. }
  242. func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
  243. bcR.sw.Broadcast(BlockchainChannel, &bcStatusRequestMessage{bcR.store.Height()})
  244. return nil
  245. }
  246. // implements events.Eventable
  247. func (bcR *BlockchainReactor) SetFireable(evsw events.Fireable) {
  248. bcR.evsw = evsw
  249. }
  //-----------------------------------------------------------------------------
  // Messages

  // Type bytes under which the concrete blockchain message types are
  // registered with the binary package.
  const (
  	msgTypeBlockRequest   byte = 0x10
  	msgTypeBlockResponse  byte = 0x11
  	msgTypeStatusResponse byte = 0x20
  	msgTypeStatusRequest  byte = 0x21
  )
  258. type BlockchainMessage interface{}
  259. var _ = binary.RegisterInterface(
  260. struct{ BlockchainMessage }{},
  261. binary.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest},
  262. binary.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse},
  263. binary.ConcreteType{&bcStatusResponseMessage{}, msgTypeStatusResponse},
  264. binary.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest},
  265. )
  266. func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
  267. msgType = bz[0]
  268. n := new(int64)
  269. r := bytes.NewReader(bz)
  270. msg = binary.ReadBinary(struct{ BlockchainMessage }{}, r, n, &err).(struct{ BlockchainMessage }).BlockchainMessage
  271. return
  272. }
  //-------------------------------------

  // bcBlockRequestMessage asks a peer for the block at Height.
  type bcBlockRequestMessage struct {
  	Height int
  }

  // String renders the message as "[bcBlockRequestMessage <height>]".
  func (m *bcBlockRequestMessage) String() string {
  	requested := m.Height
  	return fmt.Sprintf("[bcBlockRequestMessage %v]", requested)
  }
  280. //-------------------------------------
  281. type bcBlockResponseMessage struct {
  282. Block *types.Block
  283. }
  284. func (m *bcBlockResponseMessage) String() string {
  285. return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height)
  286. }
  //-------------------------------------

  // bcStatusRequestMessage asks a peer for its status. Height is the
  // sender's own store height.
  type bcStatusRequestMessage struct {
  	Height int
  }

  // String renders the message as "[bcStatusRequestMessage <height>]".
  func (m *bcStatusRequestMessage) String() string {
  	senderHeight := m.Height
  	return fmt.Sprintf("[bcStatusRequestMessage %v]", senderHeight)
  }
  //-------------------------------------

  // bcStatusResponseMessage reports the sender's store height to a peer.
  type bcStatusResponseMessage struct {
  	Height int
  }

  // String renders the message as "[bcStatusResponseMessage <height>]".
  func (m *bcStatusResponseMessage) String() string {
  	reported := m.Height
  	return fmt.Sprintf("[bcStatusResponseMessage %v]", reported)
  }