You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

329 lines
9.2 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package blockchain
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "time"
  8. . "github.com/tendermint/go-common"
  9. "github.com/tendermint/tendermint/events"
  10. "github.com/tendermint/go-p2p"
  11. sm "github.com/tendermint/tendermint/state"
  12. "github.com/tendermint/tendermint/types"
  13. "github.com/tendermint/go-wire"
  14. )
  15. const (
  16. BlockchainChannel = byte(0x40)
  17. defaultChannelCapacity = 100
  18. defaultSleepIntervalMS = 500
  19. trySyncIntervalMS = 100
  20. // stop syncing when last block's time is
  21. // within this much of the system time.
  22. // stopSyncingDurationMinutes = 10
  23. // ask for best height every 10s
  24. statusUpdateIntervalSeconds = 10
  25. // check if we should switch to consensus reactor
  26. switchToConsensusIntervalSeconds = 1
  27. )
// consensusReactor is the one method the blockchain reactor needs from the
// reactor it looks up under the name "CONSENSUS" once fast sync is done.
type consensusReactor interface {
	// SwitchToConsensus is for when we switch from the blockchain reactor
	// and fast sync to the consensus machine. It is handed the reactor's
	// current state.
	SwitchToConsensus(*sm.State)
}
// BlockchainReactor handles long-term catchup syncing.
//
// It answers peers' block and status requests from its block store and, when
// sync is enabled, runs a block pool plus a routine that verifies, executes
// and saves the blocks the pool collects.
type BlockchainReactor struct {
	p2p.BaseReactor

	sw         *p2p.Switch       // NOTE(review): not referenced in the visible code, which uses bcR.Switch instead
	state      *sm.State         // state that synced blocks are verified against and executed on
	store      *BlockStore       // block store served to peers and written during sync
	pool       *BlockPool        // pool of requested/received blocks
	sync       bool              // whether OnStart starts the pool and the sync routine
	requestsCh chan BlockRequest // block requests handed to NewBlockPool, consumed by poolRoutine
	timeoutsCh chan string       // timed-out peer IDs handed to NewBlockPool, consumed by poolRoutine
	lastBlock  *types.Block      // NOTE(review): not referenced in the visible code

	evsw events.Fireable // set through SetFireable
}
  46. func NewBlockchainReactor(state *sm.State, store *BlockStore, sync bool) *BlockchainReactor {
  47. if state.LastBlockHeight != store.Height() &&
  48. state.LastBlockHeight != store.Height()-1 { // XXX double check this logic.
  49. PanicSanity(Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
  50. }
  51. requestsCh := make(chan BlockRequest, defaultChannelCapacity)
  52. timeoutsCh := make(chan string, defaultChannelCapacity)
  53. pool := NewBlockPool(
  54. store.Height()+1,
  55. requestsCh,
  56. timeoutsCh,
  57. )
  58. bcR := &BlockchainReactor{
  59. state: state,
  60. store: store,
  61. pool: pool,
  62. sync: sync,
  63. requestsCh: requestsCh,
  64. timeoutsCh: timeoutsCh,
  65. }
  66. bcR.BaseReactor = *p2p.NewBaseReactor(log, "BlockchainReactor", bcR)
  67. return bcR
  68. }
  69. func (bcR *BlockchainReactor) OnStart() error {
  70. bcR.BaseReactor.OnStart()
  71. if bcR.sync {
  72. _, err := bcR.pool.Start()
  73. if err != nil {
  74. return err
  75. }
  76. go bcR.poolRoutine()
  77. }
  78. return nil
  79. }
// OnStop stops the base reactor and then the block pool.
func (bcR *BlockchainReactor) OnStop() {
	bcR.BaseReactor.OnStop()
	// The pool is stopped here regardless of whether OnStart started it.
	bcR.pool.Stop()
}
  84. // Implements Reactor
  85. func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
  86. return []*p2p.ChannelDescriptor{
  87. &p2p.ChannelDescriptor{
  88. ID: BlockchainChannel,
  89. Priority: 5,
  90. SendQueueCapacity: 100,
  91. },
  92. }
  93. }
  94. // Implements Reactor
  95. func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
  96. // Send peer our state.
  97. peer.Send(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()})
  98. }
