You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

348 lines
10 KiB

10 years ago
10 years ago
8 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package blockchain
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "time"
  8. . "github.com/tendermint/go-common"
  9. "github.com/tendermint/go-p2p"
  10. "github.com/tendermint/go-wire"
  11. "github.com/tendermint/tendermint/proxy"
  12. sm "github.com/tendermint/tendermint/state"
  13. "github.com/tendermint/tendermint/types"
  14. )
const (
	// BlockchainChannel is the p2p channel ID this reactor sends and
	// receives all of its messages on.
	BlockchainChannel = byte(0x40)

	// Buffer size of the request and timeout channels handed to the pool.
	defaultChannelCapacity = 100
	// NOTE(review): not referenced in this part of the file.
	defaultSleepIntervalMS = 500
	// Period, in milliseconds, of the ticker that drives the sync loop.
	trySyncIntervalMS = 100
	// stop syncing when last block's time is
	// within this much of the system time.
	// stopSyncingDurationMinutes = 10

	// ask for best height every 10s
	statusUpdateIntervalSeconds = 10
	// check if we should switch to consensus reactor
	switchToConsensusIntervalSeconds = 1

	// Byte limit handed to wire.ReadBinary when decoding a message.
	maxBlockchainResponseSize = types.MaxBlockSize + 2
)
// consensusReactor is the slice of the consensus reactor that the blockchain
// reactor needs: the hook it calls once fast sync has caught up.
type consensusReactor interface {
	// for when we switch from blockchain reactor and fast sync to
	// the consensus machine
	SwitchToConsensus(*sm.State)
}
// BlockchainReactor handles long-term catchup syncing.
type BlockchainReactor struct {
	p2p.BaseReactor

	sw           *p2p.Switch             // NOTE(review): not referenced in this part of the file; the code uses the Switch promoted from BaseReactor
	state        *sm.State               // state that synced blocks are executed against and that is handed to the consensus reactor
	proxyAppConn proxy.AppConnConsensus  // same as consensus.proxyAppConn
	store        *BlockStore             // block storage; its height is what we advertise to peers
	pool         *BlockPool              // tracks peers and the blocks requested from / delivered by them
	fastSync     bool                    // when true, OnStart starts the pool and the pool routine
	requestsCh   chan BlockRequest       // block requests emitted by the pool, consumed by poolRoutine
	timeoutsCh   chan string             // IDs of peers the pool reports as timed out
	lastBlock    *types.Block            // NOTE(review): not referenced in this part of the file

	evsw types.EventSwitch // set via SetEventSwitch; passed to state.ExecBlock
}
  48. func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConnConsensus, store *BlockStore, fastSync bool) *BlockchainReactor {
  49. if state.LastBlockHeight == store.Height()-1 {
  50. store.height -= 1 // XXX HACK, make this better
  51. }
  52. if state.LastBlockHeight != store.Height() {
  53. PanicSanity(Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
  54. }
  55. requestsCh := make(chan BlockRequest, defaultChannelCapacity)
  56. timeoutsCh := make(chan string, defaultChannelCapacity)
  57. pool := NewBlockPool(
  58. store.Height()+1,
  59. requestsCh,
  60. timeoutsCh,
  61. )
  62. bcR := &BlockchainReactor{
  63. state: state,
  64. proxyAppConn: proxyAppConn,
  65. store: store,
  66. pool: pool,
  67. fastSync: fastSync,
  68. requestsCh: requestsCh,
  69. timeoutsCh: timeoutsCh,
  70. }
  71. bcR.BaseReactor = *p2p.NewBaseReactor(log, "BlockchainReactor", bcR)
  72. return bcR
  73. }
  74. func (bcR *BlockchainReactor) OnStart() error {
  75. bcR.BaseReactor.OnStart()
  76. if bcR.fastSync {
  77. _, err := bcR.pool.Start()
  78. if err != nil {
  79. return err
  80. }
  81. go bcR.poolRoutine()
  82. }
  83. return nil
  84. }
// OnStop stops the embedded base reactor first and then the block pool.
func (bcR *BlockchainReactor) OnStop() {
	bcR.BaseReactor.OnStop()
	bcR.pool.Stop()
}
  89. // Implements Reactor
  90. func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
  91. return []*p2p.ChannelDescriptor{
  92. &p2p.ChannelDescriptor{
  93. ID: BlockchainChannel,
  94. Priority: 5,
  95. SendQueueCapacity: 100,
  96. },
  97. }
  98. }
  99. // Implements Reactor
  100. func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
  101. // Send peer our state.
  102. peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}})
  103. }
