You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

433 lines
12 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package blocks
  2. import (
  3. db_ "github.com/tendermint/tendermint/db"
  4. "github.com/tendermint/tendermint/p2p"
  5. )
  6. const (
  7. blocksCh = "block"
  8. msgTypeUnknown = Byte(0x00)
  9. msgTypeState = Byte(0x01)
  10. msgTypeRequest = Byte(0x02)
  11. msgTypeData = Byte(0x03)
  12. dbKeyState = "state"
  13. )
  14. //-----------------------------------------------------------------------------
  15. // We request each item separately.
  16. const (
  17. dataTypeHeader = byte(0x01)
  18. dataTypeValidation = byte(0x02)
  19. dataTypeTxs = byte(0x03)
  20. )
  21. func _dataKey(dataType byte, height int) {
  22. switch dataType {
  23. case dataTypeHeader:
  24. return fmt.Sprintf("H%v", height)
  25. case dataTypeValidation:
  26. return fmt.Sprintf("V%v", height)
  27. case dataTypeTxs:
  28. return fmt.Sprintf("T%v", height)
  29. default:
  30. panic("Unknown datatype %X", dataType)
  31. }
  32. }
  33. func dataTypeFromObj(data interface{}) {
  34. switch data.(type) {
  35. case *Header:
  36. return dataTypeHeader
  37. case *Validation:
  38. return dataTypeValidation
  39. case *Txs:
  40. return dataTypeTxs
  41. default:
  42. panic("Unexpected datatype: %v", data)
  43. }
  44. }
  45. //-----------------------------------------------------------------------------
  46. // TODO: document
  47. type BlockManager struct {
  48. db *db_.LevelDB
  49. sw *p2p.Switch
  50. swEvents chan interface{}
  51. state blockManagerState
  52. dataStates map[string]*dataState // TODO: replace with CMap
  53. peerStates map[string]*peerState // TODO: replace with CMap
  54. quit chan struct{}
  55. started uint32
  56. stopped uint32
  57. }
  58. func NewBlockManager(sw *p2p.Switch, db *db_.LevelDB) *BlockManager {
  59. swEvents := make(chan interface{})
  60. sw.AddEventListener("BlockManager.swEvents", swEvents)
  61. bm := &BlockManager{
  62. db: db,
  63. sw: sw,
  64. swEvents: swEvents,
  65. dataStates: make(map[string]*dataState),
  66. peerStates: make(map[string]*peerState),
  67. quit: make(chan struct{}),
  68. }
  69. bm.loadState()
  70. return bm
  71. }
  72. func (bm *BlockManager) Start() {
  73. if atomic.CompareAndSwapUint32(&bm.started, 0, 1) {
  74. log.Info("Starting BlockManager")
  75. go bm.switchEventsHandler()
  76. }
  77. }
  78. func (bm *BlockManager) Stop() {
  79. if atomic.CompareAndSwapUint32(&bm.stopped, 0, 1) {
  80. log.Info("Stopping BlockManager")
  81. close(bm.quit)
  82. close(bm.swEvents)
  83. }
  84. }
  85. // NOTE: assumes that data is already validated.
  86. func (bm *BlockManager) StoreData(dataObj interface{}) {
  87. bm.mtx.Lock()
  88. defer bm.mtx.Unlock()
  89. dataType := dataTypeForObj(dataObj)
  90. dataKey := _dataKey(dataType, dataObj)
  91. // Update state
  92. // TODO
  93. // Remove dataState entry, we'll no longer request this.
  94. _dataState := bm.dataStates[dataKey]
  95. removedRequests := _dataState.removeRequestsForDataType(dataType)
  96. for _, request := range removedRequests {
  97. // TODO in future, notify peer that the request has been canceled.
  98. // No point doing this yet, requests in blocksCh are handled singlethreaded.
  99. }
  100. // What are we doing here?
  101. _peerState := bm.peerstates[dataKey]
  102. // If we have new data that extends our contiguous range, then announce it.
  103. }
  104. func (bm *BlockManager) LoadData(dataType byte, height int) interface{} {
  105. panic("not yet implemented")
  106. }
  107. func (bm *BlockManager) loadState() {
  108. // Load the state
  109. stateBytes := bm.db.Get(dbKeyState)
  110. if stateBytes == nil {
  111. log.Info("New BlockManager with no state")
  112. } else {
  113. err := json.Unmarshal(stateBytes, &bm.state)
  114. if err != nil {
  115. panic("Could not unmarshal state bytes: %X", stateBytes)
  116. }
  117. }
  118. }
  119. func (bm *BlockManager) saveState() {
  120. stateBytes, err := json.Marshal(&bm.state)
  121. if err != nil {
  122. panic("Could not marshal state bytes")
  123. }
  124. bm.db.Set(dbKeyState, stateBytes)
  125. }
  126. // Handle peer new/done events
  127. func (bm *BlockManager) switchEventsHandler() {
  128. for {
  129. swEvent, ok := <-bm.swEvents
  130. if !ok {
  131. break
  132. }
  133. switch swEvent.(type) {
  134. case p2p.SwitchEventNewPeer:
  135. event := swEvent.(p2p.SwitchEventNewPeer)
  136. // Create entry in .peerStates
  137. bm.peerStates[event.Peer.RemoteAddress().String()] = &peerState{}
  138. // Share our state with event.Peer
  139. msg := &stateMessage{
  140. lastHeaderHeight: bm.state.lastHeaderHeight,
  141. lastValidationHeight: bm.state.lastValidationHeight,
  142. lastTxsHeight: bm.state.lastTxsHeight,
  143. }
  144. tm := p2p.TypedMessage{msgTypeRequest, msg}
  145. event.Peer.TrySend(NewPacket(blocksCh, tm))
  146. case p2p.SwitchEventDonePeer:
  147. // Remove entry from .peerStates
  148. delete(bm.peerStates, event.Peer.RemoteAddress().String())
  149. default:
  150. log.Warning("Unhandled switch event type")
  151. }
  152. }
  153. }
  154. // Handle requests from the blocks channel
  155. func (bm *BlockManager) requestsHandler() {
  156. for {
  157. inPkt := bm.sw.Receive(blocksCh) // {Peer, Time, Packet}
  158. if inPkt == nil {
  159. // Client has stopped
  160. break
  161. }
  162. // decode message
  163. msg := decodeMessage(inPkt.Bytes)
  164. log.Info("requestHandler received %v", msg)
  165. switch msg.(type) {
  166. case *stateMessage:
  167. m := msg.(*stateMessage)
  168. peerState := bm.peerStates[inPkt.Peer.RemoteAddress.String()]
  169. if peerState == nil {
  170. continue // peer has since been disconnected.
  171. }
  172. peerState.applyStateMessage(m)
  173. // Consider requesting data.
  174. // 1. if has more validation and we want it
  175. // 2. if has more txs and we want it
  176. // if peerState.estimatedCredit() >= averageBlock
  177. // TODO: keep track of what we've requested from peer.
  178. // TODO: keep track of from which peers we've requested data.
  179. // TODO: be fair.
  180. case *requestMessage:
  181. // TODO: prevent abuse.
  182. case *dataMessage:
  183. // See if we want the data.
  184. // Validate data.
  185. // Add to db.
  186. // Update state & broadcast as necessary.
  187. default:
  188. // Ignore unknown message
  189. // bm.sw.StopPeerForError(inPkt.Peer, errInvalidMessage)
  190. }
  191. }
  192. // Cleanup
  193. }
  194. //-----------------------------------------------------------------------------
  195. // blockManagerState keeps track of which block parts are stored locally.
  196. // It's also persisted via JSON in the db.
  197. type blockManagerState struct {
  198. lastHeaderHeight uint64 // Last contiguous header height
  199. lastValidationHeight uint64 // Last contiguous validation height
  200. lastTxsHeight uint64 // Last contiguous txs height
  201. otherHeaderHeights []uint64
  202. otherValidationHeights []uint64
  203. otherTxsHeights []uint64
  204. }
  205. //-----------------------------------------------------------------------------
  206. // dataRequest keeps track of each request for a given peice of data & peer.
  207. type dataRequest struct {
  208. peer *p2p.Peer
  209. dataType byte
  210. height uint64
  211. time time.Time
  212. }
  213. //-----------------------------------------------------------------------------
  214. // dataState keeps track of all requests for a given piece of data.
  215. type dataState struct {
  216. mtx sync.Mutex
  217. requests []*dataRequest
  218. }
  219. func (ds *dataState) removeRequestsForDataType(dataType byte) []*dataRequest {
  220. ds.mtx.Lock()
  221. defer ds.mtx.Lock()
  222. requests := []*dataRequest{}
  223. filtered := []*dataRequest{}
  224. for _, request := range ds.requests {
  225. if request.dataType == dataType {
  226. filtered = append(filtered, request)
  227. } else {
  228. requests = append(requests, request)
  229. }
  230. }
  231. ds.requests = requests
  232. return filtered
  233. }
  234. //-----------------------------------------------------------------------------
  235. // XXX
  236. type peerState struct {
  237. mtx sync.Mutex
  238. lastHeaderHeight uint64 // Last contiguous header height
  239. lastValidationHeight uint64 // Last contiguous validation height
  240. lastTxsHeight uint64 // Last contiguous txs height
  241. dataBytesSent uint64 // Data bytes sent to peer
  242. dataBytesReceived uint64 // Data bytes received from peer
  243. numItemsReceived uint64 // Number of data items received
  244. numItemsUnreceived uint64 // Number of data items requested but not received
  245. numItemsSent uint64 // Number of data items sent
  246. requests map[string]*dataRequest
  247. }
  248. func (ps *peerState) applyStateMessage(msg *stateMessage) {
  249. ps.mtx.Lock()
  250. defer ps.mtx.Unlock()
  251. ps.lastHeaderHeight = msg.lastHeaderHeight
  252. ps.lastValidationHeight = msg.lastValidationHeight
  253. ps.lastTxsHeight = msg.lastTxsHeight
  254. }
  255. // Call this function for each data item received from peer, if the item was requested.
  256. // If the request timed out, dataBytesReceived is set to 0 to denote failure.
  257. func (ps *peerState) didReceiveData(dataKey string, dataBytesReceived uint64) {
  258. ps.mtx.Lock()
  259. defer ps.mtx.Lock()
  260. request := ps.requests[dataKey]
  261. if request == nil {
  262. log.Warning("Could not find peerState request with dataKey %v", dataKey)
  263. return
  264. }
  265. if dataBytesReceived == 0 {
  266. ps.numItemsUnreceived += 1
  267. } else {
  268. ps.dataBytesReceived += dataBytesReceived
  269. ps.numItemsReceived += 1
  270. }
  271. delete(ps.requests, dataKey)
  272. }
  273. // Call this function for each data item sent to peer, if the item was requested.
  274. func (ps *peerState) didSendData(dataKey string, dataBytesSent uint64) {
  275. ps.mtx.Lock()
  276. defer ps.mtx.Lock()
  277. if dataBytesSent == 0 {
  278. log.Warning("didSendData expects dataBytesSent > 0")
  279. return
  280. }
  281. ps.dataBytesSent += dataBytesSent
  282. ps.numItemsSent += 1
  283. }
  284. //-----------------------------------------------------------------------------
  285. /* Messages */
  286. // TODO: check for unnecessary extra bytes at the end.
  287. func decodeMessage(bz ByteSlice) (msg Message) {
  288. // log.Debug("decoding msg bytes: %X", bz)
  289. switch Byte(bz[0]) {
  290. case msgTypeState:
  291. return &stateMessage{}
  292. case msgTypeRequest:
  293. return readRequestMessage(bytes.NewReader(bz[1:]))
  294. case msgTypeData:
  295. return readDataMessage(bytes.NewReader(bz[1:]))
  296. default:
  297. return nil
  298. }
  299. }
  300. /*
  301. A stateMessage declares what (contiguous) blocks & headers are known.
  302. */
  303. type stateMessage struct {
  304. lastHeaderHeight uint64 // Last contiguous header height
  305. lastValidationHeight uint64 // Last contiguous validation height
  306. lastTxsHeight uint64 // Last contiguous txs height
  307. }
  308. func readStateMessage(r io.Reader) *stateMessage {
  309. lastHeaderHeight := ReadUInt64(r)
  310. lastValidationHeight := ReadUInt64(r)
  311. lastTxsHeight := ReadUInt64(r)
  312. return &stateMessage{
  313. lastHeaderHeight: lastHeaderHeight,
  314. lastValidationHeight: lastValidationHeight,
  315. lastTxsHeight: lastTxsHeight,
  316. }
  317. }
  318. func (m *stateMessage) WriteTo(w io.Writer) (n int64, err error) {
  319. n, err = WriteTo(msgTypeState, w, n, err)
  320. n, err = WriteTo(m.lastHeaderHeight, w, n, err)
  321. n, err = WriteTo(m.lastValidationHeight, w, n, err)
  322. n, err = WriteTo(m.lastTxsHeight, w, n, err)
  323. return
  324. }
  325. func (m *stateMessage) String() string {
  326. return fmt.Sprintf("[State %v/%v/%v]",
  327. m.lastHeaderHeight, m.lastValidationHeight, m.lastTxsHeight)
  328. }
  329. /*
  330. A requestMessage requests a block and/or header at a given height.
  331. */
  332. type requestMessage struct {
  333. dataType Byte
  334. height UInt64
  335. }
  336. func readRequestMessage(r io.Reader) *requestMessage {
  337. requestType := ReadByte(r)
  338. height := ReadUInt64(r)
  339. return &requestMessage{
  340. dataType: requestType,
  341. height: height,
  342. }
  343. }
  344. func (m *requestMessage) WriteTo(w io.Writer) (n int64, err error) {
  345. n, err = WriteTo(msgTypeRequest, w, n, err)
  346. n, err = WriteTo(m.dataType, w, n, err)
  347. n, err = WriteTo(m.height, w, n, err)
  348. return
  349. }
  350. func (m *requestMessage) String() string {
  351. return fmt.Sprintf("[Request %X@%v]", m.dataType, m.height)
  352. }
  353. /*
  354. A dataMessage contains block data, maybe requested.
  355. The data can be a Validation, Txs, or whole Block object.
  356. */
  357. type dataMessage struct {
  358. dataType Byte
  359. height UInt64
  360. bytes ByteSlice
  361. }
  362. func readDataMessage(r io.Reader) *dataMessage {
  363. dataType := ReadByte(r)
  364. height := ReadUInt64(r)
  365. bytes := ReadByteSlice(r)
  366. return &dataMessage{
  367. dataType: dataType,
  368. height: height,
  369. bytes: bytes,
  370. }
  371. }
  372. func (m *dataMessage) WriteTo(w io.Writer) (n int64, err error) {
  373. n, err = WriteTo(msgTypeData, w, n, err)
  374. n, err = WriteTo(m.dataType, w, n, err)
  375. n, err = WriteTo(m.height, w, n, err)
  376. n, err = WriteTo(m.bytes, w, n, err)
  377. return
  378. }
  379. func (m *dataMessage) String() string {
  380. return fmt.Sprintf("[Data %X@%v]", m.dataType, m.height)
  381. }