You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

311 lines
8.3 KiB

  1. package blockchain
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "sync/atomic"
  7. "time"
  8. "github.com/tendermint/tendermint/binary"
  9. . "github.com/tendermint/tendermint/common"
  10. "github.com/tendermint/tendermint/events"
  11. "github.com/tendermint/tendermint/p2p"
  12. sm "github.com/tendermint/tendermint/state"
  13. "github.com/tendermint/tendermint/types"
  14. )
  15. const (
  16. BlockchainChannel = byte(0x40)
  17. defaultChannelCapacity = 100
  18. defaultSleepIntervalMS = 500
  19. trySyncIntervalMS = 100
  20. // stop syncing when last block's time is
  21. // within this much of the system time.
  22. stopSyncingDurationMinutes = 10
  23. )
  24. type stateResetter interface {
  25. ResetToState(*sm.State)
  26. }
  27. // BlockchainReactor handles long-term catchup syncing.
  28. type BlockchainReactor struct {
  29. sw *p2p.Switch
  30. state *sm.State
  31. store *BlockStore
  32. pool *BlockPool
  33. sync bool
  34. requestsCh chan BlockRequest
  35. timeoutsCh chan string
  36. lastBlock *types.Block
  37. quit chan struct{}
  38. running uint32
  39. evsw events.Fireable
  40. }
  41. func NewBlockchainReactor(state *sm.State, store *BlockStore, sync bool) *BlockchainReactor {
  42. if state.LastBlockHeight != store.Height() {
  43. panic(Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
  44. }
  45. requestsCh := make(chan BlockRequest, defaultChannelCapacity)
  46. timeoutsCh := make(chan string, defaultChannelCapacity)
  47. pool := NewBlockPool(
  48. store.Height()+1,
  49. requestsCh,
  50. timeoutsCh,
  51. )
  52. bcR := &BlockchainReactor{
  53. state: state,
  54. store: store,
  55. pool: pool,
  56. sync: sync,
  57. requestsCh: requestsCh,
  58. timeoutsCh: timeoutsCh,
  59. quit: make(chan struct{}),
  60. running: uint32(0),
  61. }
  62. return bcR
  63. }
  64. // Implements Reactor
  65. func (bcR *BlockchainReactor) Start(sw *p2p.Switch) {
  66. if atomic.CompareAndSwapUint32(&bcR.running, 0, 1) {
  67. log.Info("Starting BlockchainReactor")
  68. bcR.sw = sw
  69. bcR.pool.Start()
  70. if bcR.sync {
  71. go bcR.poolRoutine()
  72. }
  73. }
  74. }
  75. // Implements Reactor
  76. func (bcR *BlockchainReactor) Stop() {
  77. if atomic.CompareAndSwapUint32(&bcR.running, 1, 0) {
  78. log.Info("Stopping BlockchainReactor")
  79. close(bcR.quit)
  80. bcR.pool.Stop()
  81. }
  82. }
  83. // Implements Reactor
  84. func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
  85. return []*p2p.ChannelDescriptor{
  86. &p2p.ChannelDescriptor{
  87. Id: BlockchainChannel,
  88. Priority: 5,
  89. SendQueueCapacity: 100,
  90. },
  91. }
  92. }
  93. // Implements Reactor
  94. func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
  95. // Send peer our state.
  96. peer.Send(BlockchainChannel, bcPeerStatusMessage{bcR.store.Height()})
  97. }
  98. // Implements Reactor
  99. func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  100. // Remove peer from the pool.
  101. bcR.pool.RemovePeer(peer.Key)
  102. }
  103. // Implements Reactor
  104. func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) {
  105. _, msg_, err := DecodeMessage(msgBytes)
  106. if err != nil {
  107. log.Warn("Error decoding message", "error", err)
  108. return
  109. }
  110. log.Info("Received message", "msg", msg_)
  111. switch msg := msg_.(type) {
  112. case bcBlockRequestMessage:
  113. // Got a request for a block. Respond with block if we have it.
  114. block := bcR.store.LoadBlock(msg.Height)
  115. if block != nil {
  116. msg := bcBlockResponseMessage{Block: block}
  117. queued := src.TrySend(BlockchainChannel, msg)
  118. if !queued {
  119. // queue is full, just ignore.
  120. }
  121. } else {
  122. // TODO peer is asking for things we don't have.
  123. }
  124. case bcBlockResponseMessage:
  125. // Got a block.
  126. bcR.pool.AddBlock(msg.Block, src.Key)
  127. case bcPeerStatusMessage:
  128. // Got a peer status.
  129. bcR.pool.SetPeerHeight(src.Key, msg.Height)
  130. default:
  131. // Ignore unknown message
  132. }
  133. }
  134. // Handle messages from the poolReactor telling the reactor what to do.
  135. func (bcR *BlockchainReactor) poolRoutine() {
  136. trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
  137. FOR_LOOP:
  138. for {
  139. select {
  140. case request := <-bcR.requestsCh: // chan BlockRequest
  141. peer := bcR.sw.Peers().Get(request.PeerId)
  142. if peer == nil {
  143. // We can't fulfill the request.
  144. continue FOR_LOOP
  145. }
  146. msg := bcBlockRequestMessage{request.Height}
  147. queued := peer.TrySend(BlockchainChannel, msg)
  148. if !queued {
  149. // We couldn't queue the request.
  150. time.Sleep(defaultSleepIntervalMS * time.Millisecond)
  151. continue FOR_LOOP
  152. }
  153. case peerId := <-bcR.timeoutsCh: // chan string
  154. // Peer timed out.
  155. peer := bcR.sw.Peers().Get(peerId)
  156. if peer != nil {
  157. bcR.sw.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
  158. }
  159. case _ = <-trySyncTicker.C: // chan time
  160. //var lastValidatedBlock *types.Block
  161. SYNC_LOOP:
  162. for i := 0; i < 10; i++ {
  163. // See if there are any blocks to sync.
  164. first, second := bcR.pool.PeekTwoBlocks()
  165. //log.Debug("TrySync peeked", "first", first, "second", second)
  166. if first == nil || second == nil {
  167. // We need both to sync the first block.
  168. break SYNC_LOOP
  169. }
  170. firstParts := first.MakePartSet()
  171. firstPartsHeader := firstParts.Header()
  172. // Finally, verify the first block using the second's validation.
  173. err := bcR.state.BondedValidators.VerifyValidation(
  174. first.Hash(), firstPartsHeader, first.Height, second.Validation)
  175. if err != nil {
  176. log.Debug("error in validation", "error", err)
  177. bcR.pool.RedoRequest(first.Height)
  178. break SYNC_LOOP
  179. } else {
  180. bcR.pool.PopRequest()
  181. err := sm.ExecBlock(bcR.state, first, firstPartsHeader)
  182. if err != nil {
  183. // TODO This is bad, are we zombie?
  184. panic(Fmt("Failed to process committed block: %v", err))
  185. }
  186. bcR.store.SaveBlock(first, firstParts, second.Validation)
  187. bcR.state.Save()
  188. //lastValidatedBlock = first
  189. }
  190. }
  191. /*
  192. // We're done syncing for now (will do again shortly)
  193. // See if we want to stop syncing and turn on the
  194. // consensus reactor.
  195. // TODO: use other heuristics too besides blocktime.
  196. // It's not a security concern, as it only needs to happen
  197. // upon node sync, and there's also a second (slower)
  198. // method of syncing in the consensus reactor.
  199. if lastValidatedBlock != nil && time.Now().Sub(lastValidatedBlock.Time) < stopSyncingDurationMinutes*time.Minute {
  200. go func() {
  201. log.Info("Stopping blockpool syncing, turning on consensus...")
  202. trySyncTicker.Stop() // Just stop the block requests. Still serve blocks to others.
  203. conR := bcR.sw.Reactor("CONSENSUS")
  204. conR.(stateResetter).ResetToState(bcR.state)
  205. conR.Start(bcR.sw)
  206. for _, peer := range bcR.sw.Peers().List() {
  207. conR.AddPeer(peer)
  208. }
  209. }()
  210. break FOR_LOOP
  211. }
  212. */
  213. continue FOR_LOOP
  214. case <-bcR.quit:
  215. break FOR_LOOP
  216. }
  217. }
  218. }
  219. func (bcR *BlockchainReactor) BroadcastStatus() error {
  220. bcR.sw.Broadcast(BlockchainChannel, bcPeerStatusMessage{bcR.store.Height()})
  221. return nil
  222. }
  223. // implements events.Eventable
  224. func (bcR *BlockchainReactor) SetFireable(evsw events.Fireable) {
  225. bcR.evsw = evsw
  226. }
  227. //-----------------------------------------------------------------------------
  228. // Messages
  229. const (
  230. msgTypeUnknown = byte(0x00)
  231. msgTypeBlockRequest = byte(0x10)
  232. msgTypeBlockResponse = byte(0x11)
  233. msgTypePeerStatus = byte(0x20)
  234. )
  235. // TODO: check for unnecessary extra bytes at the end.
  236. func DecodeMessage(bz []byte) (msgType byte, msg interface{}, err error) {
  237. n := new(int64)
  238. msgType = bz[0]
  239. r := bytes.NewReader(bz)
  240. switch msgType {
  241. case msgTypeBlockRequest:
  242. msg = binary.ReadBinary(bcBlockRequestMessage{}, r, n, &err)
  243. case msgTypeBlockResponse:
  244. msg = binary.ReadBinary(bcBlockResponseMessage{}, r, n, &err)
  245. case msgTypePeerStatus:
  246. msg = binary.ReadBinary(bcPeerStatusMessage{}, r, n, &err)
  247. default:
  248. msg = nil
  249. }
  250. return
  251. }
  252. //-------------------------------------
  253. type bcBlockRequestMessage struct {
  254. Height uint
  255. }
  256. func (m bcBlockRequestMessage) TypeByte() byte { return msgTypeBlockRequest }
  257. func (m bcBlockRequestMessage) String() string {
  258. return fmt.Sprintf("[bcBlockRequestMessage %v]", m.Height)
  259. }
  260. //-------------------------------------
  261. type bcBlockResponseMessage struct {
  262. Block *types.Block
  263. }
  264. func (m bcBlockResponseMessage) TypeByte() byte { return msgTypeBlockResponse }
  265. func (m bcBlockResponseMessage) String() string {
  266. return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height)
  267. }
  268. //-------------------------------------
  269. type bcPeerStatusMessage struct {
  270. Height uint
  271. }
  272. func (m bcPeerStatusMessage) TypeByte() byte { return msgTypePeerStatus }
  273. func (m bcPeerStatusMessage) String() string {
  274. return fmt.Sprintf("[bcPeerStatusMessage %v]", m.Height)
  275. }