You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

660 lines
17 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package blocks
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. . "github.com/tendermint/tendermint/binary"
  11. . "github.com/tendermint/tendermint/common"
  12. db_ "github.com/tendermint/tendermint/db"
  13. "github.com/tendermint/tendermint/p2p"
  14. )
  15. var dbKeyState = []byte("state")
  16. const (
  17. blocksInfoCh = byte(0x10) // For requests & cancellations
  18. blocksDataCh = byte(0x11) // For data
  19. msgTypeUnknown = Byte(0x00)
  20. msgTypeState = Byte(0x01)
  21. msgTypeRequest = Byte(0x02)
  22. msgTypeData = Byte(0x03)
  23. maxRequestsPerPeer = 2 // Maximum number of outstanding requests from peer.
  24. maxRequestsPerData = 2 // Maximum number of outstanding requests of some data.
  25. maxRequestAheadBlock = 5 // Maximum number of blocks to request ahead of current verified. Must be >= 1
  26. defaultRequestTimeoutS =
  27. timeoutRepeatTimerMS = 1000 // Handle timed out requests periodically
  28. )
  29. /*
  30. TODO: keep a heap of dataRequests & their corresponding timeouts;
  31. time out dataRequests and update the peerState.
  32. TODO: need to keep track of progress because blocks are too large, or else chop them into chunks.
  33. TODO: need to validate blocks. :/
  34. TODO: actually save the block.
  35. */
  36. //-----------------------------------------------------------------------------
  37. const (
  38. dataTypeBlock = byte(0x00)
  39. // TODO: allow for more types, such as specific transactions
  40. )
  41. type dataKey struct {
  42. dataType byte
  43. height uint64
  44. }
  45. func newDataKey(dataType byte, height uint64) dataKey {
  46. return dataKey{dataType, height}
  47. }
  48. func readDataKey(r io.Reader) dataKey {
  49. return dataKey{
  50. dataType: ReadByte(r),
  51. height: ReadUInt64(r),
  52. }
  53. }
  54. func (dk dataKey) WriteTo(w io.Writer) (n int64, err error) {
  55. n, err = WriteTo(dk.dataType, w, n, err)
  56. n, err = WriteTo(dk.height, w, n, err)
  57. return
  58. }
  59. func (dk dataKey) String() string {
  60. switch dataType {
  61. case dataTypeBlock:
  62. return dataKeyfmt.Sprintf("B%v", height)
  63. default:
  64. Panicf("Unknown datatype %X", dataType)
  65. return "" // should not happen
  66. }
  67. }
  68. //-----------------------------------------------------------------------------
  69. type BlockManager struct {
  70. db *db_.LevelDB
  71. sw *p2p.Switch
  72. swEvents chan interface{}
  73. state *blockManagerState
  74. timeoutTimer *RepeatTimer
  75. quit chan struct{}
  76. started uint32
  77. stopped uint32
  78. }
  79. func NewBlockManager(sw *p2p.Switch, db *db_.LevelDB) *BlockManager {
  80. swEvents := make(chan interface{})
  81. sw.AddEventListener("BlockManager.swEvents", swEvents)
  82. bm := &BlockManager{
  83. db: db,
  84. sw: sw,
  85. swEvents: swEvents,
  86. state: newBlockManagerState(),
  87. timeoutTimer: NewRepeatTimer(timeoutRepeatTimerMS * time.Second),
  88. quit: make(chan struct{}),
  89. }
  90. bm.loadState()
  91. return bm
  92. }
  93. func (bm *BlockManager) Start() {
  94. if atomic.CompareAndSwapUint32(&bm.started, 0, 1) {
  95. log.Info("Starting BlockManager")
  96. go bm.switchEventsHandler()
  97. go bm.blocksInfoHandler()
  98. go bm.blocksDataHandler()
  99. go bm.requestTimeoutHandler()
  100. }
  101. }
  102. func (bm *BlockManager) Stop() {
  103. if atomic.CompareAndSwapUint32(&bm.stopped, 0, 1) {
  104. log.Info("Stopping BlockManager")
  105. close(bm.quit)
  106. close(bm.swEvents)
  107. }
  108. }
  109. // NOTE: assumes that data is already validated.
  110. // "request" is optional, it's the request response that supplied
  111. // the data.
  112. func (bm *BlockManager) StoreBlock(block *Block, origin *dataRequest) {
  113. dataKey := newDataKey(dataTypeBlock, uint64(block.Header.Height))
  114. // XXX actually save the block.
  115. canceled, newHeight := bm.state.didGetDataFromPeer(dataKey, origin.peer)
  116. // Notify peers that the request has been canceled.
  117. for _, request := range canceled {
  118. msg := &requestMessage{
  119. key: dataKey,
  120. type_: requestTypeCanceled,
  121. }
  122. tm := p2p.TypedMessage{msgTypeRequest, msg}
  123. request.peer.TrySend(blocksInfoCh, tm.Bytes())
  124. }
  125. // If we have new data that extends our contiguous range, then announce it.
  126. if newHeight {
  127. bm.sw.Broadcast(blocksInfoCh, bm.state.makeStateMessage())
  128. }
  129. }
  130. func (bm *BlockManager) LoadBlock(height uint64) *Block {
  131. panic("not yet implemented")
  132. }
  133. // Handle peer new/done events
  134. func (bm *BlockManager) switchEventsHandler() {
  135. for {
  136. swEvent, ok := <-bm.swEvents
  137. if !ok {
  138. break
  139. }
  140. switch swEvent.(type) {
  141. case p2p.SwitchEventNewPeer:
  142. event := swEvent.(p2p.SwitchEventNewPeer)
  143. // Create peerState for event.Peer
  144. bm.state.createEntryForPeer(event.Peer)
  145. // Share our state with event.Peer
  146. msg := &stateMessage{
  147. lastBlockHeight: UInt64(bm.state.lastBlockHeight),
  148. }
  149. tm := p2p.TypedMessage{msgTypeRequest, msg}
  150. event.Peer.TrySend(blocksInfoCh, tm.Bytes())
  151. case p2p.SwitchEventDonePeer:
  152. event := swEvent.(p2p.SwitchEventDonePeer)
  153. // Delete peerState for event.Peer
  154. bm.state.deleteEntryForPeer(event.Peer)
  155. default:
  156. log.Warning("Unhandled switch event type")
  157. }
  158. }
  159. }
  160. // Handle requests/cancellations from the blocksInfo channel
  161. func (bm *BlockManager) blocksInfoHandler() {
  162. for {
  163. inMsg, ok := bm.sw.Receive(blocksInfoCh)
  164. if !ok {
  165. break // Client has stopped
  166. }
  167. msg := decodeMessage(inMsg.Bytes)
  168. log.Info("blocksInfoHandler received %v", msg)
  169. switch msg.(type) {
  170. case *stateMessage:
  171. m := msg.(*stateMessage)
  172. peerState := bm.getPeerState(inMsg.MConn.Peer)
  173. if peerState == nil {
  174. continue // peer has since been disconnected.
  175. }
  176. newDataTypes := peerState.applyStateMessage(m)
  177. // Consider requesting data.
  178. // Does the peer claim to have something we want?
  179. FOR_LOOP:
  180. for _, newDataType := range newDataTypes {
  181. // Are we already requesting too much data from peer?
  182. if !peerState.canRequestMore() {
  183. break FOR_LOOP
  184. }
  185. for _, wantedKey := range bm.state.nextWantedKeysForType(newDataType) {
  186. if !peerState.hasData(wantedKey) {
  187. break FOR_LOOP
  188. }
  189. // Request wantedKey from peer.
  190. msg := &requestMessage{
  191. key: dataKey,
  192. type_: requestTypeFetch,
  193. }
  194. tm := p2p.TypedMessage{msgTypeRequest, msg}
  195. sent := inMsg.MConn.Peer.TrySend(blocksInfoCh, tm.Bytes())
  196. if sent {
  197. // Log the request
  198. request := &dataRequest{
  199. peer: inMsg.MConn.Peer,
  200. key: wantedKey,
  201. time: time.Now(),
  202. timeout: time.Now().Add(defaultRequestTimeout
  203. }
  204. bm.state.addDataRequest(request)
  205. }
  206. }
  207. }
  208. case *requestMessage:
  209. m := msg.(*requestMessage)
  210. switch m.type_ {
  211. case requestTypeFetch:
  212. // TODO: prevent abuse.
  213. if !inMsg.MConn.Peer.CanSend(blocksDataCh) {
  214. msg := &requestMessage{
  215. key: dataKey,
  216. type_: requestTypeTryAgain,
  217. }
  218. tm := p2p.TypedMessage{msgTypeRequest, msg}
  219. sent := inMsg.MConn.Peer.TrySend(blocksInfoCh, tm.Bytes())
  220. } else {
  221. // If we don't have it, log and ignore.
  222. block := bm.LoadBlock(m.key.height)
  223. if block == nil {
  224. log.Warning("Peer %v asked for nonexistant block %v", inMsg.MConn.Peer, m.key)
  225. }
  226. // Send the data.
  227. msg := &dataMessage{
  228. key: dataKey,
  229. bytes: BinaryBytes(block),
  230. }
  231. tm := p2p.TypedMessage{msgTypeData, msg}
  232. inMsg.MConn.Peer.TrySend(blocksDataCh, tm.Bytes())
  233. }
  234. case requestTypeCanceled:
  235. // TODO: handle
  236. // This requires modifying mconnection to keep track of item keys.
  237. case requestTypeTryAgain:
  238. // TODO: handle
  239. default:
  240. log.Warning("Invalid request: %v", m)
  241. // Ignore.
  242. }
  243. default:
  244. // should not happen
  245. Panicf("Unknown message %v", msg)
  246. // bm.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage)
  247. }
  248. }
  249. // Cleanup
  250. }
  251. // Handle receiving data from the blocksData channel
  252. func (bm *BlockManager) blocksDataHandler() {
  253. for {
  254. inMsg, ok := bm.sw.Receive(blocksDataCh)
  255. if !ok {
  256. break // Client has stopped
  257. }
  258. msg := decodeMessage(inMsg.Bytes)
  259. log.Info("blocksDataHandler received %v", msg)
  260. switch msg.(type) {
  261. case *dataMessage:
  262. // See if we want the data.
  263. // Validate data.
  264. // Add to db.
  265. // Update state & broadcast as necessary.
  266. default:
  267. // Ignore unknown message
  268. // bm.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage)
  269. }
  270. }
  271. // Cleanup
  272. }
  273. // Handle timed out requests by requesting from others.
  274. func (bm *BlockManager) requestTimeoutHandler() {
  275. for {
  276. _, ok := <-bm.timeoutTimer
  277. if !ok {
  278. break
  279. }
  280. // Iterate over requests by time and handle timed out requests.
  281. }
  282. }
  283. //-----------------------------------------------------------------------------
  284. // blockManagerState keeps track of which block parts are stored locally.
  285. // It's also persisted via JSON in the db.
  286. type blockManagerState struct {
  287. mtx sync.Mutex
  288. lastBlockHeight uint64 // Last contiguous header height
  289. otherBlockHeights map[uint64]struct{}
  290. requestsByKey map[dataKey][]*dataRequest
  291. requestsByTimeout *Heap // Could be a linkedlist, but more flexible.
  292. peerStates map[string]*peerState
  293. }
  294. func newBlockManagerState() *blockManagerState {
  295. return &blockManagerState{
  296. requestsByKey: make(map[dataKey][]*dataRequest),
  297. requestsByTimeout: NewHeap(),
  298. peerStates: make(map[string]*peerState),
  299. }
  300. }
  301. type blockManagerStateJSON struct {
  302. LastBlockHeight uint64 // Last contiguous header height
  303. OtherBlockHeights map[uint64]struct{}
  304. }
  305. func (bms *BlockManagerState) loadState(db _db.LevelDB) {
  306. bms.mtx.Lock()
  307. defer bms.mtx.Unlock()
  308. stateBytes := db.Get(dbKeyState)
  309. if stateBytes == nil {
  310. log.Info("New BlockManager with no state")
  311. } else {
  312. bmsJSON := &blockManagerStateJSON{}
  313. err := json.Unmarshal(stateBytes, bmsJSON)
  314. if err != nil {
  315. Panicf("Could not unmarshal state bytes: %X", stateBytes)
  316. }
  317. bms.lastBlockHeight = bmsJSON.LastBlockHeight
  318. bms.otherBlockHeights = bmsJSON.OtherBlockHeights
  319. }
  320. }
  321. func (bms *BlockManagerState) saveState(db _db.LevelDB) {
  322. bms.mtx.Lock()
  323. defer bms.mtx.Unlock()
  324. bmsJSON := &blockManagerStateJSON{
  325. LastBlockHeight: bms.lastBlockHeight,
  326. OtherBlockHeights: bms.otherBlockHeights,
  327. }
  328. stateBytes, err := json.Marshal(bmsJSON)
  329. if err != nil {
  330. panic("Could not marshal state bytes")
  331. }
  332. db.Set(dbKeyState, stateBytes)
  333. }
  334. func (bms *blockManagerState) makeStateMessage() *stateMessage {
  335. bms.mtx.Lock()
  336. defer bms.mtx.Unlock()
  337. return &stateMessage{
  338. lastBlockHeight: UInt64(bms.lastBlockHeight),
  339. }
  340. }
  341. func (bms *blockManagerState) createEntryForPeer(peer *peer) {
  342. bms.mtx.Lock()
  343. defer bms.mtx.Unlock()
  344. bms.peerStates[peer.Key] = &peerState{peer: peer}
  345. }
  346. func (bms *blockManagerState) deleteEntryForPeer(peer *peer) {
  347. bms.mtx.Lock()
  348. defer bms.mtx.Unlock()
  349. delete(bms.peerStates, peer.Key)
  350. }
  351. func (bms *blockManagerState) getPeerState(peer *Peer) {
  352. bms.mtx.Lock()
  353. defer bms.mtx.Unlock()
  354. return bms.peerStates[peer.Key]
  355. }
  356. func (bms *blockManagerState) addDataRequest(newRequest *dataRequest) {
  357. ps.mtx.Lock()
  358. bms.requestsByKey[newRequest.key] = append(bms.requestsByKey[newRequest.key], newRequest)
  359. bms.requestsByTimeout.Push(newRequest) // XXX
  360. peerState, ok := bms.peerStates[newRequest.peer.Key]
  361. ps.mtx.Unlock()
  362. if ok {
  363. peerState.addDataRequest(newRequest)
  364. }
  365. }
  366. func (bms *blockManagerState) didGetDataFromPeer(key dataKey, peer *p2p.Peer) (canceled []*dataRequest, newHeight bool) {
  367. bms.mtx.Lock()
  368. defer bms.mtx.Unlock()
  369. if key.dataType != dataTypeBlock {
  370. Panicf("Unknown datatype %X", key.dataType)
  371. }
  372. // Adjust lastBlockHeight/otherBlockHeights.
  373. height := key.height
  374. if bms.lastBlockHeight == height-1 {
  375. bms.lastBlockHeight = height
  376. height++
  377. for _, ok := bms.otherBlockHeights[height]; ok; {
  378. delete(bms.otherBlockHeights, height)
  379. bms.lastBlockHeight = height
  380. height++
  381. }
  382. newHeight = true
  383. }
  384. // Remove dataRequests
  385. requests := bms.requestsByKey[key]
  386. for _, request := range requests {
  387. peerState, ok := bms.peerStates[peer.Key]
  388. if ok {
  389. peerState.removeDataRequest(request)
  390. }
  391. if request.peer == peer {
  392. continue
  393. }
  394. canceled = append(canceled, request)
  395. }
  396. delete(bms.requestsByKey, key)
  397. return canceled, newHeight
  398. }
  399. // Returns at most maxRequestAheadBlock dataKeys that we don't yet have &
  400. // aren't already requesting from maxRequestsPerData peers.
  401. func (bms *blockManagerState) nextWantedKeysForType(dataType byte) []dataKey {
  402. bms.mtx.Lock()
  403. defer bms.mtx.Unlock()
  404. var keys []dataKey
  405. switch dataType {
  406. case dataTypeBlock:
  407. for h := bms.lastBlockHeight + 1; h <= bms.lastBlockHeight+maxRequestAheadBlock; h++ {
  408. if _, ok := bms.otherBlockHeights[h]; !ok {
  409. key := newDataKey(dataTypeBlock, h)
  410. if len(bms.requestsByKey[key]) < maxRequestsPerData {
  411. keys = append(keys, key)
  412. }
  413. }
  414. }
  415. return keys
  416. default:
  417. Panicf("Unknown datatype %X", dataType)
  418. return
  419. }
  420. }
  421. //-----------------------------------------------------------------------------
  422. // dataRequest keeps track of each request for a given peice of data & peer.
  423. type dataRequest struct {
  424. peer *p2p.Peer
  425. key dataKey
  426. time time.Time
  427. timeout time.Time
  428. }
  429. //-----------------------------------------------------------------------------
  430. type peerState struct {
  431. mtx sync.Mutex
  432. peer *Peer
  433. lastBlockHeight uint64 // Last contiguous header height
  434. requests []*dataRequest // Active requests
  435. // XXX we need to
  436. }
  437. // Returns which dataTypes are new as declared by stateMessage.
  438. func (ps *peerState) applyStateMessage(msg *stateMessage) []byte {
  439. ps.mtx.Lock()
  440. defer ps.mtx.Unlock()
  441. var newTypes []byte
  442. if uint64(msg.lastBlockHeight) > ps.lastBlockHeight {
  443. newTypes = append(newTypes, dataTypeBlock)
  444. ps.lastBlockHeight = uint64(msg.lastBlockHeight)
  445. } else {
  446. log.Info("Strange, peer declares a regression of %X", dataTypeBlock)
  447. }
  448. return newTypes
  449. }
  450. func (ps *peerState) hasData(key dataKey) bool {
  451. ps.mtx.Lock()
  452. defer ps.mtx.Unlock()
  453. switch key.dataType {
  454. case dataTypeBlock:
  455. return key.height <= ps.lastBlockHeight
  456. default:
  457. Panicf("Unknown datatype %X", dataType)
  458. return false // should not happen
  459. }
  460. }
  461. func (ps *peerState) addDataRequest(newRequest *dataRequest) {
  462. ps.mtx.Lock()
  463. defer ps.mtx.Unlock()
  464. for _, request := range ps.requests {
  465. if request.key == newRequest.key {
  466. return
  467. }
  468. }
  469. ps.requests = append(ps.requests, newRequest)
  470. return newRequest
  471. }
  472. func (ps *peerState) remoteDataRequest(key dataKey) bool {
  473. ps.mtx.Lock()
  474. defer ps.mtx.Unlock()
  475. filtered := []*dataRequest{}
  476. removed := false
  477. for _, request := range ps.requests {
  478. if request.key == key {
  479. removed = true
  480. } else {
  481. filtered = append(filtered, request)
  482. }
  483. }
  484. ps.requests = filtered
  485. return removed
  486. }
  487. func (ps *peerState) canRequestMore() bool {
  488. ps.mtx.Lock()
  489. defer ps.mtx.Unlock()
  490. return len(ps.requests) < maxRequestsPerPeer
  491. }
  492. //-----------------------------------------------------------------------------
  493. /* Messages */
  494. // TODO: check for unnecessary extra bytes at the end.
  495. func decodeMessage(bz ByteSlice) (msg interface{}) {
  496. // log.Debug("decoding msg bytes: %X", bz)
  497. switch Byte(bz[0]) {
  498. case msgTypeState:
  499. return &stateMessage{}
  500. case msgTypeRequest:
  501. return readRequestMessage(bytes.NewReader(bz[1:]))
  502. case msgTypeData:
  503. return readDataMessage(bytes.NewReader(bz[1:]))
  504. default:
  505. return nil
  506. }
  507. }
  508. /*
  509. A stateMessage declares what (contiguous) blocks & headers are known.
  510. */
  511. type stateMessage struct {
  512. lastBlockHeight UInt64 // Last contiguous block height
  513. }
  514. func readStateMessage(r io.Reader) *stateMessage {
  515. return &stateMessage{
  516. lastBlockHeight: ReadUInt64(r),
  517. }
  518. }
  519. func (m *stateMessage) WriteTo(w io.Writer) (n int64, err error) {
  520. n, err = WriteTo(msgTypeState, w, n, err)
  521. n, err = WriteTo(m.lastBlockHeight, w, n, err)
  522. return
  523. }
  524. func (m *stateMessage) String() string {
  525. return fmt.Sprintf("[State B:%v]", m.lastBlockHeight)
  526. }
  527. /*
  528. A requestMessage requests a block and/or header at a given height.
  529. */
  530. type requestMessage struct {
  531. key dataKey
  532. type_ Byte
  533. }
  534. const (
  535. requestTypeFetch = Byte(0x01)
  536. requestTypeCanceled = Byte(0x02)
  537. requestTypeTryAgain = Byte(0x03)
  538. )
  539. func readRequestMessage(r io.Reader) *requestMessage {
  540. return &requestMessage{
  541. key: ReadDataKey(r),
  542. type_: ReadByte(r),
  543. }
  544. }
  545. func (m *requestMessage) WriteTo(w io.Writer) (n int64, err error) {
  546. n, err = WriteTo(msgTypeRequest, w, n, err)
  547. n, err = WriteTo(m.key, w, n, err)
  548. n, err = WriteTo(m.type_, w, n, err)
  549. return
  550. }
  551. func (m *requestMessage) String() string {
  552. switch m.type_ {
  553. case requestTypeByte:
  554. return fmt.Sprintf("[Request(fetch) %v]", m.key)
  555. case requestTypeCanceled:
  556. return fmt.Sprintf("[Request(canceled) %v]", m.key)
  557. case requestTypeTryAgain:
  558. return fmt.Sprintf("[Request(tryagain) %v]", m.key)
  559. default:
  560. return fmt.Sprintf("[Request(invalid) %v]", m.key)
  561. }
  562. }
  563. /*
  564. A dataMessage contains block data, maybe requested.
  565. The data can be a Validation, Txs, or whole Block object.
  566. */
  567. type dataMessage struct {
  568. key dataKey
  569. bytes ByteSlice
  570. }
  571. func readDataMessage(r io.Reader) *dataMessage {
  572. return &dataMessage{
  573. key: readDataKey(r),
  574. bytes: readByteSlice(r),
  575. }
  576. }
  577. func (m *dataMessage) WriteTo(w io.Writer) (n int64, err error) {
  578. n, err = WriteTo(msgTypeData, w, n, err)
  579. n, err = WriteTo(m.key, w, n, err)
  580. n, err = WriteTo(m.bytes, w, n, err)
  581. return
  582. }
  583. func (m *dataMessage) String() string {
  584. return fmt.Sprintf("[Data %v]", m.key)
  585. }