From 8db5b7b614da3782bab13c1b6c55178dd45969e5 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Sun, 3 Aug 2014 15:50:28 -0700 Subject: [PATCH] move everything to blockManagerState; sim --- blocks/block_manager.go | 530 ++++++++++++++++++++++++++++------------ common/heap.go | 8 +- db/level_db.go | 10 + db/mem_db.go | 3 + p2p/connection.go | 74 ++++-- p2p/peer.go | 17 +- sim/bench.go | 411 +++++++++++++++++++++++++++++++ 7 files changed, 879 insertions(+), 174 deletions(-) create mode 100644 sim/bench.go diff --git a/blocks/block_manager.go b/blocks/block_manager.go index fec6ae4f6..fcad79951 100644 --- a/blocks/block_manager.go +++ b/blocks/block_manager.go @@ -25,14 +25,21 @@ const ( msgTypeState = Byte(0x01) msgTypeRequest = Byte(0x02) msgTypeData = Byte(0x03) + + maxRequestsPerPeer = 2 // Maximum number of outstanding requests from peer. + maxRequestsPerData = 2 // Maximum number of outstanding requests of some data. + maxRequestAheadBlock = 5 // Maximum number of blocks to request ahead of current verified. Must be >= 1 + + defaultRequestTimeoutS = + timeoutRepeatTimerMS = 1000 // Handle timed out requests periodically ) /* -TODO: keep tabs on current active requests onPeerState. TODO: keep a heap of dataRequests * their corresponding timeouts. timeout dataRequests and update the peerState, -TODO: when a data item has bene received successfully, update the peerState. -ensure goroutine safety. +TODO: need to keep track of progress, blocks are too large. or we need to chop into chunks. +TODO: need to validate blocks. :/ +TODO: actually save the block. */ //----------------------------------------------------------------------------- @@ -42,10 +49,32 @@ const ( // TODO: allow for more types, such as specific transactions ) -func computeDataKey(dataType byte, height uint64) string { +type dataKey struct { + dataType byte + height uint64 +} + +func newDataKey(dataType byte, height uint64) dataKey { + return dataKey{dataType, height} +} + +func readDataKey(r io.Reader) dataKey { + return dataKey{ + dataType: ReadByte(r), + height: ReadUInt64(r), + } +} + +func (dk dataKey) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteTo(dk.dataType, w, n, err) + n, err = WriteTo(dk.height, w, n, err) + return +} + +func (dk dataKey) String() string { switch dataType { case dataTypeBlock: - return fmt.Sprintf("B%v", height) + return dataKeyfmt.Sprintf("B%v", height) default: Panicf("Unknown datatype %X", dataType) return "" // should not happen @@ -55,27 +84,26 @@ func computeDataKey(dataType byte, height uint64) string { //----------------------------------------------------------------------------- type BlockManager struct { - db *db_.LevelDB - sw *p2p.Switch - swEvents chan interface{} - state blockManagerState - dataStates map[string]*dataState // TODO: replace with CMap - peerStates map[string]*peerState // TODO: replace with CMap - quit chan struct{} - started uint32 - stopped uint32 + db *db_.LevelDB + sw *p2p.Switch + swEvents chan interface{} + state *blockManagerState + timeoutTimer *RepeatTimer + quit chan struct{} + started uint32 + stopped uint32 } func NewBlockManager(sw *p2p.Switch, db *db_.LevelDB) *BlockManager { swEvents := make(chan interface{}) sw.AddEventListener("BlockManager.swEvents", swEvents) bm := &BlockManager{ - db: db, - sw: sw, - swEvents: swEvents, - dataStates: make(map[string]*dataState), - peerStates: make(map[string]*peerState), - quit: make(chan struct{}), + db: db, + sw: sw, + swEvents: swEvents, + state: newBlockManagerState(), + timeoutTimer: NewRepeatTimer(timeoutRepeatTimerMS * time.Second), + quit: make(chan struct{}), } bm.loadState() return bm @@ -85,6 +113,9 @@ func (bm *BlockManager) Start() { if atomic.CompareAndSwapUint32(&bm.started, 0, 1) { log.Info("Starting BlockManager") go bm.switchEventsHandler() + go bm.blocksInfoHandler() + go bm.blocksDataHandler() + go bm.requestTimeoutHandler() } } @@ -100,61 +131,32 @@ func (bm *BlockManager) Stop() { // "request" is optional, it's the request response that supplied // the data. func (bm *BlockManager) StoreBlock(block *Block, origin *dataRequest) { - dataKey := computeDataKey(dataTypeBlock, uint64(block.Header.Height)) - // Remove dataState entry, we'll no longer request this. - _dataState := bm.dataStates[dataKey] - removedRequests := _dataState.removeRequestsForDataType(dataTypeBlock) - for _, request := range removedRequests { - // Notify peer that the request has been canceled. - if request.peer.Equals(origin.peer) { - continue - } else { - // Send cancellation on blocksInfoCh channel - msg := &requestMessage{ - dataType: Byte(dataTypeBlock), - height: block.Header.Height, - canceled: Byte(0x01), - } - tm := p2p.TypedMessage{msgTypeRequest, msg} - request.peer.TrySend(blocksInfoCh, tm.Bytes()) + dataKey := newDataKey(dataTypeBlock, uint64(block.Header.Height)) + + // XXX actually save the block. + + canceled, newHeight := bm.state.didGetDataFromPeer(dataKey, origin.peer) + + // Notify peers that the request has been canceled. + for _, request := range canceled { + msg := &requestMessage{ + key: dataKey, + type_: requestTypeCanceled, } - // Remove dataRequest from request.peer's peerState. - peerState := bm.peerStates[request.peer.Key] - peerState.remoteDataRequest(request) + tm := p2p.TypedMessage{msgTypeRequest, msg} + request.peer.TrySend(blocksInfoCh, tm.Bytes()) } - // Update state - newContiguousHeight := bm.state.addData(dataTypeBlock, uint64(block.Header.Height)) + // If we have new data that extends our contiguous range, then announce it. - if newContiguousHeight { - bm.sw.Broadcast(blocksInfoCh, bm.state.stateMessage()) + if newHeight { + bm.sw.Broadcast(blocksInfoCh, bm.state.makeStateMessage()) } } -func (bm *BlockManager) LoadData(dataType byte, height uint64) interface{} { +func (bm *BlockManager) LoadBlock(height uint64) *Block { panic("not yet implemented") } -func (bm *BlockManager) loadState() { - // Load the state - stateBytes := bm.db.Get(dbKeyState) - if stateBytes == nil { - log.Info("New BlockManager with no state") - } else { - err := json.Unmarshal(stateBytes, &bm.state) - if err != nil { - Panicf("Could not unmarshal state bytes: %X", stateBytes) - } - } -} - -func (bm *BlockManager) saveState() { - stateBytes, err := json.Marshal(&bm.state) - if err != nil { - panic("Could not marshal state bytes") - } - bm.db.Set(dbKeyState, stateBytes) -} - // Handle peer new/done events func (bm *BlockManager) switchEventsHandler() { for { @@ -165,8 +167,8 @@ func (bm *BlockManager) switchEventsHandler() { switch swEvent.(type) { case p2p.SwitchEventNewPeer: event := swEvent.(p2p.SwitchEventNewPeer) - // Create entry in .peerStates - bm.peerStates[event.Peer.Key] = &peerState{} + // Create peerState for event.Peer + bm.state.createEntryForPeer(event.Peer) // Share our state with event.Peer msg := &stateMessage{ lastBlockHeight: UInt64(bm.state.lastBlockHeight), @@ -175,47 +177,122 @@ func (bm *BlockManager) switchEventsHandler() { event.Peer.TrySend(blocksInfoCh, tm.Bytes()) case p2p.SwitchEventDonePeer: event := swEvent.(p2p.SwitchEventDonePeer) - // Remove entry from .peerStates - delete(bm.peerStates, event.Peer.Key) + // Delete peerState for event.Peer + bm.state.deleteEntryForPeer(event.Peer) default: log.Warning("Unhandled switch event type") } } } -// Handle requests from the blocks channel -func (bm *BlockManager) requestsHandler() { +// Handle requests/cancellations from the blocksInfo channel +func (bm *BlockManager) blocksInfoHandler() { for { inMsg, ok := bm.sw.Receive(blocksInfoCh) if !ok { - // Client has stopped - break + break // Client has stopped } - // decode message msg := decodeMessage(inMsg.Bytes) - log.Info("requestHandler received %v", msg) + log.Info("blocksInfoHandler received %v", msg) switch msg.(type) { case *stateMessage: m := msg.(*stateMessage) - peerState := bm.peerStates[inMsg.MConn.Peer.Key] + peerState := bm.getPeerState(inMsg.MConn.Peer) if peerState == nil { continue // peer has since been disconnected. } - peerState.applyStateMessage(m) + newDataTypes := peerState.applyStateMessage(m) // Consider requesting data. - // 1. if has more validation and we want it - // 2. if has more txs and we want it - // if peerState.estimatedCredit() >= averageBlock - - // TODO: keep track of what we've requested from peer. - // TODO: keep track of from which peers we've requested data. - // TODO: be fair. + // Does the peer claim to have something we want? + FOR_LOOP: + for _, newDataType := range newDataTypes { + // Are we already requesting too much data from peer? + if !peerState.canRequestMore() { + break FOR_LOOP + } + for _, wantedKey := range bm.state.nextWantedKeysForType(newDataType) { + if !peerState.hasData(wantedKey) { + break FOR_LOOP + } + // Request wantedKey from peer. + msg := &requestMessage{ + key: dataKey, + type_: requestTypeFetch, + } + tm := p2p.TypedMessage{msgTypeRequest, msg} + sent := inMsg.MConn.Peer.TrySend(blocksInfoCh, tm.Bytes()) + if sent { + // Log the request + request := &dataRequest{ + peer: inMsg.MConn.Peer, + key: wantedKey, + time: time.Now(), + timeout: time.Now().Add(defaultRequestTimeout + } + bm.state.addDataRequest(request) + } + } + } case *requestMessage: - // TODO: prevent abuse. + m := msg.(*requestMessage) + switch m.type_ { + case requestTypeFetch: + // TODO: prevent abuse. + if !inMsg.MConn.Peer.CanSend(blocksDataCh) { + msg := &requestMessage{ + key: dataKey, + type_: requestTypeTryAgain, + } + tm := p2p.TypedMessage{msgTypeRequest, msg} + sent := inMsg.MConn.Peer.TrySend(blocksInfoCh, tm.Bytes()) + } else { + // If we don't have it, log and ignore. + block := bm.LoadBlock(m.key.height) + if block == nil { + log.Warning("Peer %v asked for nonexistant block %v", inMsg.MConn.Peer, m.key) + } + // Send the data. + msg := &dataMessage{ + key: dataKey, + bytes: BinaryBytes(block), + } + tm := p2p.TypedMessage{msgTypeData, msg} + inMsg.MConn.Peer.TrySend(blocksDataCh, tm.Bytes()) + } + case requestTypeCanceled: + // TODO: handle + // This requires modifying mconnection to keep track of item keys. + case requestTypeTryAgain: + // TODO: handle + default: + log.Warning("Invalid request: %v", m) + // Ignore. + } + default: + // should not happen + Panicf("Unknown message %v", msg) + // bm.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage) + } + } + + // Cleanup +} + +// Handle receiving data from the blocksData channel +func (bm *BlockManager) blocksDataHandler() { + for { + inMsg, ok := bm.sw.Receive(blocksDataCh) + if !ok { + break // Client has stopped + } + + msg := decodeMessage(inMsg.Bytes) + log.Info("blocksDataHandler received %v", msg) + + switch msg.(type) { case *dataMessage: - // XXX move this to another channe // See if we want the data. // Validate data. // Add to db. @@ -229,6 +306,17 @@ func (bm *BlockManager) requestsHandler() { // Cleanup } +// Handle timed out requests by requesting from others. +func (bm *BlockManager) requestTimeoutHandler() { + for { + _, ok := <-bm.timeoutTimer + if !ok { + break + } + // Iterate over requests by time and handle timed out requests. + } +} + //----------------------------------------------------------------------------- // blockManagerState keeps track of which block parts are stored locally. @@ -237,9 +325,56 @@ type blockManagerState struct { mtx sync.Mutex lastBlockHeight uint64 // Last contiguous header height otherBlockHeights map[uint64]struct{} + requestsByKey map[dataKey][]*dataRequest + requestsByTimeout *Heap // Could be a linkedlist, but more flexible. + peerStates map[string]*peerState +} + +func newBlockManagerState() *blockManagerState { + return &blockManagerState{ + requestsByKey: make(map[dataKey][]*dataRequest), + requestsByTimeout: NewHeap(), + peerStates: make(map[string]*peerState), + } +} + +type blockManagerStateJSON struct { + LastBlockHeight uint64 // Last contiguous header height + OtherBlockHeights map[uint64]struct{} +} + +func (bms *BlockManagerState) loadState(db _db.LevelDB) { + bms.mtx.Lock() + defer bms.mtx.Unlock() + stateBytes := db.Get(dbKeyState) + if stateBytes == nil { + log.Info("New BlockManager with no state") + } else { + bmsJSON := &blockManagerStateJSON{} + err := json.Unmarshal(stateBytes, bmsJSON) + if err != nil { + Panicf("Could not unmarshal state bytes: %X", stateBytes) + } + bms.lastBlockHeight = bmsJSON.LastBlockHeight + bms.otherBlockHeights = bmsJSON.OtherBlockHeights + } } -func (bms blockManagerState) stateMessage() *stateMessage { +func (bms *BlockManagerState) saveState(db _db.LevelDB) { + bms.mtx.Lock() + defer bms.mtx.Unlock() + bmsJSON := &blockManagerStateJSON{ + LastBlockHeight: bms.lastBlockHeight, + OtherBlockHeights: bms.otherBlockHeights, + } + stateBytes, err := json.Marshal(bmsJSON) + if err != nil { + panic("Could not marshal state bytes") + } + db.Set(dbKeyState, stateBytes) +} + +func (bms *blockManagerState) makeStateMessage() *stateMessage { bms.mtx.Lock() defer bms.mtx.Unlock() return &stateMessage{ @@ -247,12 +382,43 @@ func (bms blockManagerState) stateMessage() *stateMessage { } } -func (bms blockManagerState) addData(dataType byte, height uint64) bool { +func (bms *blockManagerState) createEntryForPeer(peer *peer) { bms.mtx.Lock() defer bms.mtx.Unlock() - if dataType != dataTypeBlock { - Panicf("Unknown datatype %X", dataType) + bms.peerStates[peer.Key] = &peerState{peer: peer} +} + +func (bms *blockManagerState) deleteEntryForPeer(peer *peer) { + bms.mtx.Lock() + defer bms.mtx.Unlock() + delete(bms.peerStates, peer.Key) +} + +func (bms *blockManagerState) getPeerState(peer *Peer) { + bms.mtx.Lock() + defer bms.mtx.Unlock() + return bms.peerStates[peer.Key] +} + +func (bms *blockManagerState) addDataRequest(newRequest *dataRequest) { + ps.mtx.Lock() + bms.requestsByKey[newRequest.key] = append(bms.requestsByKey[newRequest.key], newRequest) + bms.requestsByTimeout.Push(newRequest) // XXX + peerState, ok := bms.peerStates[newRequest.peer.Key] + ps.mtx.Unlock() + if ok { + peerState.addDataRequest(newRequest) } +} + +func (bms *blockManagerState) didGetDataFromPeer(key dataKey, peer *p2p.Peer) (canceled []*dataRequest, newHeight bool) { + bms.mtx.Lock() + defer bms.mtx.Unlock() + if key.dataType != dataTypeBlock { + Panicf("Unknown datatype %X", key.dataType) + } + // Adjust lastBlockHeight/otherBlockHeights. + height := key.height if bms.lastBlockHeight == height-1 { bms.lastBlockHeight = height height++ @@ -261,64 +427,126 @@ func (bms blockManagerState) addData(dataType byte, height uint64) bool { bms.lastBlockHeight = height height++ } - return true + newHeight = true + } + // Remove dataRequests + requests := bms.requestsByKey[key] + for _, request := range requests { + peerState, ok := bms.peerStates[peer.Key] + if ok { + peerState.removeDataRequest(request) + } + if request.peer == peer { + continue + } + canceled = append(canceled, request) + } + delete(bms.requestsByKey, key) + + return canceled, newHeight +} + +// Returns at most maxRequestAheadBlock dataKeys that we don't yet have & +// aren't already requesting from maxRequestsPerData peers. +func (bms *blockManagerState) nextWantedKeysForType(dataType byte) []dataKey { + bms.mtx.Lock() + defer bms.mtx.Unlock() + var keys []dataKey + switch dataType { + case dataTypeBlock: + for h := bms.lastBlockHeight + 1; h <= bms.lastBlockHeight+maxRequestAheadBlock; h++ { + if _, ok := bms.otherBlockHeights[h]; !ok { + key := newDataKey(dataTypeBlock, h) + if len(bms.requestsByKey[key]) < maxRequestsPerData { + keys = append(keys, key) + } + } + } + return keys + default: + Panicf("Unknown datatype %X", dataType) + return } - return false } //----------------------------------------------------------------------------- // dataRequest keeps track of each request for a given peice of data & peer. type dataRequest struct { - peer *p2p.Peer - dataType byte - height uint64 - time time.Time // XXX keep track of timeouts. + peer *p2p.Peer + key dataKey + time time.Time + timeout time.Time } //----------------------------------------------------------------------------- -// dataState keeps track of all requests for a given piece of data. -type dataState struct { - mtx sync.Mutex - requests []*dataRequest +type peerState struct { + mtx sync.Mutex + peer *Peer + lastBlockHeight uint64 // Last contiguous header height + requests []*dataRequest // Active requests + // XXX we need to } -func (ds *dataState) removeRequestsForDataType(dataType byte) []*dataRequest { - ds.mtx.Lock() - defer ds.mtx.Lock() - requests := []*dataRequest{} - filtered := []*dataRequest{} - for _, request := range ds.requests { - if request.dataType == dataType { - filtered = append(filtered, request) - } else { - requests = append(requests, request) - } +// Returns which dataTypes are new as declared by stateMessage. +func (ps *peerState) applyStateMessage(msg *stateMessage) []byte { + ps.mtx.Lock() + defer ps.mtx.Unlock() + var newTypes []byte + if uint64(msg.lastBlockHeight) > ps.lastBlockHeight { + newTypes = append(newTypes, dataTypeBlock) + ps.lastBlockHeight = uint64(msg.lastBlockHeight) + } else { + log.Info("Strange, peer declares a regression of %X", dataTypeBlock) } - ds.requests = requests - return filtered + return newTypes } -//----------------------------------------------------------------------------- - -type peerState struct { - mtx sync.Mutex - lastBlockHeight uint64 // Last contiguous header height +func (ps *peerState) hasData(key dataKey) bool { + ps.mtx.Lock() + defer ps.mtx.Unlock() + switch key.dataType { + case dataTypeBlock: + return key.height <= ps.lastBlockHeight + default: + Panicf("Unknown datatype %X", dataType) + return false // should not happen + } } -func (ps *peerState) applyStateMessage(msg *stateMessage) { +func (ps *peerState) addDataRequest(newRequest *dataRequest) { ps.mtx.Lock() defer ps.mtx.Unlock() - ps.lastBlockHeight = uint64(msg.lastBlockHeight) + for _, request := range ps.requests { + if request.key == newRequest.key { + return + } + } + ps.requests = append(ps.requests, newRequest) + return newRequest } -func (ps *peerState) addDataRequest(request *dataRequest) { - // TODO: keep track of dataRequests +func (ps *peerState) remoteDataRequest(key dataKey) bool { + ps.mtx.Lock() + defer ps.mtx.Unlock() + filtered := []*dataRequest{} + removed := false + for _, request := range ps.requests { + if request.key == key { + removed = true + } else { + filtered = append(filtered, request) + } + } + ps.requests = filtered + return removed } -func (ps *peerState) remoteDataRequest(request *dataRequest) { - // TODO: keep track of dataRequests, and remove them here. +func (ps *peerState) canRequestMore() bool { + ps.mtx.Lock() + defer ps.mtx.Unlock() + return len(ps.requests) < maxRequestsPerPeer } //----------------------------------------------------------------------------- @@ -367,32 +595,40 @@ func (m *stateMessage) String() string { A requestMessage requests a block and/or header at a given height. */ type requestMessage struct { - dataType Byte - height UInt64 - canceled Byte // 0x00 if request, 0x01 if cancellation + key dataKey + type_ Byte } +const ( + requestTypeFetch = Byte(0x01) + requestTypeCanceled = Byte(0x02) + requestTypeTryAgain = Byte(0x03) +) + func readRequestMessage(r io.Reader) *requestMessage { return &requestMessage{ - dataType: ReadByte(r), - height: ReadUInt64(r), - canceled: ReadByte(r), + key: ReadDataKey(r), + type_: ReadByte(r), } } func (m *requestMessage) WriteTo(w io.Writer) (n int64, err error) { n, err = WriteTo(msgTypeRequest, w, n, err) - n, err = WriteTo(m.dataType, w, n, err) - n, err = WriteTo(m.height, w, n, err) - n, err = WriteTo(m.canceled, w, n, err) + n, err = WriteTo(m.key, w, n, err) + n, err = WriteTo(m.type_, w, n, err) return } func (m *requestMessage) String() string { - if m.canceled == Byte(0x01) { - return fmt.Sprintf("[Cancellation %X@%v]", m.dataType, m.height) - } else { - return fmt.Sprintf("[Request %X@%v]", m.dataType, m.height) + switch m.type_ { + case requestTypeByte: + return fmt.Sprintf("[Request(fetch) %v]", m.key) + case requestTypeCanceled: + return fmt.Sprintf("[Request(canceled) %v]", m.key) + case requestTypeTryAgain: + return fmt.Sprintf("[Request(tryagain) %v]", m.key) + default: + return fmt.Sprintf("[Request(invalid) %v]", m.key) } } @@ -401,30 +637,24 @@ A dataMessage contains block data, maybe requested. The data can be a Validation, Txs, or whole Block object. */ type dataMessage struct { - dataType Byte - height UInt64 - bytes ByteSlice + key dataKey + bytes ByteSlice } func readDataMessage(r io.Reader) *dataMessage { - dataType := ReadByte(r) - height := ReadUInt64(r) - bytes := ReadByteSlice(r) return &dataMessage{ - dataType: dataType, - height: height, - bytes: bytes, + key: readDataKey(r), + bytes: readByteSlice(r), } } func (m *dataMessage) WriteTo(w io.Writer) (n int64, err error) { n, err = WriteTo(msgTypeData, w, n, err) - n, err = WriteTo(m.dataType, w, n, err) - n, err = WriteTo(m.height, w, n, err) + n, err = WriteTo(m.key, w, n, err) n, err = WriteTo(m.bytes, w, n, err) return } func (m *dataMessage) String() string { - return fmt.Sprintf("[Data %X@%v]", m.dataType, m.height) + return fmt.Sprintf("[Data %v]", m.key) } diff --git a/common/heap.go b/common/heap.go index 4505eb202..b75e32bcd 100644 --- a/common/heap.go +++ b/common/heap.go @@ -12,11 +12,11 @@ func NewHeap() *Heap { return &Heap{pq: make([]*pqItem, 0)} } -func (h *Heap) Len() int { +func (h *Heap) Len() int64 { return len(h.pq) } -func (h *Heap) Push(value interface{}, priority int) { +func (h *Heap) Push(value interface{}, priority int64) { heap.Push(&h.pq, &pqItem{value: value, priority: priority}) } @@ -44,7 +44,7 @@ func main() { type pqItem struct { value interface{} - priority int + priority int64 index int } @@ -78,7 +78,7 @@ func (pq *priorityQueue) Pop() interface{} { return item } -func (pq *priorityQueue) Update(item *pqItem, value interface{}, priority int) { +func (pq *priorityQueue) Update(item *pqItem, value interface{}, priority int64) { heap.Remove(pq, item.index) item.value = value item.priority = priority diff --git a/db/level_db.go b/db/level_db.go index 0366f48ee..db0c4d069 100644 --- a/db/level_db.go +++ b/db/level_db.go @@ -28,6 +28,13 @@ func (db *LevelDB) Set(key []byte, value []byte) { } func (db *LevelDB) Get(key []byte) []byte { + + batch := new(leveldb.Batch) + batch.Put([]byte("foo"), []byte("value")) + batch.Put([]byte("bar"), []byte("another value")) + batch.Delete([]byte("baz")) + err = db.Write(batch, nil) + res, err := db.db.Get(key, nil) if err != nil { panic(err) @@ -35,6 +42,9 @@ func (db *LevelDB) Get(key []byte) []byte { return res } +func (db *LevelDB) GetRange(key []byte, start, end int) []byte { +} + func (db *LevelDB) Delete(key []byte) { err := db.db.Delete(key, nil) if err != nil { diff --git a/db/mem_db.go b/db/mem_db.go index 67f9f061f..8cb0f4f3d 100644 --- a/db/mem_db.go +++ b/db/mem_db.go @@ -21,6 +21,9 @@ func (db *MemDB) Get(key []byte) []byte { return db.db[string(key)] } +func (db *MemDB) GetRange(key []byte, start, end int) []byte { +} + func (db *MemDB) Delete(key []byte) { delete(db.db, string(key)) } diff --git a/p2p/connection.go b/p2p/connection.go index 73c7dfc69..8d45efd1a 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -41,7 +41,7 @@ type MConnection struct { sendRate int64 recvRate int64 flushTimer *ThrottleTimer // flush writes as necessary but throttled. - canSend chan struct{} + send chan struct{} quit chan struct{} pingTimer *RepeatTimer // send pings periodically pong chan struct{} @@ -69,7 +69,7 @@ func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onError func(in sendRate: defaultSendRate, recvRate: defaultRecvRate, flushTimer: NewThrottleTimer(flushThrottleMS * time.Millisecond), - canSend: make(chan struct{}, 1), + send: make(chan struct{}, 1), quit: make(chan struct{}), pingTimer: NewRepeatTimer(pingTimeoutMinutes * time.Minute), pong: make(chan struct{}), @@ -150,6 +150,10 @@ func (c *MConnection) stopForError(r interface{}) { // Queues a message to be sent to channel. func (c *MConnection) Send(chId byte, bytes ByteSlice) bool { + if atomic.LoadUint32(&c.stopped) == 1 { + return false + } + // Send message to channel. channel, ok := c.channelsIdx[chId] if !ok { @@ -161,7 +165,7 @@ func (c *MConnection) Send(chId byte, bytes ByteSlice) bool { // Wake up sendHandler if necessary select { - case c.canSend <- struct{}{}: + case c.send <- struct{}{}: default: } @@ -171,6 +175,10 @@ func (c *MConnection) Send(chId byte, bytes ByteSlice) bool { // Queues a message to be sent to channel. // Nonblocking, returns true if successful. func (c *MConnection) TrySend(chId byte, bytes ByteSlice) bool { + if atomic.LoadUint32(&c.stopped) == 1 { + return false + } + // Send message to channel. channel, ok := c.channelsIdx[chId] if !ok { @@ -182,7 +190,7 @@ func (c *MConnection) TrySend(chId byte, bytes ByteSlice) bool { if ok { // Wake up sendHandler if necessary select { - case c.canSend <- struct{}{}: + case c.send <- struct{}{}: default: } } @@ -190,6 +198,19 @@ func (c *MConnection) TrySend(chId byte, bytes ByteSlice) bool { return ok } +func (c *MConnection) CanSend(chId byte) bool { + if atomic.LoadUint32(&c.stopped) == 1 { + return false + } + + channel, ok := c.channelsIdx[chId] + if !ok { + log.Error("Unknown channel %X", chId) + return 0 + } + return channel.canSend() +} + // sendHandler polls for packets to send from channels. func (c *MConnection) sendHandler() { defer c._recover() @@ -218,13 +239,13 @@ FOR_LOOP: c.flush() case <-c.quit: break FOR_LOOP - case <-c.canSend: + case <-c.send: // Send some packets eof := c.sendSomePackets() if !eof { // Keep sendHandler awake. select { - case c.canSend <- struct{}{}: + case c.send <- struct{}{}: default: } } @@ -384,15 +405,16 @@ type ChannelDescriptor struct { // TODO: lowercase. // NOTE: not goroutine-safe. type Channel struct { - conn *MConnection - desc *ChannelDescriptor - id byte - recvQueue chan InboundBytes - sendQueue chan ByteSlice - recving ByteSlice - sending ByteSlice - priority uint - recentlySent int64 // exponential moving average + conn *MConnection + desc *ChannelDescriptor + id byte + recvQueue chan InboundBytes + sendQueue chan ByteSlice + sendQueueSize uint32 + recving ByteSlice + sending ByteSlice + priority uint + recentlySent int64 // exponential moving average } func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel { @@ -411,22 +433,39 @@ func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel { } // Queues message to send to this channel. +// Goroutine-safe func (ch *Channel) sendBytes(bytes ByteSlice) { ch.sendQueue <- bytes + atomic.AddUint32(&ch.sendQueueSize, 1) } // Queues message to send to this channel. // Nonblocking, returns true if successful. +// Goroutine-safe func (ch *Channel) trySendBytes(bytes ByteSlice) bool { select { case ch.sendQueue <- bytes: + atomic.AddUint32(&ch.sendQueueSize, 1) return true default: return false } } +// Goroutine-safe +func (ch *Channel) sendQueueSize() (size int) { + return int(atomic.LoadUint32(&ch.sendQueueSize)) +} + +// Goroutine-safe +// Use only as a heuristic. +func (ch *Channel) canSend() bool { + return ch.sendQueueSize() < ch.desc.SendQueueCapacity +} + // Returns true if any packets are pending to be sent. +// Call before calling nextPacket() +// Goroutine-safe func (ch *Channel) sendPending() bool { if len(ch.sending) == 0 { if len(ch.sendQueue) == 0 { @@ -438,6 +477,7 @@ func (ch *Channel) sendPending() bool { } // Creates a new packet to send. +// Not goroutine-safe func (ch *Channel) nextPacket() packet { packet := packet{} packet.ChannelId = Byte(ch.id) @@ -445,6 +485,7 @@ func (ch *Channel) nextPacket() packet { if len(ch.sending) <= maxPacketSize { packet.EOF = Byte(0x01) ch.sending = nil + atomic.AddUint32(&ch.sendQueueSize, ^uint32(0)) // decrement sendQueueSize } else { packet.EOF = Byte(0x00) ch.sending = ch.sending[MinInt(maxPacketSize, len(ch.sending)):] @@ -453,6 +494,7 @@ func (ch *Channel) nextPacket() packet { } // Writes next packet to w. +// Not goroutine-safe func (ch *Channel) writePacketTo(w io.Writer) (n int64, err error) { packet := ch.nextPacket() n, err = WriteTo(packetTypeMessage, w, n, err) @@ -464,6 +506,7 @@ func (ch *Channel) writePacketTo(w io.Writer) (n int64, err error) { } // Handles incoming packets. +// Not goroutine-safe func (ch *Channel) recvPacket(pkt packet) { ch.recving = append(ch.recving, pkt.Bytes...) if pkt.EOF == Byte(0x01) { @@ -473,6 +516,7 @@ func (ch *Channel) recvPacket(pkt packet) { } // Call this periodically to update stats for throttling purposes. +// Not goroutine-safe func (ch *Channel) updateStats() { // Exponential decay of stats. // TODO: optimize. diff --git a/p2p/peer.go b/p2p/peer.go index 99b0bb26d..c44fcaded 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -55,6 +55,13 @@ func (p *Peer) IsOutbound() bool { return p.outbound } +func (p *Peer) Send(chId byte, bytes ByteSlice) bool { + if atomic.LoadUint32(&p.stopped) == 1 { + return false + } + return p.mconn.Send(chId, bytes) +} + func (p *Peer) TrySend(chId byte, bytes ByteSlice) bool { if atomic.LoadUint32(&p.stopped) == 1 { return false @@ -62,15 +69,15 @@ func (p *Peer) TrySend(chId byte, bytes ByteSlice) bool { return p.mconn.TrySend(chId, bytes) } -func (p *Peer) Send(chId byte, bytes ByteSlice) bool { +func (o *Peer) CanSend(chId byte) int { if atomic.LoadUint32(&p.stopped) == 1 { - return false + return 0 } - return p.mconn.Send(chId, bytes) + return p.mconn.CanSend(chId) } func (p *Peer) WriteTo(w io.Writer) (n int64, err error) { - return p.mconn.RemoteAddress.WriteTo(w) + return String(p.Key).WriteTo(w) } func (p *Peer) String() string { @@ -82,5 +89,5 @@ func (p *Peer) String() string { } func (p *Peer) Equals(other *Peer) bool { - return p.mconn.RemoteAddress.Equals(other.mconn.RemoteAddress) + return p.Key == other.Key } diff --git a/sim/bench.go b/sim/bench.go new file mode 100644 index 000000000..7188d1a04 --- /dev/null +++ b/sim/bench.go @@ -0,0 +1,411 @@ +package main + +import ( + "container/heap" + "fmt" + "math/rand" +) + +const seed = 0 +const numNodes = 50000 // Total number of nodes to simulate +const minNumPeers = 10 // Each node should be connected to at least this many peers +const maxNumPeers = 15 // ... and at most this many +const latencyMS = int32(500) // One way packet latency +const partTxMS = int32(10) // Transmission time per peer of 4KB of data. +const sendQueueCapacity = 100 // Amount of messages to queue between peers. + +func init() { + rand.Seed(seed) +} + +//----------------------------------------------------------------------------- + +type Peer struct { + node *Node // Pointer to node + sent int32 // Time of last packet send, including transmit time. + remote int // SomeNode.peers[x].node.peers[remote].node is SomeNode for all x. + parts []byte // [32]byte{} bitarray of received block pieces. +} + +// Send a data event to the peer, or return false if queue is "full". +// Depending on how many event packets are "queued" for peer, +// the actual recvTime may be adjusted to be later. +func (p *Peer) sendEventData(event EventData) bool { + desiredRecvTime := event.RecvTime() + minRecvTime := p.sent + partTxMS + latencyMS + if desiredRecvTime >= minRecvTime { + p.node.sendEvent(event) + p.sent += partTxMS + return true + } else { + if (minRecvTime-desiredRecvTime)/partTxMS > sendQueueCapacity { + return false + } else { + event.SetRecvTime(minRecvTime) // Adjust recvTime + p.node.sendEvent(event) + p.sent += partTxMS + return true + } + } +} + +// Since EventPart events are much smaller, we don't consider the transmit time, +// and assume that the sendQueue is always free. +func (p *Peer) sendEventParts(event EventParts) { + p.node.sendEvent(event) +} + +// Does the peer's .parts (as received by an EventParts event) contain part? +func (p *Peer) knownToHave(part uint8) bool { + return p.parts[part/8]&(1<<(part%8)) > 0 +} + +//----------------------------------------------------------------------------- + +type Node struct { + index int + peers []*Peer + parts []byte + events *Heap +} + +func (n *Node) sendEvent(event Event) { + n.events.Push(event, event.RecvTime()) +} + +func (n *Node) recvEvent() Event { + return n.events.Pop().(Event) +} + +func (n *Node) receive(part uint8) bool { + x := n.parts[part/8] + nx := x | (1 << (part % 8)) + if x == nx { + return false + } else { + n.parts[part/8] = nx + return true + } +} + +// returns false if already connected, or remote node has too many connections. +func (n *Node) canConnectTo(node *Node) bool { + if len(n.peers) > maxNumPeers { + return false + } + for _, peer := range n.peers { + if peer.node == node { + return false + } + } + return true +} + +func (n *Node) isFull() bool { + for _, part := range n.parts { + if part != byte(0xff) { + return false + } + } + return true +} + +func (n *Node) String() string { + return fmt.Sprintf("{N:%d}", n.index) +} + +//----------------------------------------------------------------------------- + +type Event interface { + RecvTime() int32 + SetRecvTime(int32) +} + +type EventData struct { + time int32 // time of receipt. + part uint8 +} + +func (e EventData) RecvTime() int32 { + return e.time +} + +func (e EventData) SetRecvTime(time int32) { + e.time = time +} + +func (e EventData) String() string { + return fmt.Sprintf("[%d:%d]", e.time, e.part) +} + +type EventParts struct { + time int32 // time of receipt. + src int // src node's peer index on destination node. + parts []byte +} + +func (e EventParts) RecvTime() int32 { + return e.time +} + +func (e EventParts) SetRecvTime(time int32) { + e.time = time +} + +func (e EventParts) String() string { + return fmt.Sprintf("[%d:%d:%d]", e.time, e.src, e.parts) +} + +//----------------------------------------------------------------------------- + +func createNetwork() []*Node { + nodes := make([]*Node, numNodes) + for i := 0; i < numNodes; i++ { + n := &Node{ + index: i, + peers: []*Peer{}, + parts: make([]byte, 32), + events: NewHeap(), + } + nodes[i] = n + } + for i := 0; i < numNodes; i++ { + n := nodes[i] + for j := 0; j < minNumPeers; j++ { + if len(n.peers) > j { + // Already set, continue + continue + } + pidx := rand.Intn(numNodes) + for !n.canConnectTo(nodes[pidx]) { + pidx = rand.Intn(numNodes) + } + // connect to nodes[pidx] + remote := nodes[pidx] + remote_j := len(remote.peers) + n.peers = append(n.peers, &Peer{node: remote, remote: remote_j, parts: make([]byte, 32)}) + remote.peers = append(remote.peers, &Peer{node: n, remote: j, parts: make([]byte, 32)}) + } + } + return nodes +} + +func printNodes(nodes []*Node) { + for _, node := range nodes { + peerStr := "" + for _, peer := range node.peers { + peerStr += fmt.Sprintf(" %v", peer.node.index) + } + fmt.Printf("[%v] peers: %v\n", node.index, peerStr) + } +} + +func countFull(nodes []*Node) (fullCount int) { + for _, node := range nodes { + if node.isFull() { + fullCount += 1 + } + } + return fullCount +} + +func main() { + + // Global vars + nodes := createNetwork() + timeMS := int32(0) + proposer := nodes[0] + for i := 0; i < 32; i++ { + proposer.parts[i] = byte(0xff) + } + //printNodes(nodes[:]) + + // The proposer sends parts to all of its peers. + for i := 0; i < len(proposer.peers); i++ { + timeMS := int32(0) // scoped + peer := proposer.peers[i] + for j := 0; j < 256; j++ { + // Send each part to a peer, but each peer starts at a different offset. + part := uint8((j + i*25) % 256) + recvTime := timeMS + latencyMS + partTxMS + event := EventData{ + time: recvTime, + part: part, + } + peer.sendEventData(event) + timeMS += partTxMS + } + } + + // Run simulation + for { + // Lets run the simulation for each user until endTimeMS + // We use latencyMS/2 since causality has at least this much lag. + endTimeMS := timeMS + latencyMS/2 + fmt.Printf("simulating until %v\n", endTimeMS) + + // Print out the network for debugging + if true { + for i := 40000; i < 40050; i++ { + node := nodes[i] + fmt.Printf("[%v] parts: %X\n", node.index, node.parts) + } + } + + for _, node := range nodes { + + // Iterate over the events of this node until event.time >= endTimeMS + for { + _event, ok := node.events.Peek().(Event) + if !ok || _event.RecvTime() >= endTimeMS { + break + } else { + node.events.Pop() + } + + switch _event.(type) { + case EventData: + event := _event.(EventData) + + // Process this event + if !node.receive(event.part) { + // Already has this part, ignore this event. + continue + } + + // Let's iterate over peers & see which needs this piece. + recvTime := event.time + latencyMS + partTxMS + for _, peer := range node.peers { + if peer.knownToHave(event.part) { + continue + } + peer.sendEventData(EventData{ + time: recvTime, + part: event.part, + }) + } + + case EventParts: + event := _event.(EventParts) + node.peers[event.src].parts = event.parts + } + + } + } + + // If network is full, quit. + if countFull(nodes) == numNodes { + fmt.Printf("Done! took %v ms", timeMS) + break + } + + // Lets increment the timeMS now + timeMS += latencyMS / 2 + + // Send EventParts rather frequently. It's cheap. + for _, node := range nodes { + for _, peer := range node.peers { + peer.sendEventParts(EventParts{ + time: timeMS + latencyMS, + src: peer.remote, + parts: node.parts, + }) + } + + newParts := make([]byte, 32) + copy(newParts, node.parts) + node.parts = newParts + } + + } +} + +// ---------------------------------------------------------------------------- + +type Heap struct { + pq priorityQueue +} + +func NewHeap() *Heap { + return &Heap{pq: make([]*pqItem, 0)} +} + +func (h *Heap) Len() int { + return len(h.pq) +} + +func (h *Heap) Peek() interface{} { + if len(h.pq) == 0 { + return nil + } + return h.pq[0].value +} + +func (h *Heap) Push(value interface{}, priority int32) { + heap.Push(&h.pq, &pqItem{value: value, priority: priority}) +} + +func (h *Heap) Pop() interface{} { + item := heap.Pop(&h.pq).(*pqItem) + return item.value +} + +/* +func main() { + h := NewHeap() + + h.Push(String("msg1"), 1) + h.Push(String("msg3"), 3) + h.Push(String("msg2"), 2) + + fmt.Println(h.Pop()) + fmt.Println(h.Pop()) + fmt.Println(h.Pop()) +} +*/ + +/////////////////////// +// From: http://golang.org/pkg/container/heap/#example__priorityQueue + +type pqItem struct { + value interface{} + priority int32 + index int +} + +type priorityQueue []*pqItem + +func (pq priorityQueue) Len() int { return len(pq) } + +func (pq priorityQueue) Less(i, j int) bool { + return pq[i].priority < pq[j].priority +} + +func (pq priorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *priorityQueue) Push(x interface{}) { + n := len(*pq) + item := x.(*pqItem) + item.index = n + *pq = append(*pq, item) +} + +func (pq *priorityQueue) Pop() interface{} { + old := *pq + n := len(old) + item := old[n-1] + item.index = -1 // for safety + *pq = old[0 : n-1] + return item +} + +func (pq *priorityQueue) Update(item *pqItem, value interface{}, priority int32) { + heap.Remove(pq, item.index) + item.value = value + item.priority = priority + heap.Push(pq, item) +}