You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

305 lines
8.3 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package blockchain
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "sync/atomic"
  8. "time"
  9. "github.com/tendermint/tendermint/binary"
  10. . "github.com/tendermint/tendermint/common"
  11. "github.com/tendermint/tendermint/events"
  12. "github.com/tendermint/tendermint/p2p"
  13. sm "github.com/tendermint/tendermint/state"
  14. "github.com/tendermint/tendermint/types"
  15. )
  16. const (
  17. BlockchainChannel = byte(0x40)
  18. defaultChannelCapacity = 100
  19. defaultSleepIntervalMS = 500
  20. trySyncIntervalMS = 100
  21. // stop syncing when last block's time is
  22. // within this much of the system time.
  23. stopSyncingDurationMinutes = 10
  24. )
  25. type stateResetter interface {
  26. ResetToState(*sm.State)
  27. }
// BlockchainReactor handles long-term catchup syncing.
//
// It serves stored blocks to peers that ask for them and, when constructed
// with sync enabled, runs poolRoutine to request, verify, execute and save
// blocks fetched from peers.
type BlockchainReactor struct {
	// sw is the p2p switch; assigned in Start and used to look up, stop and
	// broadcast to peers.
	sw *p2p.Switch
	// state is the chain state that synced blocks are verified against and
	// executed on.
	state *sm.State
	// store is where blocks are loaded from (to answer requests) and saved to.
	store *BlockStore
	// pool tracks peers and in-flight block requests.
	pool *BlockPool
	// sync reports whether Start should launch poolRoutine.
	sync bool
	// requestsCh is shared with the pool; poolRoutine reads block requests
	// from it and forwards them to peers.
	requestsCh chan BlockRequest
	// timeoutsCh is shared with the pool; poolRoutine reads the IDs of
	// timed-out peers from it.
	timeoutsCh chan string
	// lastBlock is not referenced by the reactor code in this file.
	lastBlock *types.Block
	// quit is closed by Stop to end poolRoutine.
	quit chan struct{}
	// running is 1 while the reactor is started and 0 otherwise; it is only
	// changed through atomic compare-and-swap in Start and Stop.
	running uint32
	// evsw is installed by SetFireable.
	evsw events.Fireable
}
  42. func NewBlockchainReactor(state *sm.State, store *BlockStore, sync bool) *BlockchainReactor {
  43. if state.LastBlockHeight != store.Height() &&
  44. state.LastBlockHeight != store.Height()-1 { // XXX double check this logic.
  45. panic(Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
  46. }
  47. requestsCh := make(chan BlockRequest, defaultChannelCapacity)
  48. timeoutsCh := make(chan string, defaultChannelCapacity)
  49. pool := NewBlockPool(
  50. store.Height()+1,
  51. requestsCh,
  52. timeoutsCh,
  53. )
  54. bcR := &BlockchainReactor{
  55. state: state,
  56. store: store,
  57. pool: pool,
  58. sync: sync,
  59. requestsCh: requestsCh,
  60. timeoutsCh: timeoutsCh,
  61. quit: make(chan struct{}),
  62. running: uint32(0),
  63. }
  64. return bcR
  65. }
  66. // Implements Reactor
  67. func (bcR *BlockchainReactor) Start(sw *p2p.Switch) {
  68. if atomic.CompareAndSwapUint32(&bcR.running, 0, 1) {
  69. log.Info("Starting BlockchainReactor")
  70. bcR.sw = sw
  71. bcR.pool.Start()
  72. if bcR.sync {
  73. go bcR.poolRoutine()
  74. }
  75. }
  76. }
  77. // Implements Reactor
  78. func (bcR *BlockchainReactor) Stop() {
  79. if atomic.CompareAndSwapUint32(&bcR.running, 1, 0) {
  80. log.Info("Stopping BlockchainReactor")
  81. close(bcR.quit)
  82. bcR.pool.Stop()
  83. }
  84. }
  85. // Implements Reactor
  86. func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
  87. return []*p2p.ChannelDescriptor{
  88. &p2p.ChannelDescriptor{
  89. Id: BlockchainChannel,
  90. Priority: 5,
  91. SendQueueCapacity: 100,
  92. },
  93. }
  94. }
  95. // Implements Reactor
  96. func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
  97. // Send peer our state.
  98. peer.Send(BlockchainChannel, &bcPeerStatusMessage{bcR.store.Height()})
  99. }
  100. // Implements Reactor
  101. func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  102. // Remove peer from the pool.
  103. bcR.pool.RemovePeer(peer.Key)
  104. }
  105. // Implements Reactor
  106. func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) {
  107. _, msg, err := DecodeMessage(msgBytes)
  108. if err != nil {
  109. log.Warn("Error decoding message", "error", err)
  110. return
  111. }
  112. log.Info("Received message", "msg", msg)
  113. switch msg := msg.(type) {
  114. case *bcBlockRequestMessage:
  115. // Got a request for a block. Respond with block if we have it.
  116. block := bcR.store.LoadBlock(msg.Height)
  117. if block != nil {
  118. msg := &bcBlockResponseMessage{Block: block}
  119. queued := src.TrySend(BlockchainChannel, msg)
  120. if !queued {
  121. // queue is full, just ignore.
  122. }
  123. } else {
  124. // TODO peer is asking for things we don't have.
  125. }
  126. case *bcBlockResponseMessage:
  127. // Got a block.
  128. bcR.pool.AddBlock(msg.Block, src.Key)
  129. case *bcPeerStatusMessage:
  130. // Got a peer status.
  131. bcR.pool.SetPeerHeight(src.Key, msg.Height)
  132. default:
  133. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  134. }
  135. }
  136. // Handle messages from the poolReactor telling the reactor what to do.
  137. func (bcR *BlockchainReactor) poolRoutine() {
  138. trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
  139. FOR_LOOP:
  140. for {
  141. select {
  142. case request := <-bcR.requestsCh: // chan BlockRequest
  143. peer := bcR.sw.Peers().Get(request.PeerId)
  144. if peer == nil {
  145. // We can't fulfill the request.
  146. continue FOR_LOOP
  147. }
  148. msg := &bcBlockRequestMessage{request.Height}
  149. queued := peer.TrySend(BlockchainChannel, msg)
  150. if !queued {
  151. // We couldn't queue the request.
  152. time.Sleep(defaultSleepIntervalMS * time.Millisecond)
  153. continue FOR_LOOP
  154. }
  155. case peerId := <-bcR.timeoutsCh: // chan string
  156. // Peer timed out.
  157. peer := bcR.sw.Peers().Get(peerId)
  158. if peer != nil {
  159. bcR.sw.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
  160. }
  161. case _ = <-trySyncTicker.C: // chan time
  162. //var lastValidatedBlock *types.Block
  163. SYNC_LOOP:
  164. for i := 0; i < 10; i++ {
  165. // See if there are any blocks to sync.
  166. first, second := bcR.pool.PeekTwoBlocks()
  167. //log.Debug("TrySync peeked", "first", first, "second", second)
  168. if first == nil || second == nil {
  169. // We need both to sync the first block.
  170. break SYNC_LOOP
  171. }
  172. firstParts := first.MakePartSet()
  173. firstPartsHeader := firstParts.Header()
  174. // Finally, verify the first block using the second's validation.
  175. err := bcR.state.BondedValidators.VerifyValidation(
  176. first.Hash(), firstPartsHeader, first.Height, second.Validation)
  177. if err != nil {
  178. log.Debug("error in validation", "error", err)
  179. bcR.pool.RedoRequest(first.Height)
  180. break SYNC_LOOP
  181. } else {
  182. bcR.pool.PopRequest()
  183. err := sm.ExecBlock(bcR.state, first, firstPartsHeader)
  184. if err != nil {
  185. // TODO This is bad, are we zombie?
  186. panic(Fmt("Failed to process committed block: %v", err))
  187. }
  188. bcR.store.SaveBlock(first, firstParts, second.Validation)
  189. bcR.state.Save()
  190. //lastValidatedBlock = first
  191. }
  192. }
  193. /*
  194. // We're done syncing for now (will do again shortly)
  195. // See if we want to stop syncing and turn on the
  196. // consensus reactor.
  197. // TODO: use other heuristics too besides blocktime.
  198. // It's not a security concern, as it only needs to happen
  199. // upon node sync, and there's also a second (slower)
  200. // method of syncing in the consensus reactor.
  201. if lastValidatedBlock != nil && time.Now().Sub(lastValidatedBlock.Time) < stopSyncingDurationMinutes*time.Minute {
  202. go func() {
  203. log.Info("Stopping blockpool syncing, turning on consensus...")
  204. trySyncTicker.Stop() // Just stop the block requests. Still serve blocks to others.
  205. conR := bcR.sw.Reactor("CONSENSUS")
  206. conR.(stateResetter).ResetToState(bcR.state)
  207. conR.Start(bcR.sw)
  208. for _, peer := range bcR.sw.Peers().List() {
  209. conR.AddPeer(peer)
  210. }
  211. }()
  212. break FOR_LOOP
  213. }
  214. */
  215. continue FOR_LOOP
  216. case <-bcR.quit:
  217. break FOR_LOOP
  218. }
  219. }
  220. }
  221. func (bcR *BlockchainReactor) BroadcastStatus() error {
  222. bcR.sw.Broadcast(BlockchainChannel, &bcPeerStatusMessage{bcR.store.Height()})
  223. return nil
  224. }
