You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

346 lines
10 KiB

8 years ago
8 years ago
  1. package blockchain
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "time"
  8. . "github.com/tendermint/go-common"
  9. cfg "github.com/tendermint/go-config"
  10. "github.com/tendermint/go-p2p"
  11. "github.com/tendermint/go-wire"
  12. "github.com/tendermint/tendermint/proxy"
  13. sm "github.com/tendermint/tendermint/state"
  14. "github.com/tendermint/tendermint/types"
  15. )
  16. const (
  17. BlockchainChannel = byte(0x40)
  18. defaultChannelCapacity = 100
  19. defaultSleepIntervalMS = 500
  20. trySyncIntervalMS = 100
  21. // stop syncing when last block's time is
  22. // within this much of the system time.
  23. // stopSyncingDurationMinutes = 10
  24. // ask for best height every 10s
  25. statusUpdateIntervalSeconds = 10
  26. // check if we should switch to consensus reactor
  27. switchToConsensusIntervalSeconds = 1
  28. maxBlockchainResponseSize = types.MaxBlockSize + 2
  29. )
  30. type consensusReactor interface {
  31. // for when we switch from blockchain reactor and fast sync to
  32. // the consensus machine
  33. SwitchToConsensus(*sm.State)
  34. }
  35. // BlockchainReactor handles long-term catchup syncing.
  36. type BlockchainReactor struct {
  37. p2p.BaseReactor
  38. config cfg.Config
  39. state *sm.State
  40. proxyAppConn proxy.AppConnConsensus // same as consensus.proxyAppConn
  41. store *BlockStore
  42. pool *BlockPool
  43. fastSync bool
  44. requestsCh chan BlockRequest
  45. timeoutsCh chan string
  46. lastBlock *types.Block
  47. evsw types.EventSwitch
  48. }
  49. func NewBlockchainReactor(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConnConsensus, store *BlockStore, fastSync bool) *BlockchainReactor {
  50. if state.LastBlockHeight == store.Height()-1 {
  51. store.height -= 1 // XXX HACK, make this better
  52. }
  53. if state.LastBlockHeight != store.Height() {
  54. PanicSanity(Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
  55. }
  56. requestsCh := make(chan BlockRequest, defaultChannelCapacity)
  57. timeoutsCh := make(chan string, defaultChannelCapacity)
  58. pool := NewBlockPool(
  59. store.Height()+1,
  60. requestsCh,
  61. timeoutsCh,
  62. )
  63. bcR := &BlockchainReactor{
  64. config: config,
  65. state: state,
  66. proxyAppConn: proxyAppConn,
  67. store: store,
  68. pool: pool,
  69. fastSync: fastSync,
  70. requestsCh: requestsCh,
  71. timeoutsCh: timeoutsCh,
  72. }
  73. bcR.BaseReactor = *p2p.NewBaseReactor(log, "BlockchainReactor", bcR)
  74. return bcR
  75. }
  76. func (bcR *BlockchainReactor) OnStart() error {
  77. bcR.BaseReactor.OnStart()
  78. if bcR.fastSync {
  79. _, err := bcR.pool.Start()
  80. if err != nil {
  81. return err
  82. }
  83. go bcR.poolRoutine()
  84. }
  85. return nil
  86. }
  87. func (bcR *BlockchainReactor) OnStop() {
  88. bcR.BaseReactor.OnStop()
  89. bcR.pool.Stop()
  90. }
  91. // Implements Reactor
  92. func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
  93. return []*p2p.ChannelDescriptor{
  94. &p2p.ChannelDescriptor{
  95. ID: BlockchainChannel,
  96. Priority: 5,
  97. SendQueueCapacity: 100,
  98. },
  99. }
  100. }
  101. // Implements Reactor
  102. func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
  103. // Send peer our state.
  104. peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}})
  105. }
  106. // Implements Reactor
  107. func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  108. // Remove peer from the pool.
  109. bcR.pool.RemovePeer(peer.Key)
  110. }
  111. // Implements Reactor
  112. func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
  113. _, msg, err := DecodeMessage(msgBytes)
  114. if err != nil {
  115. log.Warn("Error decoding message", "error", err)
  116. return
  117. }
  118. log.Debug("Receive", "src", src, "chID", chID, "msg", msg)
  119. switch msg := msg.(type) {
  120. case *bcBlockRequestMessage:
  121. // Got a request for a block. Respond with block if we have it.
  122. block := bcR.store.LoadBlock(msg.Height)
  123. if block != nil {
  124. msg := &bcBlockResponseMessage{Block: block}
  125. queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
  126. if !queued {
  127. // queue is full, just ignore.
  128. }
  129. } else {
  130. // TODO peer is asking for things we don't have.
  131. }
  132. case *bcBlockResponseMessage:
  133. // Got a block.
  134. bcR.pool.AddBlock(src.Key, msg.Block, len(msgBytes))
  135. case *bcStatusRequestMessage:
  136. // Send peer our state.
  137. queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}})
  138. if !queued {
  139. // sorry
  140. }
  141. case *bcStatusResponseMessage:
  142. // Got a peer status. Unverified.
  143. bcR.pool.SetPeerHeight(src.Key, msg.Height)
  144. default:
  145. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  146. }
  147. }
  148. // Handle messages from the poolReactor telling the reactor what to do.
  149. // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
  150. // (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
  151. func (bcR *BlockchainReactor) poolRoutine() {
  152. trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
  153. statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
  154. switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
  155. FOR_LOOP:
  156. for {
  157. select {
  158. case request := <-bcR.requestsCh: // chan BlockRequest
  159. peer := bcR.Switch.Peers().Get(request.PeerID)
  160. if peer == nil {
  161. continue FOR_LOOP // Peer has since been disconnected.
  162. }
  163. msg := &bcBlockRequestMessage{request.Height}
  164. queued := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
  165. if !queued {
  166. // We couldn't make the request, send-queue full.
  167. // The pool handles timeouts, just let it go.
  168. continue FOR_LOOP
  169. }
  170. case peerID := <-bcR.timeoutsCh: // chan string
  171. // Peer timed out.
  172. peer := bcR.Switch.Peers().Get(peerID)
  173. if peer != nil {
  174. bcR.Switch.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
  175. }
  176. case _ = <-statusUpdateTicker.C:
  177. // ask for status updates
  178. go bcR.BroadcastStatusRequest()
  179. case _ = <-switchToConsensusTicker.C:
  180. height, numPending, _ := bcR.pool.GetStatus()
  181. outbound, inbound, _ := bcR.Switch.NumPeers()
  182. log.Info("Consensus ticker", "numPending", numPending, "total", len(bcR.pool.requesters),
  183. "outbound", outbound, "inbound", inbound)
  184. if bcR.pool.IsCaughtUp() {
  185. log.Notice("Time to switch to consensus reactor!", "height", height)
  186. bcR.pool.Stop()
  187. conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
  188. conR.SwitchToConsensus(bcR.state)
  189. break FOR_LOOP
  190. }
  191. case _ = <-trySyncTicker.C: // chan time
  192. // This loop can be slow as long as it's doing syncing work.
  193. SYNC_LOOP:
  194. for i := 0; i < 10; i++ {
  195. // See if there are any blocks to sync.
  196. first, second := bcR.pool.PeekTwoBlocks()
  197. //log.Info("TrySync peeked", "first", first, "second", second)
  198. if first == nil || second == nil {
  199. // We need both to sync the first block.
  200. break SYNC_LOOP
  201. }
  202. firstParts := first.MakePartSet(bcR.config.GetInt("block_part_size")) // TODO: put part size in parts header?
  203. firstPartsHeader := firstParts.Header()
  204. // Finally, verify the first block using the second's commit
  205. // NOTE: we can probably make this more efficient, but note that calling
  206. // first.Hash() doesn't verify the tx contents, so MakePartSet() is
  207. // currently necessary.
  208. err := bcR.state.Validators.VerifyCommit(
  209. bcR.state.ChainID, types.BlockID{first.Hash(), firstPartsHeader}, first.Height, second.LastCommit)
  210. if err != nil {
  211. log.Info("error in validation", "error", err)
  212. bcR.pool.RedoRequest(first.Height)
  213. break SYNC_LOOP
  214. } else {
  215. bcR.pool.PopRequest()
  216. bcR.store.SaveBlock(first, firstParts, second.LastCommit)
  217. // TODO: should we be firing events? need to fire NewBlock events manually ...
  218. // NOTE: we could improve performance if we
  219. // didn't make the app commit to disk every block
  220. // ... but we would need a way to get the hash without it persisting
  221. err := bcR.state.ApplyBlock(bcR.evsw, bcR.proxyAppConn, first, firstPartsHeader, types.MockMempool{})
  222. if err != nil {
  223. // TODO This is bad, are we zombie?
  224. PanicQ(Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
  225. }
  226. bcR.state.Save()
  227. }
  228. }
  229. continue FOR_LOOP
  230. case <-bcR.Quit:
  231. break FOR_LOOP
  232. }
  233. }
  234. }
  235. func (bcR *BlockchainReactor) BroadcastStatusResponse() error {
  236. bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}})
  237. return nil
  238. }
  239. func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
  240. bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusRequestMessage{bcR.store.Height()}})
  241. return nil
  242. }
  243. // implements events.Eventable
  244. func (bcR *BlockchainReactor) SetEventSwitch(evsw types.EventSwitch) {
  245. bcR.evsw = evsw
  246. }
  247. //-----------------------------------------------------------------------------
  248. // Messages
  249. const (
  250. msgTypeBlockRequest = byte(0x10)
  251. msgTypeBlockResponse = byte(0x11)
  252. msgTypeStatusResponse = byte(0x20)
  253. msgTypeStatusRequest = byte(0x21)
  254. )
  255. type BlockchainMessage interface{}
  256. var _ = wire.RegisterInterface(
  257. struct{ BlockchainMessage }{},
  258. wire.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest},
  259. wire.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse},
  260. wire.ConcreteType{&bcStatusResponseMessage{}, msgTypeStatusResponse},
  261. wire.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest},
  262. )
  263. // TODO: ensure that bz is completely read.
  264. func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
  265. msgType = bz[0]
  266. n := int(0)
  267. r := bytes.NewReader(bz)
  268. msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, maxBlockchainResponseSize, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage
  269. if err != nil && n != len(bz) {
  270. err = errors.New("DecodeMessage() had bytes left over.")
  271. }
  272. return
  273. }
  274. //-------------------------------------
  275. type bcBlockRequestMessage struct {
  276. Height int
  277. }
  278. func (m *bcBlockRequestMessage) String() string {
  279. return fmt.Sprintf("[bcBlockRequestMessage %v]", m.Height)
  280. }
  281. //-------------------------------------
  282. // NOTE: keep up-to-date with maxBlockchainResponseSize
  283. type bcBlockResponseMessage struct {
  284. Block *types.Block
  285. }
  286. func (m *bcBlockResponseMessage) String() string {
  287. return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height)
  288. }
  289. //-------------------------------------
  290. type bcStatusRequestMessage struct {
  291. Height int
  292. }
  293. func (m *bcStatusRequestMessage) String() string {
  294. return fmt.Sprintf("[bcStatusRequestMessage %v]", m.Height)
  295. }
  296. //-------------------------------------
  297. type bcStatusResponseMessage struct {
  298. Height int
  299. }
  300. func (m *bcStatusResponseMessage) String() string {
  301. return fmt.Sprintf("[bcStatusResponseMessage %v]", m.Height)
  302. }