package blockchain

import (
	"bytes"
	"errors"
	"reflect"
	"time"

	wire "github.com/tendermint/go-wire"

	"github.com/tendermint/tendermint/p2p"
	"github.com/tendermint/tendermint/proxy"
	sm "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/types"
	cmn "github.com/tendermint/tmlibs/common"
	"github.com/tendermint/tmlibs/log"
)
const (
	// BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
	BlockchainChannel = byte(0x40)

	defaultChannelCapacity = 1000
	trySyncIntervalMS      = 50

	// stop syncing when last block's time is
	// within this much of the system time.
	// stopSyncingDurationMinutes = 10

	// ask for best height every 10s
	statusUpdateIntervalSeconds = 10
	// check if we should switch to consensus reactor
	switchToConsensusIntervalSeconds = 1
)
type consensusReactor interface {
	// for when we switch from blockchain reactor and fast sync to
	// the consensus machine
	SwitchToConsensus(*sm.State, int)
}
// BlockchainReactor handles long-term catchup syncing.
type BlockchainReactor struct {
	p2p.BaseReactor

	state        *sm.State
	proxyAppConn proxy.AppConnConsensus // same as consensus.proxyAppConn
	store        *BlockStore
	pool         *BlockPool
	fastSync     bool

	requestsCh chan BlockRequest // block requests from the pool, forwarded to peers
	timeoutsCh chan string       // IDs of peers the pool reports as timed out

	eventBus *types.EventBus
}
// NewBlockchainReactor returns a new reactor instance.
func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConnConsensus, store *BlockStore, fastSync bool) *BlockchainReactor {
	if state.LastBlockHeight != store.Height() {
		cmn.PanicSanity(cmn.Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
	}

	requestsCh := make(chan BlockRequest, defaultChannelCapacity)
	timeoutsCh := make(chan string, defaultChannelCapacity)
	pool := NewBlockPool(
		store.Height()+1,
		requestsCh,
		timeoutsCh,
	)

	bcR := &BlockchainReactor{
		state:        state,
		proxyAppConn: proxyAppConn,
		store:        store,
		pool:         pool,
		fastSync:     fastSync,
		requestsCh:   requestsCh,
		timeoutsCh:   timeoutsCh,
	}
	bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
	return bcR
}
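
// Example wiring (a sketch; `state`, `proxyAppConn`, `store`, `eventBus`,
// `logger`, and the p2p switch `sw` are assumed to be configured elsewhere):
//
//	bcR := NewBlockchainReactor(state, proxyAppConn, store, true /* fastSync */)
//	bcR.SetLogger(logger.With("module", "blockchain"))
//	bcR.SetEventBus(eventBus)
//	sw.AddReactor("BLOCKCHAIN", bcR)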
// SetLogger implements cmn.Service by setting the logger on reactor and pool.
func (bcR *BlockchainReactor) SetLogger(l log.Logger) {
	bcR.BaseService.Logger = l
	bcR.pool.Logger = l
}

// OnStart implements cmn.Service.
func (bcR *BlockchainReactor) OnStart() error {
	if err := bcR.BaseReactor.OnStart(); err != nil {
		return err
	}
	if bcR.fastSync {
		err := bcR.pool.Start()
		if err != nil {
			return err
		}
		go bcR.poolRoutine()
	}
	return nil
}

// OnStop implements cmn.Service.
func (bcR *BlockchainReactor) OnStop() {
	bcR.BaseReactor.OnStop()
	bcR.pool.Stop()
}
// GetChannels implements Reactor.
func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
	return []*p2p.ChannelDescriptor{
		{
			ID:                BlockchainChannel,
			Priority:          10,
			SendQueueCapacity: 1000,
		},
	}
}
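
// Note: Priority is the channel's weight when the multiplexed peer connection
// picks which channel to send from next; a higher value gets a proportionally
// larger share of the outgoing bandwidth.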
// AddPeer implements Reactor by sending our state to peer.
func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) {
	if !peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}}) {
		// doing nothing, will try later in `poolRoutine`
	}
	// peer is added to the pool once we receive the first
	// bcStatusResponseMessage from the peer and call pool.SetPeerHeight
}

// RemovePeer implements Reactor by removing peer from the pool.
func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
	bcR.pool.RemovePeer(peer.Key())
}
// respondToPeer loads a block and sends it to the requesting peer,
// if we have it. Otherwise, we'll respond saying we don't have it.
// According to the Tendermint spec, if all nodes are honest,
// no node should request a block that doesn't exist.
func (bcR *BlockchainReactor) respondToPeer(msg *bcBlockRequestMessage, src p2p.Peer) (queued bool) {
	block := bcR.store.LoadBlock(msg.Height)
	if block != nil {
		msg := &bcBlockResponseMessage{Block: block}
		return src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
	}

	bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height)
	return src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{
		&bcNoBlockResponseMessage{Height: msg.Height},
	})
}
// Receive implements Reactor by handling the 4 message types below.
func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
	_, msg, err := DecodeMessage(msgBytes, bcR.maxMsgSize())
	if err != nil {
		bcR.Logger.Error("Error decoding message", "err", err)
		return
	}

	bcR.Logger.Debug("Receive", "src", src, "chID", chID, "msg", msg)

	// TODO: improve logic to satisfy megacheck
	switch msg := msg.(type) {
	case *bcBlockRequestMessage:
		if queued := bcR.respondToPeer(msg, src); !queued {
			// Unfortunately not queued since the queue is full.
		}
	case *bcBlockResponseMessage:
		// Got a block.
		bcR.pool.AddBlock(src.Key(), msg.Block, len(msgBytes))
	case *bcStatusRequestMessage:
		// Send peer our state.
		queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}})
		if !queued {
			// sorry
		}
	case *bcStatusResponseMessage:
		// Got a peer status. Unverified.
		bcR.pool.SetPeerHeight(src.Key(), msg.Height)
	default:
		bcR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
	}
}
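
// A typical exchange between two peers, as handled above (a sketch):
//
//	A -> B: bcStatusRequestMessage{Height: 100}
//	B -> A: bcStatusResponseMessage{Height: 120}  // B is 20 blocks ahead
//	A -> B: bcBlockRequestMessage{Height: 101}
//	B -> A: bcBlockResponseMessage{Block: ...}    // or bcNoBlockResponseMessage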
// maxMsgSize returns the maximum allowable size of a
// message on the blockchain reactor.
func (bcR *BlockchainReactor) maxMsgSize() int {
	// The +2 presumably leaves room for the go-wire envelope around the block.
	return bcR.state.ConsensusParams.BlockSize.MaxBytes + 2
}
// Handle messages from the poolReactor telling the reactor what to do.
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
// (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
func (bcR *BlockchainReactor) poolRoutine() {
	trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
	statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
	switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)

	blocksSynced := 0
	chainID := bcR.state.ChainID

	lastHundred := time.Now()
	lastRate := 0.0

FOR_LOOP:
	for {
		select {
		case request := <-bcR.requestsCh: // chan BlockRequest
			peer := bcR.Switch.Peers().Get(request.PeerID)
			if peer == nil {
				continue FOR_LOOP // Peer has since been disconnected.
			}
			msg := &bcBlockRequestMessage{request.Height}
			queued := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
			if !queued {
				// We couldn't make the request, send-queue full.
				// The pool handles timeouts, just let it go.
				continue FOR_LOOP
			}
		case peerID := <-bcR.timeoutsCh: // chan string
			// Peer timed out.
			peer := bcR.Switch.Peers().Get(peerID)
			if peer != nil {
				bcR.Switch.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
			}
		case <-statusUpdateTicker.C:
			// ask for status updates
			go bcR.BroadcastStatusRequest() // nolint: errcheck
		case <-switchToConsensusTicker.C:
			height, numPending, lenRequesters := bcR.pool.GetStatus()
			outbound, inbound, _ := bcR.Switch.NumPeers()
			bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "total", lenRequesters,
				"outbound", outbound, "inbound", inbound)
			if bcR.pool.IsCaughtUp() {
				bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
				bcR.pool.Stop()

				conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
				conR.SwitchToConsensus(bcR.state, blocksSynced)

				break FOR_LOOP
			}
		case <-trySyncTicker.C: // chan time
			// This loop can be slow as long as it's doing syncing work.
		SYNC_LOOP:
			for i := 0; i < 10; i++ {
				// See if there are any blocks to sync.
				first, second := bcR.pool.PeekTwoBlocks()
				//bcR.Logger.Info("TrySync peeked", "first", first, "second", second)
				if first == nil || second == nil {
					// We need both to sync the first block.
					break SYNC_LOOP
				}
				firstParts := first.MakePartSet(bcR.state.ConsensusParams.BlockPartSizeBytes)
				firstPartsHeader := firstParts.Header()
				// Finally, verify the first block using the second's commit
				// NOTE: we can probably make this more efficient, but note that calling
				// first.Hash() doesn't verify the tx contents, so MakePartSet() is
				// currently necessary.
				err := bcR.state.Validators.VerifyCommit(
					chainID, types.BlockID{Hash: first.Hash(), PartsHeader: firstPartsHeader}, first.Height, second.LastCommit)
				if err != nil {
					bcR.Logger.Error("Error in validation", "err", err)
					bcR.pool.RedoRequest(first.Height)
					break SYNC_LOOP
				} else {
					bcR.pool.PopRequest()
					bcR.store.SaveBlock(first, firstParts, second.LastCommit)

					// TODO: should we be firing events? need to fire NewBlock events manually ...
					// NOTE: we could improve performance if we
					// didn't make the app commit to disk every block
					// ... but we would need a way to get the hash without it persisting
					err := bcR.state.ApplyBlock(bcR.eventBus, bcR.proxyAppConn,
						first, firstPartsHeader,
						types.MockMempool{}, types.MockEvidencePool{}) // TODO unmock!
					if err != nil {
						// TODO This is bad, are we zombie?
						cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
					}
					blocksSynced++
					if blocksSynced%100 == 0 {
						lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
						bcR.Logger.Info("Fast Sync Rate", "height", bcR.pool.height,
							"max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate)
						lastHundred = time.Now()
					}
				}
			}
			continue FOR_LOOP
		case <-bcR.Quit:
			break FOR_LOOP
		}
	}
}
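
// Why sync two blocks at a time? The +2/3 precommits that commit block H are
// only recorded on chain in block H+1, as its LastCommit. So to verify block H
// during fast sync, block H+1 must already be in hand:
//
//	block H  <--verified against--  (block H+1).LastCommit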
// BroadcastStatusRequest broadcasts `BlockStore` height.
func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
	bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusRequestMessage{bcR.store.Height()}})
	return nil
}

// SetEventBus sets the event bus.
func (bcR *BlockchainReactor) SetEventBus(b *types.EventBus) {
	bcR.eventBus = b
}
//-----------------------------------------------------------------------------
// Messages

const (
	msgTypeBlockRequest    = byte(0x10)
	msgTypeBlockResponse   = byte(0x11)
	msgTypeNoBlockResponse = byte(0x12)
	msgTypeStatusResponse  = byte(0x20)
	msgTypeStatusRequest   = byte(0x21)
)

// BlockchainMessage is a generic message for this reactor.
type BlockchainMessage interface{}

var _ = wire.RegisterInterface(
	struct{ BlockchainMessage }{},
	wire.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest},
	wire.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse},
	wire.ConcreteType{&bcNoBlockResponseMessage{}, msgTypeNoBlockResponse},
	wire.ConcreteType{&bcStatusResponseMessage{}, msgTypeStatusResponse},
	wire.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest},
)
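
// Note on the struct{ BlockchainMessage } wrapper used throughout this file:
// go-wire encodes an interface value as its registered type byte (e.g. 0x10
// for bcBlockRequestMessage) followed by the concrete struct, so messages are
// wrapped in struct{ BlockchainMessage }{msg} on send and unwrapped through
// the same wrapper in DecodeMessage.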
// DecodeMessage decodes BlockchainMessage.
// TODO: ensure that bz is completely read.
func DecodeMessage(bz []byte, maxSize int) (msgType byte, msg BlockchainMessage, err error) {
	if len(bz) == 0 {
		err = errors.New("DecodeMessage() received empty bytes")
		return
	}
	msgType = bz[0]
	n := int(0)
	r := bytes.NewReader(bz)
	msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, maxSize, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage
	if err == nil && n != len(bz) {
		// decoding succeeded but didn't consume the whole payload
		err = errors.New("DecodeMessage() had bytes left over")
	}
	return
}
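
// Example round trip (a sketch; maxSize would come from maxMsgSize()):
//
//	bz := wire.BinaryBytes(struct{ BlockchainMessage }{&bcStatusRequestMessage{Height: 10}})
//	_, msg, err := DecodeMessage(bz, maxSize)
//	// msg is now a *bcStatusRequestMessage with Height == 10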
//-------------------------------------

type bcBlockRequestMessage struct {
	Height int64
}

func (m *bcBlockRequestMessage) String() string {
	return cmn.Fmt("[bcBlockRequestMessage %v]", m.Height)
}

type bcNoBlockResponseMessage struct {
	Height int64
}

func (brm *bcNoBlockResponseMessage) String() string {
	return cmn.Fmt("[bcNoBlockResponseMessage %d]", brm.Height)
}

//-------------------------------------

// NOTE: keep up-to-date with maxBlockchainResponseSize
type bcBlockResponseMessage struct {
	Block *types.Block
}

func (m *bcBlockResponseMessage) String() string {
	return cmn.Fmt("[bcBlockResponseMessage %v]", m.Block.Height)
}

//-------------------------------------

type bcStatusRequestMessage struct {
	Height int64
}

func (m *bcStatusRequestMessage) String() string {
	return cmn.Fmt("[bcStatusRequestMessage %v]", m.Height)
}

//-------------------------------------

type bcStatusResponseMessage struct {
	Height int64
}

func (m *bcStatusResponseMessage) String() string {
	return cmn.Fmt("[bcStatusResponseMessage %v]", m.Height)
}