You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

304 lines
8.1 KiB

10 years ago
  1. package blockchain
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "sync/atomic"
  7. "time"
  8. "github.com/tendermint/tendermint2/binary"
  9. . "github.com/tendermint/tendermint2/common"
  10. "github.com/tendermint/tendermint2/p2p"
  11. sm "github.com/tendermint/tendermint2/state"
  12. "github.com/tendermint/tendermint2/types"
  13. )
  14. const (
  15. BlockchainChannel = byte(0x40)
  16. defaultChannelCapacity = 100
  17. defaultSleepIntervalMS = 500
  18. trySyncIntervalMS = 100
  19. // stop syncing when last block's time is
  20. // within this much of the system time.
  21. stopSyncingDurationMinutes = 10
  22. )
  23. type stateResetter interface {
  24. ResetToState(*sm.State)
  25. }
  26. // BlockchainReactor handles long-term catchup syncing.
  27. type BlockchainReactor struct {
  28. sw *p2p.Switch
  29. state *sm.State
  30. store *BlockStore
  31. pool *BlockPool
  32. sync bool
  33. requestsCh chan BlockRequest
  34. timeoutsCh chan string
  35. lastBlock *types.Block
  36. quit chan struct{}
  37. running uint32
  38. }
  39. func NewBlockchainReactor(state *sm.State, store *BlockStore, sync bool) *BlockchainReactor {
  40. if state.LastBlockHeight != store.Height() {
  41. panic(Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
  42. }
  43. requestsCh := make(chan BlockRequest, defaultChannelCapacity)
  44. timeoutsCh := make(chan string, defaultChannelCapacity)
  45. pool := NewBlockPool(
  46. store.Height()+1,
  47. requestsCh,
  48. timeoutsCh,
  49. )
  50. bcR := &BlockchainReactor{
  51. state: state,
  52. store: store,
  53. pool: pool,
  54. sync: sync,
  55. requestsCh: requestsCh,
  56. timeoutsCh: timeoutsCh,
  57. quit: make(chan struct{}),
  58. running: uint32(0),
  59. }
  60. return bcR
  61. }
  62. // Implements Reactor
  63. func (bcR *BlockchainReactor) Start(sw *p2p.Switch) {
  64. if atomic.CompareAndSwapUint32(&bcR.running, 0, 1) {
  65. log.Info("Starting BlockchainReactor")
  66. bcR.sw = sw
  67. bcR.pool.Start()
  68. if bcR.sync {
  69. go bcR.poolRoutine()
  70. }
  71. }
  72. }
  73. // Implements Reactor
  74. func (bcR *BlockchainReactor) Stop() {
  75. if atomic.CompareAndSwapUint32(&bcR.running, 1, 0) {
  76. log.Info("Stopping BlockchainReactor")
  77. close(bcR.quit)
  78. bcR.pool.Stop()
  79. }
  80. }
  81. // Implements Reactor
  82. func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
  83. return []*p2p.ChannelDescriptor{
  84. &p2p.ChannelDescriptor{
  85. Id: BlockchainChannel,
  86. Priority: 5,
  87. SendQueueCapacity: 100,
  88. },
  89. }
  90. }
  91. // Implements Reactor
  92. func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
  93. // Send peer our state.
  94. peer.Send(BlockchainChannel, bcPeerStatusMessage{bcR.store.Height()})
  95. }
  96. // Implements Reactor
  97. func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  98. // Remove peer from the pool.
  99. bcR.pool.RemovePeer(peer.Key)
  100. }
  101. // Implements Reactor
  102. func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) {
  103. _, msg_, err := DecodeMessage(msgBytes)
  104. if err != nil {
  105. log.Warn("Error decoding message", "error", err)
  106. return
  107. }
  108. log.Info("Received message", "msg", msg_)
  109. switch msg := msg_.(type) {
  110. case bcBlockRequestMessage:
  111. // Got a request for a block. Respond with block if we have it.
  112. block := bcR.store.LoadBlock(msg.Height)
  113. if block != nil {
  114. msg := bcBlockResponseMessage{Block: block}
  115. queued := src.TrySend(BlockchainChannel, msg)
  116. if !queued {
  117. // queue is full, just ignore.
  118. }
  119. } else {
  120. // TODO peer is asking for things we don't have.
  121. }
  122. case bcBlockResponseMessage:
  123. // Got a block.
  124. bcR.pool.AddBlock(msg.Block, src.Key)
  125. case bcPeerStatusMessage:
  126. // Got a peer status.
  127. bcR.pool.SetPeerHeight(src.Key, msg.Height)
  128. default:
  129. // Ignore unknown message
  130. }
  131. }
  132. // Handle messages from the poolReactor telling the reactor what to do.
  133. func (bcR *BlockchainReactor) poolRoutine() {
  134. trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
  135. FOR_LOOP:
  136. for {
  137. select {
  138. case request := <-bcR.requestsCh: // chan BlockRequest
  139. peer := bcR.sw.Peers().Get(request.PeerId)
  140. if peer == nil {
  141. // We can't fulfill the request.
  142. continue FOR_LOOP
  143. }
  144. msg := bcBlockRequestMessage{request.Height}
  145. queued := peer.TrySend(BlockchainChannel, msg)
  146. if !queued {
  147. // We couldn't queue the request.
  148. time.Sleep(defaultSleepIntervalMS * time.Millisecond)
  149. continue FOR_LOOP
  150. }
  151. case peerId := <-bcR.timeoutsCh: // chan string
  152. // Peer timed out.
  153. peer := bcR.sw.Peers().Get(peerId)
  154. if peer != nil {
  155. bcR.sw.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
  156. }
  157. case _ = <-trySyncTicker.C: // chan time
  158. //var lastValidatedBlock *types.Block
  159. SYNC_LOOP:
  160. for i := 0; i < 10; i++ {
  161. // See if there are any blocks to sync.
  162. first, second := bcR.pool.PeekTwoBlocks()
  163. //log.Debug("TrySync peeked", "first", first, "second", second)
  164. if first == nil || second == nil {
  165. // We need both to sync the first block.
  166. break SYNC_LOOP
  167. }
  168. firstParts := first.MakePartSet()
  169. firstPartsHeader := firstParts.Header()
  170. // Finally, verify the first block using the second's validation.
  171. err := bcR.state.BondedValidators.VerifyValidation(
  172. first.Hash(), firstPartsHeader, first.Height, second.Validation)
  173. if err != nil {
  174. log.Debug("error in validation", "error", err)
  175. bcR.pool.RedoRequest(first.Height)
  176. break SYNC_LOOP
  177. } else {
  178. bcR.pool.PopRequest()
  179. err := sm.ExecBlock(bcR.state, first, firstPartsHeader)
  180. if err != nil {
  181. // TODO This is bad, are we zombie?
  182. panic(Fmt("Failed to process committed block: %v", err))
  183. }
  184. bcR.store.SaveBlock(first, firstParts, second.Validation)
  185. bcR.state.Save()
  186. //lastValidatedBlock = first
  187. }
  188. }
  189. /*
  190. // We're done syncing for now (will do again shortly)
  191. // See if we want to stop syncing and turn on the
  192. // consensus reactor.
  193. // TODO: use other heuristics too besides blocktime.
  194. // It's not a security concern, as it only needs to happen
  195. // upon node sync, and there's also a second (slower)
  196. // method of syncing in the consensus reactor.
  197. if lastValidatedBlock != nil && time.Now().Sub(lastValidatedBlock.Time) < stopSyncingDurationMinutes*time.Minute {
  198. go func() {
  199. log.Info("Stopping blockpool syncing, turning on consensus...")
  200. trySyncTicker.Stop() // Just stop the block requests. Still serve blocks to others.
  201. conR := bcR.sw.Reactor("CONSENSUS")
  202. conR.(stateResetter).ResetToState(bcR.state)
  203. conR.Start(bcR.sw)
  204. for _, peer := range bcR.sw.Peers().List() {
  205. conR.AddPeer(peer)
  206. }
  207. }()
  208. break FOR_LOOP
  209. }
  210. */
  211. continue FOR_LOOP
  212. case <-bcR.quit:
  213. break FOR_LOOP
  214. }
  215. }
  216. }
  217. func (bcR *BlockchainReactor) BroadcastStatus() error {
  218. bcR.sw.Broadcast(BlockchainChannel, bcPeerStatusMessage{bcR.store.Height()})
  219. return nil
  220. }
  221. //-----------------------------------------------------------------------------
  222. // Messages
  223. const (
  224. msgTypeUnknown = byte(0x00)
  225. msgTypeBlockRequest = byte(0x10)
  226. msgTypeBlockResponse = byte(0x11)
  227. msgTypePeerStatus = byte(0x20)
  228. )
  229. // TODO: check for unnecessary extra bytes at the end.
  230. func DecodeMessage(bz []byte) (msgType byte, msg interface{}, err error) {
  231. n := new(int64)
  232. msgType = bz[0]
  233. r := bytes.NewReader(bz)
  234. switch msgType {
  235. case msgTypeBlockRequest:
  236. msg = binary.ReadBinary(bcBlockRequestMessage{}, r, n, &err)
  237. case msgTypeBlockResponse:
  238. msg = binary.ReadBinary(bcBlockResponseMessage{}, r, n, &err)
  239. case msgTypePeerStatus:
  240. msg = binary.ReadBinary(bcPeerStatusMessage{}, r, n, &err)
  241. default:
  242. msg = nil
  243. }
  244. return
  245. }
  246. //-------------------------------------
  247. type bcBlockRequestMessage struct {
  248. Height uint
  249. }
  250. func (m bcBlockRequestMessage) TypeByte() byte { return msgTypeBlockRequest }
  251. func (m bcBlockRequestMessage) String() string {
  252. return fmt.Sprintf("[bcBlockRequestMessage %v]", m.Height)
  253. }
  254. //-------------------------------------
  255. type bcBlockResponseMessage struct {
  256. Block *types.Block
  257. }
  258. func (m bcBlockResponseMessage) TypeByte() byte { return msgTypeBlockResponse }
  259. func (m bcBlockResponseMessage) String() string {
  260. return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height)
  261. }
  262. //-------------------------------------
  263. type bcPeerStatusMessage struct {
  264. Height uint
  265. }
  266. func (m bcPeerStatusMessage) TypeByte() byte { return msgTypePeerStatus }
  267. func (m bcPeerStatusMessage) String() string {
  268. return fmt.Sprintf("[bcPeerStatusMessage %v]", m.Height)
  269. }