You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

343 lines
9.9 KiB

9 years ago
  1. package blockchain
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "time"
  8. . "github.com/tendermint/go-common"
  9. "github.com/tendermint/go-events"
  10. "github.com/tendermint/go-p2p"
  11. "github.com/tendermint/go-wire"
  12. "github.com/tendermint/tendermint/proxy"
  13. sm "github.com/tendermint/tendermint/state"
  14. "github.com/tendermint/tendermint/types"
  15. )
  16. const (
  17. BlockchainChannel = byte(0x40)
  18. defaultChannelCapacity = 100
  19. defaultSleepIntervalMS = 500
  20. trySyncIntervalMS = 100
  21. // stop syncing when last block's time is
  22. // within this much of the system time.
  23. // stopSyncingDurationMinutes = 10
  24. // ask for best height every 10s
  25. statusUpdateIntervalSeconds = 10
  26. // check if we should switch to consensus reactor
  27. switchToConsensusIntervalSeconds = 1
  28. maxBlockchainResponseSize = types.MaxBlockSize + 2
  29. )
  30. type consensusReactor interface {
  31. // for when we switch from blockchain reactor and fast sync to
  32. // the consensus machine
  33. SwitchToConsensus(*sm.State)
  34. }
  35. // BlockchainReactor handles long-term catchup syncing.
  36. type BlockchainReactor struct {
  37. p2p.BaseReactor
  38. sw *p2p.Switch
  39. state *sm.State
  40. proxyAppConn proxy.AppConn // same as consensus.proxyAppConn
  41. store *BlockStore
  42. pool *BlockPool
  43. fastSync bool
  44. requestsCh chan BlockRequest
  45. timeoutsCh chan string
  46. lastBlock *types.Block
  47. evsw *events.EventSwitch
  48. }
  49. func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConn, store *BlockStore, fastSync bool) *BlockchainReactor {
  50. if state.LastBlockHeight == store.Height()-1 {
  51. store.height -= 1 // XXX HACK, make this better
  52. }
  53. if state.LastBlockHeight != store.Height() {
  54. PanicSanity(Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
  55. }
  56. requestsCh := make(chan BlockRequest, defaultChannelCapacity)
  57. timeoutsCh := make(chan string, defaultChannelCapacity)
  58. pool := NewBlockPool(
  59. store.Height()+1,
  60. requestsCh,
  61. timeoutsCh,
  62. )
  63. bcR := &BlockchainReactor{
  64. state: state,
  65. proxyAppConn: proxyAppConn,
  66. store: store,
  67. pool: pool,
  68. fastSync: fastSync,
  69. requestsCh: requestsCh,
  70. timeoutsCh: timeoutsCh,
  71. }
  72. bcR.BaseReactor = *p2p.NewBaseReactor(log, "BlockchainReactor", bcR)
  73. return bcR
  74. }
  75. func (bcR *BlockchainReactor) OnStart() error {
  76. bcR.BaseReactor.OnStart()
  77. if bcR.fastSync {
  78. _, err := bcR.pool.Start()
  79. if err != nil {
  80. return err
  81. }
  82. go bcR.poolRoutine()
  83. }
  84. return nil
  85. }
  86. func (bcR *BlockchainReactor) OnStop() {
  87. bcR.BaseReactor.OnStop()
  88. bcR.pool.Stop()
  89. }
  90. // Implements Reactor
  91. func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
  92. return []*p2p.ChannelDescriptor{
  93. &p2p.ChannelDescriptor{
  94. ID: BlockchainChannel,
  95. Priority: 5,
  96. SendQueueCapacity: 100,
  97. },
  98. }
  99. }
  100. // Implements Reactor
  101. func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
  102. // Send peer our state.
  103. peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}})
  104. }
  105. // Implements Reactor
  106. func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  107. // Remove peer from the pool.
  108. bcR.pool.RemovePeer(peer.Key)
  109. }
  110. // Implements Reactor
  111. func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
  112. _, msg, err := DecodeMessage(msgBytes)
  113. if err != nil {
  114. log.Warn("Error decoding message", "error", err)
  115. return
  116. }
  117. log.Notice("Receive", "src", src, "chID", chID, "msg", msg)
  118. switch msg := msg.(type) {
  119. case *bcBlockRequestMessage:
  120. // Got a request for a block. Respond with block if we have it.
  121. block := bcR.store.LoadBlock(msg.Height)
  122. if block != nil {
  123. msg := &bcBlockResponseMessage{Block: block}
  124. queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
  125. if !queued {
  126. // queue is full, just ignore.
  127. }
  128. } else {
  129. // TODO peer is asking for things we don't have.
  130. }
  131. case *bcBlockResponseMessage:
  132. // Got a block.
  133. bcR.pool.AddBlock(src.Key, msg.Block, len(msgBytes))
  134. case *bcStatusRequestMessage:
  135. // Send peer our state.
  136. queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}})
  137. if !queued {
  138. // sorry
  139. }
  140. case *bcStatusResponseMessage:
  141. // Got a peer status. Unverified.
  142. bcR.pool.SetPeerHeight(src.Key, msg.Height)
  143. default:
  144. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  145. }
  146. }
  147. // Handle messages from the poolReactor telling the reactor what to do.
  148. // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
  149. // (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
  150. func (bcR *BlockchainReactor) poolRoutine() {
  151. trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
  152. statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
  153. switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
  154. FOR_LOOP:
  155. for {
  156. select {
  157. case request := <-bcR.requestsCh: // chan BlockRequest
  158. peer := bcR.Switch.Peers().Get(request.PeerID)
  159. if peer == nil {
  160. continue FOR_LOOP // Peer has since been disconnected.
  161. }
  162. msg := &bcBlockRequestMessage{request.Height}
  163. queued := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
  164. if !queued {
  165. // We couldn't make the request, send-queue full.
  166. // The pool handles timeouts, just let it go.
  167. continue FOR_LOOP
  168. }
  169. case peerID := <-bcR.timeoutsCh: // chan string
  170. // Peer timed out.
  171. peer := bcR.Switch.Peers().Get(peerID)
  172. if peer != nil {
  173. bcR.Switch.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
  174. }
  175. case _ = <-statusUpdateTicker.C:
  176. // ask for status updates
  177. go bcR.BroadcastStatusRequest()
  178. case _ = <-switchToConsensusTicker.C:
  179. height, numPending := bcR.pool.GetStatus()
  180. outbound, inbound, _ := bcR.Switch.NumPeers()
  181. log.Info("Consensus ticker", "numPending", numPending, "total", len(bcR.pool.requesters),
  182. "outbound", outbound, "inbound", inbound)
  183. if bcR.pool.IsCaughtUp() {
  184. log.Notice("Time to switch to consensus reactor!", "height", height)
  185. bcR.pool.Stop()
  186. conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
  187. conR.SwitchToConsensus(bcR.state)
  188. break FOR_LOOP
  189. }
  190. case _ = <-trySyncTicker.C: // chan time
  191. // This loop can be slow as long as it's doing syncing work.
  192. SYNC_LOOP:
  193. for i := 0; i < 10; i++ {
  194. // See if there are any blocks to sync.
  195. first, second := bcR.pool.PeekTwoBlocks()
  196. //log.Info("TrySync peeked", "first", first, "second", second)
  197. if first == nil || second == nil {
  198. // We need both to sync the first block.
  199. break SYNC_LOOP
  200. }
  201. firstParts := first.MakePartSet()
  202. firstPartsHeader := firstParts.Header()
  203. // Finally, verify the first block using the second's validation.
  204. err := bcR.state.Validators.VerifyValidation(
  205. bcR.state.ChainID, first.Hash(), firstPartsHeader, first.Height, second.LastValidation)
  206. if err != nil {
  207. log.Info("error in validation", "error", err)
  208. bcR.pool.RedoRequest(first.Height)
  209. break SYNC_LOOP
  210. } else {
  211. bcR.pool.PopRequest()
  212. err := bcR.state.ExecBlock(bcR.evsw, bcR.proxyAppConn, first, firstPartsHeader)
  213. if err != nil {
  214. // TODO This is bad, are we zombie?
  215. PanicQ(Fmt("Failed to process committed block: %v", err))
  216. }
  217. /*
  218. err = bcR.proxyAppConn.CommitSync()
  219. if err != nil {
  220. // TODO Handle gracefully.
  221. PanicQ(Fmt("Failed to commit block at application: %v", err))
  222. }
  223. */
  224. bcR.store.SaveBlock(first, firstParts, second.LastValidation)
  225. bcR.state.Save()
  226. }
  227. }
  228. continue FOR_LOOP
  229. case <-bcR.Quit:
  230. break FOR_LOOP
  231. }
  232. }
  233. }
  234. func (bcR *BlockchainReactor) BroadcastStatusResponse() error {
  235. bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}})
  236. return nil
  237. }
  238. func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
  239. bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusRequestMessage{bcR.store.Height()}})
  240. return nil
  241. }
  242. // implements events.Eventable
  243. func (bcR *BlockchainReactor) SetEventSwitch(evsw *events.EventSwitch) {
  244. bcR.evsw = evsw
  245. }
  //-----------------------------------------------------------------------------
  // Messages

  // Wire type bytes, one per concrete BlockchainMessage registered with
  // go-wire in this file.
  const (
  	msgTypeBlockRequest   byte = 0x10
  	msgTypeBlockResponse  byte = 0x11
  	msgTypeStatusResponse byte = 0x20
  	msgTypeStatusRequest  byte = 0x21
  )
  254. type BlockchainMessage interface{}
  255. var _ = wire.RegisterInterface(
  256. struct{ BlockchainMessage }{},
  257. wire.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest},
  258. wire.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse},
  259. wire.ConcreteType{&bcStatusResponseMessage{}, msgTypeStatusResponse},
  260. wire.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest},
  261. )
  262. // TODO: ensure that bz is completely read.
  263. func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
  264. msgType = bz[0]
  265. n := int(0)
  266. r := bytes.NewReader(bz)
  267. msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, maxBlockchainResponseSize, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage
  268. if err != nil && n != len(bz) {
  269. err = errors.New("DecodeMessage() had bytes left over.")
  270. }
  271. return
  272. }
  //-------------------------------------

  // bcBlockRequestMessage asks a peer for the block at Height.
  type bcBlockRequestMessage struct {
  	Height int
  }

  // String renders the message as "[bcBlockRequestMessage <height>]".
  func (m *bcBlockRequestMessage) String() string {
  	return "[bcBlockRequestMessage " + fmt.Sprint(m.Height) + "]"
  }
  280. //-------------------------------------
  281. // NOTE: keep up-to-date with maxBlockchainResponseSize
  282. type bcBlockResponseMessage struct {
  283. Block *types.Block
  284. }
  285. func (m *bcBlockResponseMessage) String() string {
  286. return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height)
  287. }
  //-------------------------------------

  // bcStatusRequestMessage asks a peer to report its status. Height is filled
  // with the sender's store height when the request is broadcast.
  type bcStatusRequestMessage struct {
  	Height int
  }

  // String renders the message as "[bcStatusRequestMessage <height>]".
  func (m *bcStatusRequestMessage) String() string {
  	return "[bcStatusRequestMessage " + fmt.Sprint(m.Height) + "]"
  }
  //-------------------------------------

  // bcStatusResponseMessage reports the sender's store height. The receiver
  // records it, unverified, as that peer's height in the block pool.
  type bcStatusResponseMessage struct {
  	Height int
  }

  // String renders the message as "[bcStatusResponseMessage <height>]".
  func (m *bcStatusResponseMessage) String() string {
  	return "[bcStatusResponseMessage " + fmt.Sprint(m.Height) + "]"
  }