You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

344 lines
11 KiB

8 years ago
8 years ago
10 years ago
8 years ago
8 years ago
10 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package blockchain
  2. import (
  3. "bytes"
  4. "errors"
  5. "reflect"
  6. "time"
  7. wire "github.com/tendermint/go-wire"
  8. "github.com/tendermint/tendermint/p2p"
  9. "github.com/tendermint/tendermint/proxy"
  10. sm "github.com/tendermint/tendermint/state"
  11. "github.com/tendermint/tendermint/types"
  12. cmn "github.com/tendermint/tmlibs/common"
  13. )
  14. const (
  15. // BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
  16. BlockchainChannel = byte(0x40)
  17. defaultChannelCapacity = 100
  18. defaultSleepIntervalMS = 500
  19. trySyncIntervalMS = 100
  20. // stop syncing when last block's time is
  21. // within this much of the system time.
  22. // stopSyncingDurationMinutes = 10
  23. // ask for best height every 10s
  24. statusUpdateIntervalSeconds = 10
  25. // check if we should switch to consensus reactor
  26. switchToConsensusIntervalSeconds = 1
  27. maxBlockchainResponseSize = types.MaxBlockSize + 2
  28. )
  29. type consensusReactor interface {
  30. // for when we switch from blockchain reactor and fast sync to
  31. // the consensus machine
  32. SwitchToConsensus(*sm.State)
  33. }
  34. // BlockchainReactor handles long-term catchup syncing.
  35. type BlockchainReactor struct {
  36. p2p.BaseReactor
  37. state *sm.State
  38. proxyAppConn proxy.AppConnConsensus // same as consensus.proxyAppConn
  39. store *BlockStore
  40. pool *BlockPool
  41. fastSync bool
  42. requestsCh chan BlockRequest
  43. timeoutsCh chan string
  44. lastBlock *types.Block
  45. evsw types.EventSwitch
  46. }
  47. // NewBlockchainReactor returns new reactor instance.
  48. func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConnConsensus, store *BlockStore, fastSync bool) *BlockchainReactor {
  49. if state.LastBlockHeight == store.Height()-1 {
  50. store.height-- // XXX HACK, make this better
  51. }
  52. if state.LastBlockHeight != store.Height() {
  53. cmn.PanicSanity(cmn.Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
  54. }
  55. requestsCh := make(chan BlockRequest, defaultChannelCapacity)
  56. timeoutsCh := make(chan string, defaultChannelCapacity)
  57. pool := NewBlockPool(
  58. store.Height()+1,
  59. requestsCh,
  60. timeoutsCh,
  61. )
  62. bcR := &BlockchainReactor{
  63. state: state,
  64. proxyAppConn: proxyAppConn,
  65. store: store,
  66. pool: pool,
  67. fastSync: fastSync,
  68. requestsCh: requestsCh,
  69. timeoutsCh: timeoutsCh,
  70. }
  71. bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
  72. return bcR
  73. }
  74. // OnStart implements BaseService
  75. func (bcR *BlockchainReactor) OnStart() error {
  76. bcR.BaseReactor.OnStart()
  77. if bcR.fastSync {
  78. _, err := bcR.pool.Start()
  79. if err != nil {
  80. return err
  81. }
  82. go bcR.poolRoutine()
  83. }
  84. return nil
  85. }
  86. // OnStop implements BaseService
  87. func (bcR *BlockchainReactor) OnStop() {
  88. bcR.BaseReactor.OnStop()
  89. bcR.pool.Stop()
  90. }
  91. // GetChannels implements Reactor
  92. func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
  93. return []*p2p.ChannelDescriptor{
  94. &p2p.ChannelDescriptor{
  95. ID: BlockchainChannel,
  96. Priority: 5,
  97. SendQueueCapacity: 100,
  98. },
  99. }
  100. }
  101. // AddPeer implements Reactor by sending our state to peer.
  102. func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
  103. if !peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}}) {
  104. // doing nothing, will try later in `poolRoutine`
  105. }
  106. }
  107. // RemovePeer implements Reactor by removing peer from the pool.
  108. func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  109. bcR.pool.RemovePeer(peer.Key)
  110. }
  111. // Receive implements Reactor by handling 4 types of messages (look below).
  112. func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
  113. _, msg, err := DecodeMessage(msgBytes)
  114. if err != nil {
  115. bcR.Logger.Error("Error decoding message", "error", err)
  116. return
  117. }
  118. bcR.Logger.Debug("Receive", "src", src, "chID", chID, "msg", msg)
  119. switch msg := msg.(type) {
  120. case *bcBlockRequestMessage:
  121. // Got a request for a block. Respond with block if we have it.
  122. block := bcR.store.LoadBlock(msg.Height)
  123. if block != nil {
  124. msg := &bcBlockResponseMessage{Block: block}
  125. queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
  126. if !queued {
  127. // queue is full, just ignore.
  128. }
  129. } else {
  130. // TODO peer is asking for things we don't have.
  131. }
  132. case *bcBlockResponseMessage:
  133. // Got a block.
  134. bcR.pool.AddBlock(src.Key, msg.Block, len(msgBytes))
  135. case *bcStatusRequestMessage:
  136. // Send peer our state.
  137. queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}})
  138. if !queued {
  139. // sorry
  140. }
  141. case *bcStatusResponseMessage:
  142. // Got a peer status. Unverified.
  143. bcR.pool.SetPeerHeight(src.Key, msg.Height)
  144. default:
  145. bcR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  146. }
  147. }
  148. // Handle messages from the poolReactor telling the reactor what to do.
  149. // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
  150. // (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
  151. func (bcR *BlockchainReactor) poolRoutine() {
  152. trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
  153. statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
  154. switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
  155. FOR_LOOP:
  156. for {
  157. select {
  158. case request := <-bcR.requestsCh: // chan BlockRequest
  159. peer := bcR.Switch.Peers().Get(request.PeerID)
  160. if peer == nil {
  161. continue FOR_LOOP // Peer has since been disconnected.
  162. }
  163. msg := &bcBlockRequestMessage{request.Height}
  164. queued := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
  165. if !queued {
  166. // We couldn't make the request, send-queue full.
  167. // The pool handles timeouts, just let it go.
  168. continue FOR_LOOP
  169. }
  170. case peerID := <-bcR.timeoutsCh: // chan string
  171. // Peer timed out.
  172. peer := bcR.Switch.Peers().Get(peerID)
  173. if peer != nil {
  174. bcR.Switch.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
  175. }
  176. case _ = <-statusUpdateTicker.C:
  177. // ask for status updates
  178. go bcR.BroadcastStatusRequest()
  179. case _ = <-switchToConsensusTicker.C:
  180. height, numPending, _ := bcR.pool.GetStatus()
  181. outbound, inbound, _ := bcR.Switch.NumPeers()
  182. bcR.Logger.Info("Consensus ticker", "numPending", numPending, "total", len(bcR.pool.requesters),
  183. "outbound", outbound, "inbound", inbound)
  184. if bcR.pool.IsCaughtUp() {
  185. bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
  186. bcR.pool.Stop()
  187. conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
  188. conR.SwitchToConsensus(bcR.state)
  189. break FOR_LOOP
  190. }
  191. case _ = <-trySyncTicker.C: // chan time
  192. // This loop can be slow as long as it's doing syncing work.
  193. SYNC_LOOP:
  194. for i := 0; i < 10; i++ {
  195. // See if there are any blocks to sync.
  196. first, second := bcR.pool.PeekTwoBlocks()
  197. //bcR.Logger.Info("TrySync peeked", "first", first, "second", second)
  198. if first == nil || second == nil {
  199. // We need both to sync the first block.
  200. break SYNC_LOOP
  201. }
  202. firstParts := first.MakePartSet(types.DefaultBlockPartSize)
  203. firstPartsHeader := firstParts.Header()
  204. // Finally, verify the first block using the second's commit
  205. // NOTE: we can probably make this more efficient, but note that calling
  206. // first.Hash() doesn't verify the tx contents, so MakePartSet() is
  207. // currently necessary.
  208. err := bcR.state.Validators.VerifyCommit(
  209. bcR.state.ChainID, types.BlockID{first.Hash(), firstPartsHeader}, first.Height, second.LastCommit)
  210. if err != nil {
  211. bcR.Logger.Info("error in validation", "error", err)
  212. bcR.pool.RedoRequest(first.Height)
  213. break SYNC_LOOP
  214. } else {
  215. bcR.pool.PopRequest()
  216. bcR.store.SaveBlock(first, firstParts, second.LastCommit)
  217. // TODO: should we be firing events? need to fire NewBlock events manually ...
  218. // NOTE: we could improve performance if we
  219. // didn't make the app commit to disk every block
  220. // ... but we would need a way to get the hash without it persisting
  221. err := bcR.state.ApplyBlock(bcR.evsw, bcR.proxyAppConn, first, firstPartsHeader, types.MockMempool{})
  222. if err != nil {
  223. // TODO This is bad, are we zombie?
  224. cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
  225. }
  226. }
  227. }
  228. continue FOR_LOOP
  229. case <-bcR.Quit:
  230. break FOR_LOOP
  231. }
  232. }
  233. }
  234. // BroadcastStatusRequest broadcasts `BlockStore` height.
  235. func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
  236. bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusRequestMessage{bcR.store.Height()}})
  237. return nil
  238. }
  239. // SetEventSwitch implements events.Eventable
  240. func (bcR *BlockchainReactor) SetEventSwitch(evsw types.EventSwitch) {
  241. bcR.evsw = evsw
  242. }
  243. //-----------------------------------------------------------------------------
  244. // Messages
  245. const (
  246. msgTypeBlockRequest = byte(0x10)
  247. msgTypeBlockResponse = byte(0x11)
  248. msgTypeStatusResponse = byte(0x20)
  249. msgTypeStatusRequest = byte(0x21)
  250. )
  251. // BlockchainMessage is a generic message for this reactor.
  252. type BlockchainMessage interface{}
  253. var _ = wire.RegisterInterface(
  254. struct{ BlockchainMessage }{},
  255. wire.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest},
  256. wire.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse},
  257. wire.ConcreteType{&bcStatusResponseMessage{}, msgTypeStatusResponse},
  258. wire.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest},
  259. )
  260. // DecodeMessage decodes BlockchainMessage.
  261. // TODO: ensure that bz is completely read.
  262. func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
  263. msgType = bz[0]
  264. n := int(0)
  265. r := bytes.NewReader(bz)
  266. msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, maxBlockchainResponseSize, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage
  267. if err != nil && n != len(bz) {
  268. err = errors.New("DecodeMessage() had bytes left over")
  269. }
  270. return
  271. }
  272. //-------------------------------------
  273. type bcBlockRequestMessage struct {
  274. Height int
  275. }
  276. func (m *bcBlockRequestMessage) String() string {
  277. return cmn.Fmt("[bcBlockRequestMessage %v]", m.Height)
  278. }
  279. //-------------------------------------
  280. // NOTE: keep up-to-date with maxBlockchainResponseSize
  281. type bcBlockResponseMessage struct {
  282. Block *types.Block
  283. }
  284. func (m *bcBlockResponseMessage) String() string {
  285. return cmn.Fmt("[bcBlockResponseMessage %v]", m.Block.Height)
  286. }
  287. //-------------------------------------
  288. type bcStatusRequestMessage struct {
  289. Height int
  290. }
  291. func (m *bcStatusRequestMessage) String() string {
  292. return cmn.Fmt("[bcStatusRequestMessage %v]", m.Height)
  293. }
  294. //-------------------------------------
  295. type bcStatusResponseMessage struct {
  296. Height int
  297. }
  298. func (m *bcStatusResponseMessage) String() string {
  299. return cmn.Fmt("[bcStatusResponseMessage %v]", m.Height)
  300. }