// RemovePeer implements Reactor. The reason is not used here; the peer is
// simply dropped from the block pool by its key.
func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
	// Remove peer from the pool.
	bcR.pool.RemovePeer(peer.Key)
}
  109. // Implements Reactor
  110. func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
  111. _, msg, err := DecodeMessage(msgBytes)
  112. if err != nil {
  113. log.Warn("Error decoding message", "error", err)
  114. return
  115. }
  116. log.Debug("Receive", "src", src, "chID", chID, "msg", msg)
  117. switch msg := msg.(type) {
  118. case *bcBlockRequestMessage:
  119. // Got a request for a block. Respond with block if we have it.
  120. block := bcR.store.LoadBlock(msg.Height)
  121. if block != nil {
  122. msg := &bcBlockResponseMessage{Block: block}
  123. queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
  124. if !queued {
  125. // queue is full, just ignore.
  126. }
  127. } else {
  128. // TODO peer is asking for things we don't have.
  129. }
  130. case *bcBlockResponseMessage:
  131. // Got a block.
  132. bcR.pool.AddBlock(src.Key, msg.Block, len(msgBytes))
  133. case *bcStatusRequestMessage:
  134. // Send peer our state.
  135. queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}})
  136. if !queued {
  137. // sorry
  138. }
  139. case *bcStatusResponseMessage:
  140. // Got a peer status. Unverified.
  141. bcR.pool.SetPeerHeight(src.Key, msg.Height)
  142. default:
  143. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  144. }
  145. }
  146. // Handle messages from the poolReactor telling the reactor what to do.
  147. // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
  148. // (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
  149. func (bcR *BlockchainReactor) poolRoutine() {
  150. trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
  151. statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
  152. switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
  153. FOR_LOOP:
  154. for {
  155. select {
  156. case request := <-bcR.requestsCh: // chan BlockRequest
  157. peer := bcR.Switch.Peers().Get(request.PeerID)
  158. if peer == nil {
  159. continue FOR_LOOP // Peer has since been disconnected.
  160. }
  161. msg := &bcBlockRequestMessage{request.Height}
  162. queued := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
  163. if !queued {
  164. // We couldn't make the request, send-queue full.
  165. // The pool handles timeouts, just let it go.
  166. continue FOR_LOOP
  167. }
  168. case peerID := <-bcR.timeoutsCh: // chan string
  169. // Peer timed out.
  170. peer := bcR.Switch.Peers().Get(peerID)
  171. if peer != nil {
  172. bcR.Switch.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
  173. }
  174. case _ = <-statusUpdateTicker.C:
  175. // ask for status updates
  176. go bcR.BroadcastStatusRequest()
  177. case _ = <-switchToConsensusTicker.C:
  178. height, numPending, _ := bcR.pool.GetStatus()
  179. outbound, inbound, _ := bcR.Switch.NumPeers()
  180. log.Info("Consensus ticker", "numPending", numPending, "total", len(bcR.pool.requesters),
  181. "outbound", outbound, "inbound", inbound)
  182. if bcR.pool.IsCaughtUp() {
  183. log.Notice("Time to switch to consensus reactor!", "height", height)
  184. bcR.pool.Stop()
  185. conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
  186. conR.SwitchToConsensus(bcR.state)
  187. break FOR_LOOP
  188. }
  189. case _ = <-trySyncTicker.C: // chan time
  190. // This loop can be slow as long as it's doing syncing work.
  191. SYNC_LOOP:
  192. for i := 0; i < 10; i++ {
  193. // See if there are any blocks to sync.
  194. first, second := bcR.pool.PeekTwoBlocks()
  195. //log.Info("TrySync peeked", "first", first, "second", second)
  196. if first == nil || second == nil {
  197. // We need both to sync the first block.
  198. break SYNC_LOOP
  199. }
  200. firstParts := first.MakePartSet()
  201. firstPartsHeader := firstParts.Header()
  202. // Finally, verify the first block using the second's commit
  203. // NOTE: we can probably make this more efficient, but note that calling
  204. // first.Hash() doesn't verify the tx contents, so MakePartSet() is
  205. // currently necessary.
  206. err := bcR.state.Validators.VerifyCommit(
  207. bcR.state.ChainID, types.BlockID{first.Hash(), firstPartsHeader}, first.Height, second.LastCommit)
  208. if err != nil {
  209. log.Info("error in validation", "error", err)
  210. bcR.pool.RedoRequest(first.Height)
  211. break SYNC_LOOP
  212. } else {
  213. bcR.pool.PopRequest()
  214. // TODO: use ApplyBlock instead of Exec/Commit/SetAppHash/Save
  215. err := bcR.state.ExecBlock(bcR.evsw, bcR.proxyAppConn, first, firstPartsHeader)
  216. if err != nil {
  217. // TODO This is bad, are we zombie?
  218. PanicQ(Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
  219. }
  220. // NOTE: we could improve performance if we
  221. // didn't make the app commit to disk every block
  222. // ... but we would need a way to get the hash without it persisting
  223. res := bcR.proxyAppConn.CommitSync()
  224. if res.IsErr() {
  225. // TODO Handle gracefully.
  226. PanicQ(Fmt("Failed to commit block at application: %v", res))
  227. }
  228. bcR.store.SaveBlock(first, firstParts, second.LastCommit)
  229. bcR.state.AppHash = res.Data
  230. bcR.state.Save()
  231. }
  232. }
  233. continue FOR_LOOP
  234. case <-bcR.Quit:
  235. break FOR_LOOP
  236. }
  237. }
  238. }
  239. func (bcR *BlockchainReactor) BroadcastStatusResponse() error {
  240. bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}})
  241. return nil
  242. }
  243. func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
  244. bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusRequestMessage{bcR.store.Height()}})
  245. return nil
  246. }
