You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

343 lines
9.9 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package blockchain
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "time"
  8. . "github.com/tendermint/go-common"
  9. "github.com/tendermint/go-p2p"
  10. "github.com/tendermint/go-wire"
  11. "github.com/tendermint/go-events"
  12. "github.com/tendermint/tendermint/proxy"
  13. sm "github.com/tendermint/tendermint/state"
  14. "github.com/tendermint/tendermint/types"
  15. )
  16. const (
  17. BlockchainChannel = byte(0x40)
  18. defaultChannelCapacity = 100
  19. defaultSleepIntervalMS = 500
  20. trySyncIntervalMS = 100
  21. // stop syncing when last block's time is
  22. // within this much of the system time.
  23. // stopSyncingDurationMinutes = 10
  24. // ask for best height every 10s
  25. statusUpdateIntervalSeconds = 10
  26. // check if we should switch to consensus reactor
  27. switchToConsensusIntervalSeconds = 1
  28. maxBlockchainResponseSize = types.MaxBlockSize + 2
  29. )
  30. type consensusReactor interface {
  31. // for when we switch from blockchain reactor and fast sync to
  32. // the consensus machine
  33. SwitchToConsensus(*sm.State)
  34. }
  35. // BlockchainReactor handles long-term catchup syncing.
  36. type BlockchainReactor struct {
  37. p2p.BaseReactor
  38. sw *p2p.Switch
  39. state *sm.State
  40. proxyAppConn proxy.AppConn // same as consensus.proxyAppConn
  41. store *BlockStore
  42. pool *BlockPool
  43. sync bool
  44. requestsCh chan BlockRequest
  45. timeoutsCh chan string
  46. lastBlock *types.Block
  47. evsw *events.EventSwitch
  48. }
  49. func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConn, store *BlockStore, sync bool) *BlockchainReactor {
  50. if state.LastBlockHeight == store.Height()-1 {
  51. store.height -= 1 // XXX HACK, make this better
  52. }
  53. if state.LastBlockHeight != store.Height() {
  54. PanicSanity(Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
  55. }
  56. requestsCh := make(chan BlockRequest, defaultChannelCapacity)
  57. timeoutsCh := make(chan string, defaultChannelCapacity)
  58. pool := NewBlockPool(
  59. store.Height()+1,
  60. requestsCh,
  61. timeoutsCh,
  62. )
  63. bcR := &BlockchainReactor{
  64. state: state,
  65. proxyAppConn: proxyAppConn,
  66. store: store,
  67. pool: pool,
  68. sync: sync,
  69. requestsCh: requestsCh,
  70. timeoutsCh: timeoutsCh,
  71. }
  72. bcR.BaseReactor = *p2p.NewBaseReactor(log, "BlockchainReactor", bcR)
  73. return bcR
  74. }
  75. func (bcR *BlockchainReactor) OnStart() error {
  76. bcR.BaseReactor.OnStart()
  77. if bcR.sync {
  78. _, err := bcR.pool.Start()
  79. if err != nil {
  80. return err
  81. }
  82. go bcR.poolRoutine()
  83. }
  84. return nil
  85. }
  86. func (bcR *BlockchainReactor) OnStop() {
  87. bcR.BaseReactor.OnStop()
  88. bcR.pool.Stop()
  89. }
  90. // Implements Reactor
  91. func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
  92. return []*p2p.ChannelDescriptor{
  93. &p2p.ChannelDescriptor{
  94. ID: BlockchainChannel,
  95. Priority: 5,
  96. SendQueueCapacity: 100,
  97. },
  98. }
  99. }
  100. // Implements Reactor
  101. func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
  102. // Send peer our state.
  103. peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}})
  104. }
  105. // Implements Reactor
  106. func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  107. // Remove peer from the pool.
  108. bcR.pool.RemovePeer(peer.Key)
  109. }
  110. // Implements Reactor
  111. func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
  112. _, msg, err := DecodeMessage(msgBytes)
  113. if err != nil {
  114. log.Warn("Error decoding message", "error", err)
  115. return
  116. }
  117. log.Notice("Receive", "src", src, "chID", chID, "msg", msg)
  118. switch msg := msg.(type) {
  119. case *bcBlockRequestMessage:
  120. // Got a request for a block. Respond with block if we have it.
  121. block := bcR.store.LoadBlock(msg.Height)
  122. if block != nil {
  123. msg := &bcBlockResponseMessage{Block: block}
  124. queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
  125. if !queued {
  126. // queue is full, just ignore.
  127. }
  128. } else {
  129. // TODO peer is asking for things we don't have.
  130. }
  131. case *bcBlockResponseMessage:
  132. // Got a block.
  133. bcR.pool.AddBlock(src.Key, msg.Block, len(msgBytes))
  134. case *bcStatusRequestMessage:
  135. // Send peer our state.
  136. queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}})
  137. if !queued {
  138. // sorry
  139. }
  140. case *bcStatusResponseMessage:
  141. // Got a peer status. Unverified.
  142. bcR.pool.SetPeerHeight(src.Key, msg.Height)
  143. default:
  144. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  145. }
  146. }
  147. // Handle messages from the poolReactor telling the reactor what to do.
  148. // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
  149. // (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
  150. func (bcR *BlockchainReactor) poolRoutine() {
  151. trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
  152. statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
  153. switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
  154. FOR_LOOP:
  155. for {
  156. select {
  157. case request := <-bcR.requestsCh: // chan BlockRequest
  158. peer := bcR.Switch.Peers().Get(request.PeerID)
  159. if peer == nil {
  160. continue FOR_LOOP // Peer has since been disconnected.
  161. }
  162. msg := &bcBlockRequestMessage{request.Height}
  163. queued := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
  164. if !queued {
  165. // We couldn't make the request, send-queue full.
  166. // The pool handles timeouts, just let it go.
  167. continue FOR_LOOP
  168. }
  169. case peerID := <-bcR.timeoutsCh: // chan string
  170. // Peer timed out.
  171. peer := bcR.Switch.Peers().Get(peerID)
  172. if peer != nil {
  173. bcR.Switch.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
  174. }
  175. case _ = <-statusUpdateTicker.C:
  176. // ask for status updates
  177. go bcR.BroadcastStatusRequest()
  178. case _ = <-switchToConsensusTicker.C:
  179. height, numPending := bcR.pool.GetStatus()
  180. outbound, inbound, _ := bcR.Switch.NumPeers()
  181. log.Info("Consensus ticker", "numPending", numPending, "total", len(bcR.pool.requesters),
  182. "outbound", outbound, "inbound", inbound)
  183. if bcR.pool.IsCaughtUp() {
  184. log.Notice("Time to switch to consensus reactor!", "height", height)
  185. bcR.pool.Stop()
  186. conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
  187. conR.SwitchToConsensus(bcR.state)
  188. break FOR_LOOP
  189. }
  190. case _ = <-trySyncTicker.C: // chan time
  191. // This loop can be slow as long as it's doing syncing work.
  192. SYNC_LOOP:
  193. for i := 0; i < 10; i++ {
  194. // See if there are any blocks to sync.
  195. first, second := bcR.pool.PeekTwoBlocks()
  196. //log.Info("TrySync peeked", "first", first, "second", second)
  197. if first == nil || second == nil {
  198. // We need both to sync the first block.
  199. break SYNC_LOOP
  200. }
  201. firstParts := first.MakePartSet()
  202. firstPartsHeader := firstParts.Header()
  203. // Finally, verify the first block using the second's validation.
  204. err := bcR.state.Validators.VerifyValidation(
  205. bcR.state.ChainID, first.Hash(), firstPartsHeader, first.Height, second.LastValidation)
  206. if err != nil {
  207. log.Info("error in validation", "error", err)
  208. bcR.pool.RedoRequest(first.Height)
  209. break SYNC_LOOP
  210. } else {
  211. bcR.pool.PopRequest()
  212. err := bcR.state.ExecBlock(bcR.evsw, bcR.proxyAppConn, first, firstPartsHeader)
  213. if err != nil {
  214. // TODO This is bad, are we zombie?
  215. PanicQ(Fmt("Failed to process committed block: %v", err))
  216. }
  217. /*
  218. err = bcR.proxyAppConn.CommitSync()
  219. if err != nil {
  220. // TODO Handle gracefully.
  221. PanicQ(Fmt("Failed to commit block at application: %v", err))
  222. }
  223. */
  224. bcR.store.SaveBlock(first, firstParts, second.LastValidation)
  225. bcR.state.Save()
  226. }
  227. }
  228. continue FOR_LOOP
  229. case <-bcR.Quit:
  230. break FOR_LOOP
  231. }
  232. }
  233. }
  234. func (bcR *BlockchainReactor) BroadcastStatusResponse() error {
  235. bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}})
  236. return nil
  237. }
  238. func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
  239. bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusRequestMessage{bcR.store.Height()}})
  240. return nil
  241. }
  242. // implements events.Eventable
  243. func (bcR *BlockchainReactor) SetEventSwitch(evsw *events.EventSwitch) {
  244. bcR.evsw = evsw
  245. }
  246. //-----------------------------------------------------------------------------
  247. // Messages
  248. const (
  249. msgTypeBlockRequest = byte(0x10)
  250. msgTypeBlockResponse = byte(0x11)
  251. msgTypeStatusResponse = byte(0x20)
  252. msgTypeStatusRequest = byte(0x21)
  253. )
  254. type BlockchainMessage interface{}
  255. var _ = wire.RegisterInterface(
  256. struct{ BlockchainMessage }{},
  257. wire.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest},
  258. wire.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse},
  259. wire.ConcreteType{&bcStatusResponseMessage{}, msgTypeStatusResponse},
  260. wire.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest},
  261. )
  262. // TODO: ensure that bz is completely read.
  263. func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
  264. msgType = bz[0]
  265. n := int(0)
  266. r := bytes.NewReader(bz)
  267. msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, maxBlockchainResponseSize, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage
  268. if err != nil && n != len(bz) {
  269. err = errors.New("DecodeMessage() had bytes left over.")
  270. }
  271. return
  272. }
  273. //-------------------------------------
  274. type bcBlockRequestMessage struct {
  275. Height int
  276. }
  277. func (m *bcBlockRequestMessage) String() string {
  278. return fmt.Sprintf("[bcBlockRequestMessage %v]", m.Height)
  279. }
  280. //-------------------------------------
  281. // NOTE: keep up-to-date with maxBlockchainResponseSize
  282. type bcBlockResponseMessage struct {
  283. Block *types.Block
  284. }
  285. func (m *bcBlockResponseMessage) String() string {
  286. return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height)
  287. }
  288. //-------------------------------------
  289. type bcStatusRequestMessage struct {
  290. Height int
  291. }
  292. func (m *bcStatusRequestMessage) String() string {
  293. return fmt.Sprintf("[bcStatusRequestMessage %v]", m.Height)
  294. }
  295. //-------------------------------------
  296. type bcStatusResponseMessage struct {
  297. Height int
  298. }
  299. func (m *bcStatusResponseMessage) String() string {
  300. return fmt.Sprintf("[bcStatusResponseMessage %v]", m.Height)
  301. }