diff --git a/blocks/block.go b/blocks/block.go index 70a247b5a..37b5ce439 100644 --- a/blocks/block.go +++ b/blocks/block.go @@ -7,12 +7,10 @@ import ( ) /* Block */ - type Block struct { Header Validation Txs - // Checkpoint } func ReadBlock(r io.Reader) *Block { @@ -35,7 +33,6 @@ func (self *Block) WriteTo(w io.Writer) (n int64, err error) { } /* Block > Header */ - type Header struct { Name String Height UInt64 @@ -70,7 +67,6 @@ func (self *Header) WriteTo(w io.Writer) (n int64, err error) { } /* Block > Validation */ - type Validation struct { Signatures []Signature Adjustments []Adjustment @@ -106,7 +102,6 @@ func (self *Validation) WriteTo(w io.Writer) (n int64, err error) { } /* Block > Txs */ - type Txs struct { Txs []Tx } diff --git a/blocks/block_manager.go b/blocks/block_manager.go index 2654645fb..4dd44959b 100644 --- a/blocks/block_manager.go +++ b/blocks/block_manager.go @@ -13,22 +13,55 @@ const ( msgTypeRequest = Byte(0x02) msgTypeData = Byte(0x03) - DataTypeFullBlock = byte(0x00) - DataTypeValidation = byte(0x01) - DataTypeTxs = byte(0x02) - // dataTypeCheckpoint = byte(0x04) - dbKeyState = "state" ) -/* - */ +//----------------------------------------------------------------------------- + +// We request each item separately. + +const ( + dataTypeHeader = byte(0x01) + dataTypeValidation = byte(0x02) + dataTypeTxs = byte(0x03) +) + +func _dataKey(dataType byte, height int) { + switch dataType { + case dataTypeHeader: + return fmt.Sprintf("H%v", height) + case dataTypeValidation: + return fmt.Sprintf("V%v", height) + case dataTypeTxs: + return fmt.Sprintf("T%v", height) + default: + panic("Unknown datatype %X", dataType) + } +} + +func dataTypeFromObj(data interface{}) { + switch data.(type) { + case *Header: + return dataTypeHeader + case *Validation: + return dataTypeValidation + case *Txs: + return dataTypeTxs + default: + panic("Unexpected datatype: %v", data) + } +} + +//----------------------------------------------------------------------------- + +// TODO: document type BlockManager struct { db *db_.LevelDB sw *p2p.Switch swEvents chan interface{} state blockManagerState - peerStates map[string]*blockManagerState + dataStates map[string]*dataState // TODO: replace with CMap + peerStates map[string]*peerState // TODO: replace with CMap quit chan struct{} started uint32 stopped uint32 @@ -41,7 +74,8 @@ func NewBlockManager(sw *p2p.Switch, db *db_.LevelDB) *BlockManager { db: db, sw: sw, swEvents: swEvents, - peerStates: make(map[string]*blockManagerState), + dataStates: make(map[string]*dataState), + peerStates: make(map[string]*peerState), quit: make(chan struct{}), } bm.loadState() @@ -63,13 +97,29 @@ func (bm *BlockManager) Stop() { } } -func (bm *BlockManager) StoreData(dataType byte, dataObj interface{}) { - // Validate data if possible. +// NOTE: assumes that data is already validated. +func (bm *BlockManager) StoreData(dataObj interface{}) { + bm.mtx.Lock() + defer bm.mtx.Unlock() + dataType := dataTypeForObj(dataObj) + dataKey := _dataKey(dataType, dataObj) + // Update state + // TODO + // Remove dataState entry, we'll no longer request this. + _dataState := bm.dataStates[dataKey] + removedRequests := _dataState.removeRequestsForDataType(dataType) + for _, request := range removedRequests { + // TODO in future, notify peer that the request has been canceled. + // No point doing this yet, requests in blocksCh are handled singlethreaded. + } + // What are we doing here? + _peerState := bm.peerstates[dataKey] + // If we have new data that extends our contiguous range, then announce it. } -func (bm *BlockManager) LoadData(dataType byte) interface{} { - // NOTE: who's calling? +func (bm *BlockManager) LoadData(dataType byte, height int) interface{} { + panic("not yet implemented") } func (bm *BlockManager) loadState() { @@ -93,6 +143,7 @@ func (bm *BlockManager) saveState() { bm.db.Set(dbKeyState, stateBytes) } +// Handle peer new/done events func (bm *BlockManager) switchEventsHandler() { for { swEvent, ok := <-bm.swEvents @@ -103,46 +154,169 @@ func (bm *BlockManager) switchEventsHandler() { case p2p.SwitchEventNewPeer: event := swEvent.(p2p.SwitchEventNewPeer) // Create entry in .peerStates + bm.peerStates[event.Peer.RemoteAddress().String()] = &peerState{} // Share our state with event.Peer - event.Peer + msg := &stateMessage{ + lastHeaderHeight: bm.state.lastHeaderHeight, + lastValidationHeight: bm.state.lastValidationHeight, + lastTxsHeight: bm.state.lastTxsHeight, + } + tm := p2p.TypedMessage{msgTypeRequest, msg} + event.Peer.TrySend(NewPacket(blocksCh, tm)) case p2p.SwitchEventDonePeer: // Remove entry from .peerStates + delete(bm.peerStates, event.Peer.RemoteAddress().String()) + default: + log.Warning("Unhandled switch event type") } } } +// Handle requests from the blocks channel +func (bm *BlockManager) requestsHandler() { + for { + inPkt := bm.sw.Receive(blocksCh) // {Peer, Time, Packet} + if inPkt == nil { + // Client has stopped + break + } + + // decode message + msg := decodeMessage(inPkt.Bytes) + log.Info("requestHandler received %v", msg) + + switch msg.(type) { + case *stateMessage: + m := msg.(*stateMessage) + peerState := bm.peerStates[inPkt.Peer.RemoteAddress.String()] + if peerState == nil { + continue // peer has since been disconnected. + } + peerState.applyStateMessage(m) + // Consider requesting data. + // 1. if has more validation and we want it + // 2. if has more txs and we want it + // if peerState.estimatedCredit() >= averageBlock + + // TODO: keep track of what we've requested from peer. + // TODO: keep track of from which peers we've requested data. + // TODO: be fair. + case *requestMessage: + // TODO: prevent abuse. + case *dataMessage: + // See if we want the data. + // Validate data. + // Add to db. + // Update state & broadcast as necessary. + default: + // Ignore unknown message + // bm.sw.StopPeerForError(inPkt.Peer, errInvalidMessage) + } + } + + // Cleanup +} + //----------------------------------------------------------------------------- -/* This is just to persist the block manager state in the db. */ +// blockManagerState keeps track of which block parts are stored locally. +// It's also persisted via JSON in the db. type blockManagerState struct { - LastHeaderHeight uint64 // Last contiguous header height - OtherHeaderHeights []uint64 - LastValidationHeight uint64 // Last contiguous validation height - OtherValidationHeights []uint64 - LastTxsHeight uint64 // Last contiguous txs height - OtherTxsHeights []uint64 + lastHeaderHeight uint64 // Last contiguous header height + lastValidationHeight uint64 // Last contiguous validation height + lastTxsHeight uint64 // Last contiguous txs height + otherHeaderHeights []uint64 + otherValidationHeights []uint64 + otherTxsHeights []uint64 } //----------------------------------------------------------------------------- -/* -Each part of a block are stored separately in the db. -*/ +// dataRequest keeps track of each request for a given peice of data & peer. +type dataRequest struct { + peer *p2p.Peer + dataType byte + height uint64 + time time.Time +} + +//----------------------------------------------------------------------------- -func headerKey(height int) { - return fmt.Sprintf("B%v", height) +// dataState keeps track of all requests for a given piece of data. +type dataState struct { + mtx sync.Mutex + requests []*dataRequest } -func validationKey(height int) { - return fmt.Sprintf("V%v", height) +func (ds *dataState) removeRequestsForDataType(dataType byte) []*dataRequest { + ds.mtx.Lock() + defer ds.mtx.Lock() + requests := []*dataRequest{} + filtered := []*dataRequest{} + for _, request := range ds.requests { + if request.dataType == dataType { + filtered = append(filtered, request) + } else { + requests = append(requests, request) + } + } + ds.requests = requests + return filtered } -func txsKey(height int) { - return fmt.Sprintf("T%v", height) +//----------------------------------------------------------------------------- + +// XXX +type peerState struct { + mtx sync.Mutex + lastHeaderHeight uint64 // Last contiguous header height + lastValidationHeight uint64 // Last contiguous validation height + lastTxsHeight uint64 // Last contiguous txs height + dataBytesSent uint64 // Data bytes sent to peer + dataBytesReceived uint64 // Data bytes received from peer + numItemsReceived uint64 // Number of data items received + numItemsUnreceived uint64 // Number of data items requested but not received + numItemsSent uint64 // Number of data items sent + requests map[string]*dataRequest } -func checkpointKey(height int) { - return fmt.Sprintf("C%v", height) +func (ps *peerState) applyStateMessage(msg *stateMessage) { + ps.mtx.Lock() + defer ps.mtx.Unlock() + ps.lastHeaderHeight = msg.lastHeaderHeight + ps.lastValidationHeight = msg.lastValidationHeight + ps.lastTxsHeight = msg.lastTxsHeight +} + +// Call this function for each data item received from peer, if the item was requested. +// If the request timed out, dataBytesReceived is set to 0 to denote failure. +func (ps *peerState) didReceiveData(dataKey string, dataBytesReceived uint64) { + ps.mtx.Lock() + defer ps.mtx.Lock() + request := ps.requests[dataKey] + if request == nil { + log.Warning("Could not find peerState request with dataKey %v", dataKey) + return + } + if dataBytesReceived == 0 { + ps.numItemsUnreceived += 1 + } else { + ps.dataBytesReceived += dataBytesReceived + ps.numItemsReceived += 1 + } + delete(ps.requests, dataKey) +} + +// Call this function for each data item sent to peer, if the item was requested. +func (ps *peerState) didSendData(dataKey string, dataBytesSent uint64) { + ps.mtx.Lock() + defer ps.mtx.Lock() + if dataBytesSent == 0 { + log.Warning("didSendData expects dataBytesSent > 0") + return + } + ps.dataBytesSent += dataBytesSent + ps.numItemsSent += 1 } //----------------------------------------------------------------------------- @@ -154,7 +328,7 @@ func decodeMessage(bz ByteSlice) (msg Message) { // log.Debug("decoding msg bytes: %X", bz) switch Byte(bz[0]) { case msgTypeState: - return &StateMessage{} + return &stateMessage{} case msgTypeRequest: return readRequestMessage(bytes.NewReader(bz[1:])) case msgTypeData: @@ -165,93 +339,95 @@ func decodeMessage(bz ByteSlice) (msg Message) { } /* -A StateMessage declares what (contiguous) blocks & headers are known. - -LastValidationHeight >= LastBlockHeight. +A stateMessage declares what (contiguous) blocks & headers are known. */ -type StateMessage struct { - LastBlockHeight UInt64 - LastValidationHeight UInt64 +type stateMessage struct { + lastHeaderHeight uint64 // Last contiguous header height + lastValidationHeight uint64 // Last contiguous validation height + lastTxsHeight uint64 // Last contiguous txs height } -func readStateMessage(r io.Reader) *StateMessage { - lastBlockHeight := ReadUInt64(r) +func readStateMessage(r io.Reader) *stateMessage { + lastHeaderHeight := ReadUInt64(r) lastValidationHeight := ReadUInt64(r) - return &StateMessage{ - LastBlockHeight: lastBlockHeight, - LastValidationHeight: lastValidationHeight, + lastTxsHeight := ReadUInt64(r) + return &stateMessage{ + lastHeaderHeight: lastHeaderHeight, + lastValidationHeight: lastValidationHeight, + lastTxsHeight: lastTxsHeight, } } -func (m *StateMessage) WriteTo(w io.Writer) (n int64, err error) { +func (m *stateMessage) WriteTo(w io.Writer) (n int64, err error) { n, err = WriteTo(msgTypeState, w, n, err) - n, err = WriteTo(m.LastBlockHeight, w, n, err) - n, err = WriteTo(m.LastValidationHeight, w, n, err) + n, err = WriteTo(m.lastHeaderHeight, w, n, err) + n, err = WriteTo(m.lastValidationHeight, w, n, err) + n, err = WriteTo(m.lastTxsHeight, w, n, err) return } -func (m *StateMessage) String() string { - return fmt.Sprintf("[State %v/%v]", - m.LastBlockHeight, m.LastValidationHeight) +func (m *stateMessage) String() string { + return fmt.Sprintf("[State %v/%v/%v]", + m.lastHeaderHeight, m.lastValidationHeight, m.lastTxsHeight) } /* -A RequestMessage requests a block and/or header at a given height. +A requestMessage requests a block and/or header at a given height. */ -type RequestMessage struct { - Type Byte - Height UInt64 +type requestMessage struct { + dataType Byte + height UInt64 } -func readRequestMessage(r io.Reader) *RequestMessage { +func readRequestMessage(r io.Reader) *requestMessage { requestType := ReadByte(r) height := ReadUInt64(r) - return &RequestMessage{ - Type: requestType, - Height: height, + return &requestMessage{ + dataType: requestType, + height: height, } } -func (m *RequestMessage) WriteTo(w io.Writer) (n int64, err error) { +func (m *requestMessage) WriteTo(w io.Writer) (n int64, err error) { n, err = WriteTo(msgTypeRequest, w, n, err) - n, err = WriteTo(m.Type, w, n, err) - n, err = WriteTo(m.Height, w, n, err) + n, err = WriteTo(m.dataType, w, n, err) + n, err = WriteTo(m.height, w, n, err) return } -func (m *RequestMessage) String() string { - return fmt.Sprintf("[Request %X@%v]", m.Type, m.Height) +func (m *requestMessage) String() string { + return fmt.Sprintf("[Request %X@%v]", m.dataType, m.height) } /* -A DataMessage contains block data, maybe requested. +A dataMessage contains block data, maybe requested. The data can be a Validation, Txs, or whole Block object. */ -type DataMessage struct { - Type Byte - Height UInt64 - Bytes ByteSlice +type dataMessage struct { + dataType Byte + height UInt64 + bytes ByteSlice } -func readDataMessage(r io.Reader) *DataMessage { +func readDataMessage(r io.Reader) *dataMessage { dataType := ReadByte(r) height := ReadUInt64(r) bytes := ReadByteSlice(r) - return &DataMessage{ - Type: dataType, - Height: height, - Bytes: bytes, + return &dataMessage{ + dataType: dataType, + height: height, + bytes: bytes, } } -func (m *DataMessage) WriteTo(w io.Writer) (n int64, err error) { +func (m *dataMessage) WriteTo(w io.Writer) (n int64, err error) { n, err = WriteTo(msgTypeData, w, n, err) - n, err = WriteTo(m.Type, w, n, err) - n, err = WriteTo(m.Height, w, n, err) - n, err = WriteTo(m.Bytes, w, n, err) + n, err = WriteTo(m.dataType, w, n, err) + n, err = WriteTo(m.height, w, n, err) + n, err = WriteTo(m.bytes, w, n, err) return } -func (m *DataMessage) String() string { - return fmt.Sprintf("[Data %X@%v]", m.Type, m.Height) +func (m *dataMessage) String() string { + return fmt.Sprintf("[Data %X@%v]", m.dataType, m.height) } diff --git a/p2p/peer.go b/p2p/peer.go index 0ff2ecbdb..1fa2d0cac 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -59,10 +59,6 @@ func (p *Peer) IsOutbound() bool { return p.outbound } -func (p *Peer) LocalAddress() *NetAddress { - return p.conn.LocalAddress() -} - func (p *Peer) RemoteAddress() *NetAddress { return p.conn.RemoteAddress() } diff --git a/p2p/peer_manager.go b/p2p/peer_manager.go index d331a230c..bbd8753bc 100644 --- a/p2p/peer_manager.go +++ b/p2p/peer_manager.go @@ -50,8 +50,8 @@ func (pm *PeerManager) Start() { if atomic.CompareAndSwapUint32(&pm.started, 0, 1) { log.Info("Starting PeerManager") go pm.switchEventsHandler() + go pm.requestHandler() go pm.ensurePeersHandler() - go pm.pexHandler() } } @@ -167,7 +167,7 @@ func (pm *PeerManager) ensurePeers() { } // Handles incoming PEX messages. -func (pm *PeerManager) pexHandler() { +func (pm *PeerManager) requestHandler() { for { inPkt := pm.sw.Receive(pexCh) // {Peer, Time, Packet} @@ -178,7 +178,7 @@ func (pm *PeerManager) pexHandler() { // decode message msg := decodeMessage(inPkt.Bytes) - log.Info("pexHandler received %v", msg) + log.Info("requestHandler received %v", msg) switch msg.(type) { case *pexRequestMessage: @@ -200,8 +200,8 @@ func (pm *PeerManager) pexHandler() { pm.book.AddAddress(addr, srcAddr) } default: - // Bad peer. - pm.sw.StopPeerForError(inPkt.Peer, pexErrInvalidMessage) + // Ignore unknown message. + // pm.sw.StopPeerForError(inPkt.Peer, pexErrInvalidMessage) } } diff --git a/p2p/switch.go b/p2p/switch.go index 01ab970c8..03acb326e 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -43,6 +43,7 @@ func NewSwitch(channels []ChannelDescriptor) *Switch { // make pktRecvQueues... pktRecvQueues := make(map[string]chan *InboundPacket) for _, chDesc := range channels { + // XXX: buffer this pktRecvQueues[chDesc.Name] = make(chan *InboundPacket) }