// SetEventSwitch implements events.Eventable. It stores the event switch that
// the reactor passes along when executing synced blocks.
func (bcR *BlockchainReactor) SetEventSwitch(evsw types.EventSwitch) {
	bcR.evsw = evsw
}
  251. //-----------------------------------------------------------------------------
  252. // Messages
// Type bytes identifying each concrete message in the wire registration below.
const (
	msgTypeBlockRequest   = byte(0x10)
	msgTypeBlockResponse  = byte(0x11)
	msgTypeStatusResponse = byte(0x20)
	msgTypeStatusRequest  = byte(0x21)
)

// BlockchainMessage is the common interface type under which all blockchain
// reactor messages are wrapped for encoding and decoding.
type BlockchainMessage interface{}

// Register every concrete message type with its type byte so that a
// struct{ BlockchainMessage } wrapper can be read and written by go-wire.
var _ = wire.RegisterInterface(
	struct{ BlockchainMessage }{},
	wire.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest},
	wire.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse},
	wire.ConcreteType{&bcStatusResponseMessage{}, msgTypeStatusResponse},
	wire.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest},
)
  267. // TODO: ensure that bz is completely read.
  268. func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
  269. msgType = bz[0]
  270. n := int(0)
  271. r := bytes.NewReader(bz)
  272. msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, maxBlockchainResponseSize, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage
  273. if err != nil && n != len(bz) {
  274. err = errors.New("DecodeMessage() had bytes left over.")
  275. }
  276. return
  277. }
  278. //-------------------------------------
  279. type bcBlockRequestMessage struct {
  280. Height int
  281. }
  282. func (m *bcBlockRequestMessage) String() string {
  283. return fmt.Sprintf("[bcBlockRequestMessage %v]", m.Height)
  284. }
  285. //-------------------------------------
  286. // NOTE: keep up-to-date with maxBlockchainResponseSize
  287. type bcBlockResponseMessage struct {
  288. Block *types.Block
  289. }
  290. func (m *bcBlockResponseMessage) String() string {
  291. return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height)
  292. }
  293. //-------------------------------------
  294. type bcStatusRequestMessage struct {
  295. Height int
  296. }
  297. func (m *bcStatusRequestMessage) String() string {
  298. return fmt.Sprintf("[bcStatusRequestMessage %v]", m.Height)
  299. }
  300. //-------------------------------------
  301. type bcStatusResponseMessage struct {
  302. Height int
  303. }
  304. func (m *bcStatusResponseMessage) String() string {
  305. return fmt.Sprintf("[bcStatusResponseMessage %v]", m.Height)
  306. }