// RemovePeer implements Reactor. It removes the peer, identified by its Key,
// from the block pool. The reason argument is not used.
func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
	// Remove peer from the pool.
	bcR.pool.RemovePeer(peer.Key)
}
  104. // Implements Reactor
  105. func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
  106. _, msg, err := DecodeMessage(msgBytes)
  107. if err != nil {
  108. log.Warn("Error decoding message", "error", err)
  109. return
  110. }
  111. log.Notice("Received message", "src", src, "chID", chID, "msg", msg)
  112. switch msg := msg.(type) {
  113. case *bcBlockRequestMessage:
  114. // Got a request for a block. Respond with block if we have it.
  115. block := bcR.store.LoadBlock(msg.Height)
  116. if block != nil {
  117. msg := &bcBlockResponseMessage{Block: block}
  118. queued := src.TrySend(BlockchainChannel, msg)
  119. if !queued {
  120. // queue is full, just ignore.
  121. }
  122. } else {
  123. // TODO peer is asking for things we don't have.
  124. }
  125. case *bcBlockResponseMessage:
  126. // Got a block.
  127. bcR.pool.AddBlock(src.Key, msg.Block, len(msgBytes))
  128. case *bcStatusRequestMessage:
  129. // Send peer our state.
  130. queued := src.TrySend(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()})
  131. if !queued {
  132. // sorry
  133. }
  134. case *bcStatusResponseMessage:
  135. // Got a peer status. Unverified.
  136. bcR.pool.SetPeerHeight(src.Key, msg.Height)
  137. default:
  138. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  139. }
  140. }
  141. // Handle messages from the poolReactor telling the reactor what to do.
  142. // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
  143. // (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
  144. func (bcR *BlockchainReactor) poolRoutine() {
  145. trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
  146. statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
  147. switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
  148. FOR_LOOP:
  149. for {
  150. select {
  151. case request := <-bcR.requestsCh: // chan BlockRequest
  152. peer := bcR.Switch.Peers().Get(request.PeerID)
  153. if peer == nil {
  154. continue FOR_LOOP // Peer has since been disconnected.
  155. }
  156. msg := &bcBlockRequestMessage{request.Height}
  157. queued := peer.TrySend(BlockchainChannel, msg)
  158. if !queued {
  159. // We couldn't make the request, send-queue full.
  160. // The pool handles timeouts, just let it go.
  161. continue FOR_LOOP
  162. }
  163. case peerID := <-bcR.timeoutsCh: // chan string
  164. // Peer timed out.
  165. peer := bcR.Switch.Peers().Get(peerID)
  166. if peer != nil {
  167. bcR.Switch.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
  168. }
  169. case _ = <-statusUpdateTicker.C:
  170. // ask for status updates
  171. go bcR.BroadcastStatusRequest()
  172. case _ = <-switchToConsensusTicker.C:
  173. height, numPending := bcR.pool.GetStatus()
  174. outbound, inbound, _ := bcR.Switch.NumPeers()
  175. log.Info("Consensus ticker", "numPending", numPending, "total", len(bcR.pool.requesters),
  176. "outbound", outbound, "inbound", inbound)
  177. if bcR.pool.IsCaughtUp() {
  178. log.Notice("Time to switch to consensus reactor!", "height", height)
  179. bcR.pool.Stop()
  180. conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
  181. conR.SwitchToConsensus(bcR.state)
  182. break FOR_LOOP
  183. }
  184. case _ = <-trySyncTicker.C: // chan time
  185. // This loop can be slow as long as it's doing syncing work.
  186. SYNC_LOOP:
  187. for i := 0; i < 10; i++ {
  188. // See if there are any blocks to sync.
  189. first, second := bcR.pool.PeekTwoBlocks()
  190. //log.Info("TrySync peeked", "first", first, "second", second)
  191. if first == nil || second == nil {
  192. // We need both to sync the first block.
  193. break SYNC_LOOP
  194. }
  195. firstParts := first.MakePartSet()
  196. firstPartsHeader := firstParts.Header()
  197. // Finally, verify the first block using the second's validation.
  198. err := bcR.state.BondedValidators.VerifyValidation(
  199. bcR.state.ChainID, first.Hash(), firstPartsHeader, first.Height, second.LastValidation)
  200. if err != nil {
  201. log.Info("error in validation", "error", err)
  202. bcR.pool.RedoRequest(first.Height)
  203. break SYNC_LOOP
  204. } else {
  205. bcR.pool.PopRequest()
  206. err := sm.ExecBlock(bcR.state, first, firstPartsHeader)
  207. if err != nil {
  208. // TODO This is bad, are we zombie?
  209. PanicQ(Fmt("Failed to process committed block: %v", err))
  210. }
  211. bcR.store.SaveBlock(first, firstParts, second.LastValidation)
  212. bcR.state.Save()
  213. }
  214. }
  215. continue FOR_LOOP
  216. case <-bcR.Quit:
  217. break FOR_LOOP
  218. }
  219. }
  220. }
  221. func (bcR *BlockchainReactor) BroadcastStatusResponse() error {
  222. bcR.Switch.Broadcast(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()})
  223. return nil
  224. }
  225. func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
  226. bcR.Switch.Broadcast(BlockchainChannel, &bcStatusRequestMessage{bcR.store.Height()})
  227. return nil
  228. }
