You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

304 lines
8.1 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package blockchain
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "sync/atomic"
  7. "time"
  8. "github.com/tendermint/tendermint/binary"
  9. . "github.com/tendermint/tendermint/common"
  10. "github.com/tendermint/tendermint/events"
  11. "github.com/tendermint/tendermint/p2p"
  12. sm "github.com/tendermint/tendermint/state"
  13. "github.com/tendermint/tendermint/types"
  14. )
  15. const (
  16. BlockchainChannel = byte(0x40)
  17. defaultChannelCapacity = 100
  18. defaultSleepIntervalMS = 500
  19. trySyncIntervalMS = 100
  20. // stop syncing when last block's time is
  21. // within this much of the system time.
  22. stopSyncingDurationMinutes = 10
  23. )
  24. type stateResetter interface {
  25. ResetToState(*sm.State)
  26. }
  27. // BlockchainReactor handles long-term catchup syncing.
  28. type BlockchainReactor struct {
  29. sw *p2p.Switch
  30. state *sm.State
  31. store *BlockStore
  32. pool *BlockPool
  33. sync bool
  34. requestsCh chan BlockRequest
  35. timeoutsCh chan string
  36. lastBlock *types.Block
  37. quit chan struct{}
  38. running uint32
  39. evsw *events.EventSwitch
  40. }
  41. func NewBlockchainReactor(state *sm.State, store *BlockStore, sync bool) *BlockchainReactor {
  42. if state.LastBlockHeight != store.Height() &&
  43. state.LastBlockHeight != store.Height()-1 { // XXX double check this logic.
  44. panic(Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
  45. }
  46. requestsCh := make(chan BlockRequest, defaultChannelCapacity)
  47. timeoutsCh := make(chan string, defaultChannelCapacity)
  48. pool := NewBlockPool(
  49. store.Height()+1,
  50. requestsCh,
  51. timeoutsCh,
  52. )
  53. bcR := &BlockchainReactor{
  54. state: state,
  55. store: store,
  56. pool: pool,
  57. sync: sync,
  58. requestsCh: requestsCh,
  59. timeoutsCh: timeoutsCh,
  60. quit: make(chan struct{}),
  61. running: uint32(0),
  62. }
  63. return bcR
  64. }
  65. // Implements Reactor
  66. func (bcR *BlockchainReactor) Start(sw *p2p.Switch) {
  67. if atomic.CompareAndSwapUint32(&bcR.running, 0, 1) {
  68. log.Info("Starting BlockchainReactor")
  69. bcR.sw = sw
  70. bcR.pool.Start()
  71. if bcR.sync {
  72. go bcR.poolRoutine()
  73. }
  74. }
  75. }
  76. // Implements Reactor
  77. func (bcR *BlockchainReactor) Stop() {
  78. if atomic.CompareAndSwapUint32(&bcR.running, 1, 0) {
  79. log.Info("Stopping BlockchainReactor")
  80. close(bcR.quit)
  81. bcR.pool.Stop()
  82. }
  83. }
  84. // Implements Reactor
  85. func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
  86. return []*p2p.ChannelDescriptor{
  87. &p2p.ChannelDescriptor{
  88. Id: BlockchainChannel,
  89. Priority: 5,
  90. SendQueueCapacity: 100,
  91. },
  92. }
  93. }
  94. // Implements Reactor
  95. func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
  96. // Send peer our state.
  97. peer.Send(BlockchainChannel, bcPeerStatusMessage{bcR.store.Height()})
  98. }
  99. // Implements Reactor
  100. func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  101. // Remove peer from the pool.
  102. bcR.pool.RemovePeer(peer.Key)
  103. }
  104. // Implements Reactor
  105. func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) {
  106. _, msg, err := DecodeMessage(msgBytes)
  107. if err != nil {
  108. log.Warn("Error decoding message", "error", err)
  109. return
  110. }
  111. log.Info("Received message", "msg", msg)
  112. switch msg := msg.(type) {
  113. case bcBlockRequestMessage:
  114. // Got a request for a block. Respond with block if we have it.
  115. block := bcR.store.LoadBlock(msg.Height)
  116. if block != nil {
  117. msg := bcBlockResponseMessage{Block: block}
  118. queued := src.TrySend(BlockchainChannel, msg)
  119. if !queued {
  120. // queue is full, just ignore.
  121. }
  122. } else {
  123. // TODO peer is asking for things we don't have.
  124. }
  125. case bcBlockResponseMessage:
  126. // Got a block.
  127. bcR.pool.AddBlock(msg.Block, src.Key)
  128. case bcPeerStatusMessage:
  129. // Got a peer status.
  130. bcR.pool.SetPeerHeight(src.Key, msg.Height)
  131. default:
  132. // Ignore unknown message
  133. }
  134. }
  135. // Handle messages from the poolReactor telling the reactor what to do.
  136. func (bcR *BlockchainReactor) poolRoutine() {
  137. trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
  138. FOR_LOOP:
  139. for {
  140. select {
  141. case request := <-bcR.requestsCh: // chan BlockRequest
  142. peer := bcR.sw.Peers().Get(request.PeerId)
  143. if peer == nil {
  144. // We can't fulfill the request.
  145. continue FOR_LOOP
  146. }
  147. msg := bcBlockRequestMessage{request.Height}
  148. queued := peer.TrySend(BlockchainChannel, msg)
  149. if !queued {
  150. // We couldn't queue the request.
  151. time.Sleep(defaultSleepIntervalMS * time.Millisecond)
  152. continue FOR_LOOP
  153. }
  154. case peerId := <-bcR.timeoutsCh: // chan string
  155. // Peer timed out.
  156. peer := bcR.sw.Peers().Get(peerId)
  157. if peer != nil {
  158. bcR.sw.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
  159. }
  160. case _ = <-trySyncTicker.C: // chan time
  161. //var lastValidatedBlock *types.Block
  162. SYNC_LOOP:
  163. for i := 0; i < 10; i++ {
  164. // See if there are any blocks to sync.
  165. first, second := bcR.pool.PeekTwoBlocks()
  166. //log.Debug("TrySync peeked", "first", first, "second", second)
  167. if first == nil || second == nil {
  168. // We need both to sync the first block.
  169. break SYNC_LOOP
  170. }
  171. firstParts := first.MakePartSet()
  172. firstPartsHeader := firstParts.Header()
  173. // Finally, verify the first block using the second's validation.
  174. err := bcR.state.BondedValidators.VerifyValidation(
  175. first.Hash(), firstPartsHeader, first.Height, second.Validation)
  176. if err != nil {
  177. log.Debug("error in validation", "error", err)
  178. bcR.pool.RedoRequest(first.Height)
  179. break SYNC_LOOP
  180. } else {
  181. bcR.pool.PopRequest()
  182. err := sm.ExecBlock(bcR.state, first, firstPartsHeader)
  183. if err != nil {
  184. // TODO This is bad, are we zombie?
  185. panic(Fmt("Failed to process committed block: %v", err))
  186. }
  187. bcR.store.SaveBlock(first, firstParts, second.Validation)
  188. bcR.state.Save()
  189. //lastValidatedBlock = first
  190. }
  191. }
  192. /*
  193. // We're done syncing for now (will do again shortly)
  194. // See if we want to stop syncing and turn on the
  195. // consensus reactor.
  196. // TODO: use other heuristics too besides blocktime.
  197. // It's not a security concern, as it only needs to happen
  198. // upon node sync, and there's also a second (slower)
  199. // method of syncing in the consensus reactor.
  200. if lastValidatedBlock != nil && time.Now().Sub(lastValidatedBlock.Time) < stopSyncingDurationMinutes*time.Minute {
  201. go func() {
  202. log.Info("Stopping blockpool syncing, turning on consensus...")
  203. trySyncTicker.Stop() // Just stop the block requests. Still serve blocks to others.
  204. conR := bcR.sw.Reactor("CONSENSUS")
  205. conR.(stateResetter).ResetToState(bcR.state)
  206. conR.Start(bcR.sw)
  207. for _, peer := range bcR.sw.Peers().List() {
  208. conR.AddPeer(peer)
  209. }
  210. }()
  211. break FOR_LOOP
  212. }
  213. */
  214. continue FOR_LOOP
  215. case <-bcR.quit:
  216. break FOR_LOOP
  217. }
  218. }
  219. }
  220. func (bcR *BlockchainReactor) BroadcastStatus() error {
  221. bcR.sw.Broadcast(BlockchainChannel, bcPeerStatusMessage{bcR.store.Height()})
  222. return nil
  223. }
  224. // implements events.Eventable
  225. func (bcR *BlockchainReactor) SetEventSwitch(evsw *events.EventSwitch) {
  226. bcR.evsw = evsw
  227. }
  228. //-----------------------------------------------------------------------------
  229. // Messages
  230. const (
  231. msgTypeBlockRequest = byte(0x10)
  232. msgTypeBlockResponse = byte(0x11)
  233. msgTypePeerStatus = byte(0x20)
  234. )
  235. type BlockchainMessage interface{}
  236. var _ = binary.RegisterInterface(
  237. struct{ BlockchainMessage }{},
  238. binary.ConcreteType{bcBlockRequestMessage{}, msgTypeBlockRequest},
  239. binary.ConcreteType{bcBlockResponseMessage{}, msgTypeBlockResponse},
  240. binary.ConcreteType{bcPeerStatusMessage{}, msgTypePeerStatus},
  241. )
  242. func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
  243. msgType = bz[0]
  244. n := new(int64)
  245. r := bytes.NewReader(bz)
  246. msg = binary.ReadBinary(&msg, r, n, &err)
  247. return
  248. }
  249. //-------------------------------------
  250. type bcBlockRequestMessage struct {
  251. Height uint
  252. }
  253. func (m bcBlockRequestMessage) String() string {
  254. return fmt.Sprintf("[bcBlockRequestMessage %v]", m.Height)
  255. }
  256. //-------------------------------------
  257. type bcBlockResponseMessage struct {
  258. Block *types.Block
  259. }
  260. func (m bcBlockResponseMessage) String() string {
  261. return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height)
  262. }
  263. //-------------------------------------
  264. type bcPeerStatusMessage struct {
  265. Height uint
  266. }
  267. func (m bcPeerStatusMessage) String() string {
  268. return fmt.Sprintf("[bcPeerStatusMessage %v]", m.Height)
  269. }