You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

343 lines
11 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
8 years ago
8 years ago
  1. package blockchain
  2. import (
  3. "bytes"
  4. "errors"
  5. "reflect"
  6. "time"
  7. wire "github.com/tendermint/go-wire"
  8. "github.com/tendermint/tendermint/p2p"
  9. "github.com/tendermint/tendermint/proxy"
  10. sm "github.com/tendermint/tendermint/state"
  11. "github.com/tendermint/tendermint/types"
  12. cmn "github.com/tendermint/tmlibs/common"
  13. )
  14. const (
  15. // BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
  16. BlockchainChannel = byte(0x40)
  17. defaultChannelCapacity = 100
  18. trySyncIntervalMS = 100
  19. // stop syncing when last block's time is
  20. // within this much of the system time.
  21. // stopSyncingDurationMinutes = 10
  22. // ask for best height every 10s
  23. statusUpdateIntervalSeconds = 10
  24. // check if we should switch to consensus reactor
  25. switchToConsensusIntervalSeconds = 1
  26. maxBlockchainResponseSize = types.MaxBlockSize + 2
  27. )
  28. type consensusReactor interface {
  29. // for when we switch from blockchain reactor and fast sync to
  30. // the consensus machine
  31. SwitchToConsensus(*sm.State)
  32. }
  33. // BlockchainReactor handles long-term catchup syncing.
  34. type BlockchainReactor struct {
  35. p2p.BaseReactor
  36. state *sm.State
  37. proxyAppConn proxy.AppConnConsensus // same as consensus.proxyAppConn
  38. store *BlockStore
  39. pool *BlockPool
  40. fastSync bool
  41. requestsCh chan BlockRequest
  42. timeoutsCh chan string
  43. evsw types.EventSwitch
  44. }
  45. // NewBlockchainReactor returns new reactor instance.
  46. func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConnConsensus, store *BlockStore, fastSync bool) *BlockchainReactor {
  47. if state.LastBlockHeight == store.Height()-1 {
  48. store.height-- // XXX HACK, make this better
  49. }
  50. if state.LastBlockHeight != store.Height() {
  51. cmn.PanicSanity(cmn.Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
  52. }
  53. requestsCh := make(chan BlockRequest, defaultChannelCapacity)
  54. timeoutsCh := make(chan string, defaultChannelCapacity)
  55. pool := NewBlockPool(
  56. store.Height()+1,
  57. requestsCh,
  58. timeoutsCh,
  59. )
  60. bcR := &BlockchainReactor{
  61. state: state,
  62. proxyAppConn: proxyAppConn,
  63. store: store,
  64. pool: pool,
  65. fastSync: fastSync,
  66. requestsCh: requestsCh,
  67. timeoutsCh: timeoutsCh,
  68. }
  69. bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
  70. return bcR
  71. }
  72. // OnStart implements BaseService
  73. func (bcR *BlockchainReactor) OnStart() error {
  74. bcR.BaseReactor.OnStart()
  75. if bcR.fastSync {
  76. _, err := bcR.pool.Start()
  77. if err != nil {
  78. return err
  79. }
  80. go bcR.poolRoutine()
  81. }
  82. return nil
  83. }
  84. // OnStop implements BaseService
  85. func (bcR *BlockchainReactor) OnStop() {
  86. bcR.BaseReactor.OnStop()
  87. bcR.pool.Stop()
  88. }
  89. // GetChannels implements Reactor
  90. func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
  91. return []*p2p.ChannelDescriptor{
  92. &p2p.ChannelDescriptor{
  93. ID: BlockchainChannel,
  94. Priority: 5,
  95. SendQueueCapacity: 100,
  96. },
  97. }
  98. }
  99. // AddPeer implements Reactor by sending our state to peer.
  100. func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
  101. if !peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}}) {
  102. // doing nothing, will try later in `poolRoutine`
  103. }
  104. }
  105. // RemovePeer implements Reactor by removing peer from the pool.
  106. func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  107. bcR.pool.RemovePeer(peer.Key)
  108. }
  109. // Receive implements Reactor by handling 4 types of messages (look below).
  110. func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
  111. _, msg, err := DecodeMessage(msgBytes)
  112. if err != nil {
  113. bcR.Logger.Error("Error decoding message", "err", err)
  114. return
  115. }
  116. bcR.Logger.Debug("Receive", "src", src, "chID", chID, "msg", msg)
  117. // TODO: improve logic to satisfy megacheck
  118. switch msg := msg.(type) {
  119. case *bcBlockRequestMessage:
  120. // Got a request for a block. Respond with block if we have it.
  121. block := bcR.store.LoadBlock(msg.Height)
  122. if block != nil {
  123. msg := &bcBlockResponseMessage{Block: block}
  124. queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
  125. if !queued {
  126. // queue is full, just ignore.
  127. }
  128. } else {
  129. // TODO peer is asking for things we don't have.
  130. }
  131. case *bcBlockResponseMessage:
  132. // Got a block.
  133. bcR.pool.AddBlock(src.Key, msg.Block, len(msgBytes))
  134. case *bcStatusRequestMessage:
  135. // Send peer our state.
  136. queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}})
  137. if !queued {
  138. // sorry
  139. }
  140. case *bcStatusResponseMessage:
  141. // Got a peer status. Unverified.
  142. bcR.pool.SetPeerHeight(src.Key, msg.Height)
  143. default:
  144. bcR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  145. }
  146. }
  147. // Handle messages from the poolReactor telling the reactor what to do.
  148. // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
  149. // (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
  150. func (bcR *BlockchainReactor) poolRoutine() {
  151. trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
  152. statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
  153. switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
  154. FOR_LOOP:
  155. for {
  156. select {
  157. case request := <-bcR.requestsCh: // chan BlockRequest
  158. peer := bcR.Switch.Peers().Get(request.PeerID)
  159. if peer == nil {
  160. continue FOR_LOOP // Peer has since been disconnected.
  161. }
  162. msg := &bcBlockRequestMessage{request.Height}
  163. queued := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
  164. if !queued {
  165. // We couldn't make the request, send-queue full.
  166. // The pool handles timeouts, just let it go.
  167. continue FOR_LOOP
  168. }
  169. case peerID := <-bcR.timeoutsCh: // chan string
  170. // Peer timed out.
  171. peer := bcR.Switch.Peers().Get(peerID)
  172. if peer != nil {
  173. bcR.Switch.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
  174. }
  175. case <-statusUpdateTicker.C:
  176. // ask for status updates
  177. go bcR.BroadcastStatusRequest()
  178. case <-switchToConsensusTicker.C:
  179. height, numPending, _ := bcR.pool.GetStatus()
  180. outbound, inbound, _ := bcR.Switch.NumPeers()
  181. bcR.Logger.Info("Consensus ticker", "numPending", numPending, "total", len(bcR.pool.requesters),
  182. "outbound", outbound, "inbound", inbound)
  183. if bcR.pool.IsCaughtUp() {
  184. bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
  185. bcR.pool.Stop()
  186. conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
  187. conR.SwitchToConsensus(bcR.state)
  188. break FOR_LOOP
  189. }
  190. case <-trySyncTicker.C: // chan time
  191. // This loop can be slow as long as it's doing syncing work.
  192. SYNC_LOOP:
  193. for i := 0; i < 10; i++ {
  194. // See if there are any blocks to sync.
  195. first, second := bcR.pool.PeekTwoBlocks()
  196. //bcR.Logger.Info("TrySync peeked", "first", first, "second", second)
  197. if first == nil || second == nil {
  198. // We need both to sync the first block.
  199. break SYNC_LOOP
  200. }
  201. firstParts := first.MakePartSet(types.DefaultBlockPartSize)
  202. firstPartsHeader := firstParts.Header()
  203. // Finally, verify the first block using the second's commit
  204. // NOTE: we can probably make this more efficient, but note that calling
  205. // first.Hash() doesn't verify the tx contents, so MakePartSet() is
  206. // currently necessary.
  207. err := bcR.state.Validators.VerifyCommit(
  208. bcR.state.ChainID, types.BlockID{first.Hash(), firstPartsHeader}, first.Height, second.LastCommit)
  209. if err != nil {
  210. bcR.Logger.Info("error in validation", "err", err)
  211. bcR.pool.RedoRequest(first.Height)
  212. break SYNC_LOOP
  213. } else {
  214. bcR.pool.PopRequest()
  215. bcR.store.SaveBlock(first, firstParts, second.LastCommit)
  216. // TODO: should we be firing events? need to fire NewBlock events manually ...
  217. // NOTE: we could improve performance if we
  218. // didn't make the app commit to disk every block
  219. // ... but we would need a way to get the hash without it persisting
  220. err := bcR.state.ApplyBlock(bcR.evsw, bcR.proxyAppConn, first, firstPartsHeader, types.MockMempool{})
  221. if err != nil {
  222. // TODO This is bad, are we zombie?
  223. cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
  224. }
  225. }
  226. }
  227. continue FOR_LOOP
  228. case <-bcR.Quit:
  229. break FOR_LOOP
  230. }
  231. }
  232. }
  233. // BroadcastStatusRequest broadcasts `BlockStore` height.
  234. func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
  235. bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusRequestMessage{bcR.store.Height()}})
  236. return nil
  237. }
  238. // SetEventSwitch implements events.Eventable
  239. func (bcR *BlockchainReactor) SetEventSwitch(evsw types.EventSwitch) {
  240. bcR.evsw = evsw
  241. }
  242. //-----------------------------------------------------------------------------
  243. // Messages
  244. const (
  245. msgTypeBlockRequest = byte(0x10)
  246. msgTypeBlockResponse = byte(0x11)
  247. msgTypeStatusResponse = byte(0x20)
  248. msgTypeStatusRequest = byte(0x21)
  249. )
  250. // BlockchainMessage is a generic message for this reactor.
  251. type BlockchainMessage interface{}
  252. var _ = wire.RegisterInterface(
  253. struct{ BlockchainMessage }{},
  254. wire.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest},
  255. wire.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse},
  256. wire.ConcreteType{&bcStatusResponseMessage{}, msgTypeStatusResponse},
  257. wire.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest},
  258. )
  259. // DecodeMessage decodes BlockchainMessage.
  260. // TODO: ensure that bz is completely read.
  261. func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
  262. msgType = bz[0]
  263. n := int(0)
  264. r := bytes.NewReader(bz)
  265. msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, maxBlockchainResponseSize, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage
  266. if err != nil && n != len(bz) {
  267. err = errors.New("DecodeMessage() had bytes left over")
  268. }
  269. return
  270. }
  271. //-------------------------------------
  272. type bcBlockRequestMessage struct {
  273. Height int
  274. }
  275. func (m *bcBlockRequestMessage) String() string {
  276. return cmn.Fmt("[bcBlockRequestMessage %v]", m.Height)
  277. }
  278. //-------------------------------------
  279. // NOTE: keep up-to-date with maxBlockchainResponseSize
  280. type bcBlockResponseMessage struct {
  281. Block *types.Block
  282. }
  283. func (m *bcBlockResponseMessage) String() string {
  284. return cmn.Fmt("[bcBlockResponseMessage %v]", m.Block.Height)
  285. }
  286. //-------------------------------------
  287. type bcStatusRequestMessage struct {
  288. Height int
  289. }
  290. func (m *bcStatusRequestMessage) String() string {
  291. return cmn.Fmt("[bcStatusRequestMessage %v]", m.Height)
  292. }
  293. //-------------------------------------
  294. type bcStatusResponseMessage struct {
  295. Height int
  296. }
  297. func (m *bcStatusResponseMessage) String() string {
  298. return cmn.Fmt("[bcStatusResponseMessage %v]", m.Height)
  299. }