// SetFireable implements events.Eventable. It stores evsw on the reactor.
func (bcR *BlockchainReactor) SetFireable(evsw events.Fireable) {
	bcR.evsw = evsw
}
  233. //-----------------------------------------------------------------------------
  234. // Messages
  235. const (
  236. msgTypeBlockRequest = byte(0x10)
  237. msgTypeBlockResponse = byte(0x11)
  238. msgTypeStatusResponse = byte(0x20)
  239. msgTypeStatusRequest = byte(0x21)
  240. )
// BlockchainMessage is the interface type under which all blockchain
// messages are registered with go-wire and returned by DecodeMessage.
type BlockchainMessage interface{}

// Register each concrete message type together with its one-byte type tag.
// DecodeMessage reads messages through the same struct{ BlockchainMessage }
// wrapper.
var _ = wire.RegisterInterface(
	struct{ BlockchainMessage }{},
	wire.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest},
	wire.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse},
	wire.ConcreteType{&bcStatusResponseMessage{}, msgTypeStatusResponse},
	wire.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest},
)
  249. // TODO: ensure that bz is completely read.
  250. func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
  251. msgType = bz[0]
  252. n := int64(0)
  253. r := bytes.NewReader(bz)
  254. msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage
  255. if err != nil && n != int64(len(bz)) {
  256. err = errors.New("DecodeMessage() had bytes left over.")
  257. }
  258. return
  259. }
  260. //-------------------------------------
  261. type bcBlockRequestMessage struct {
  262. Height int
  263. }
  264. func (m *bcBlockRequestMessage) String() string {
  265. return fmt.Sprintf("[bcBlockRequestMessage %v]", m.Height)
  266. }
  267. //-------------------------------------
  268. type bcBlockResponseMessage struct {
  269. Block *types.Block
  270. }
  271. func (m *bcBlockResponseMessage) String() string {
  272. return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height)
  273. }
  274. //-------------------------------------
  275. type bcStatusRequestMessage struct {
  276. Height int
  277. }
  278. func (m *bcStatusRequestMessage) String() string {
  279. return fmt.Sprintf("[bcStatusRequestMessage %v]", m.Height)
  280. }
  281. //-------------------------------------
  282. type bcStatusResponseMessage struct {
  283. Height int
  284. }
  285. func (m *bcStatusResponseMessage) String() string {
  286. return fmt.Sprintf("[bcStatusResponseMessage %v]", m.Height)
  287. }