// SetFireable implements events.Eventable. It stores evsw on the reactor,
// replacing any previously set value.
func (bcR *BlockchainReactor) SetFireable(evsw events.Fireable) {
	bcR.evsw = evsw
}
  229. //-----------------------------------------------------------------------------
  230. // Messages
  231. const (
  232. msgTypeBlockRequest = byte(0x10)
  233. msgTypeBlockResponse = byte(0x11)
  234. msgTypePeerStatus = byte(0x20)
  235. )
// BlockchainMessage is the interface type under which the reactor's message
// types are registered for decoding; it declares no methods.
type BlockchainMessage interface{}
// Register the concrete message types, each paired with its type byte, under
// the BlockchainMessage interface. DecodeMessage reads messages through the
// same struct{ BlockchainMessage } wrapper.
var _ = binary.RegisterInterface(
	struct{ BlockchainMessage }{},
	binary.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest},
	binary.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse},
	binary.ConcreteType{&bcPeerStatusMessage{}, msgTypePeerStatus},
)
  243. func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
  244. msgType = bz[0]
  245. n := new(int64)
  246. r := bytes.NewReader(bz)
  247. msg = binary.ReadBinary(struct{ BlockchainMessage }{}, r, n, &err).(struct{ BlockchainMessage }).BlockchainMessage
  248. return
  249. }
  250. //-------------------------------------
  251. type bcBlockRequestMessage struct {
  252. Height uint
  253. }
  254. func (m *bcBlockRequestMessage) String() string {
  255. return fmt.Sprintf("[bcBlockRequestMessage %v]", m.Height)
  256. }
  257. //-------------------------------------
  258. type bcBlockResponseMessage struct {
  259. Block *types.Block
  260. }
  261. func (m *bcBlockResponseMessage) String() string {
  262. return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height)
  263. }
  264. //-------------------------------------
  265. type bcPeerStatusMessage struct {
  266. Height uint
  267. }
  268. func (m *bcPeerStatusMessage) String() string {
  269. return fmt.Sprintf("[bcPeerStatusMessage %v]", m.Height)
  270. }