diff --git a/blockchain/pool.go b/blockchain/pool.go index 9f2d042dd..4ddba39f2 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -1,24 +1,22 @@ package blockchain import ( + "math" "sync" "time" + flow "github.com/tendermint/tendermint/Godeps/_workspace/src/code.google.com/p/mxk/go1/flowcontrol" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/types" ) const ( - maxTries = 3 - inputsChannelCapacity = 200 - requestIntervalMS = 500 - maxPendingRequests = 200 - maxTotalRequests = 300 - maxRequestsPerPeer = 300 -) - -var ( - requestTimeoutSeconds = time.Duration(3) + requestIntervalMS = 500 + maxTotalRequests = 300 + maxPendingRequests = maxTotalRequests + maxPendingRequestsPerPeer = 30 + peerTimeoutSeconds = 10 + minRecvRate = 10240 // 10Kb/s ) /* @@ -33,14 +31,13 @@ var ( */ type BlockPool struct { - BaseService + QuitService // block requests - requestsMtx sync.Mutex - requests map[int]*bpRequest - height int // the lowest key in requests. - numUnassigned int32 // number of requests not yet assigned to a peer - numPending int32 // number of requests pending assignment or block response + mtx sync.Mutex + requests map[int]*bpRequester + height int // the lowest key in requests. + numPending int32 // number of requests pending assignment or block response // peers peersMtx sync.Mutex @@ -48,46 +45,40 @@ type BlockPool struct { requestsCh chan<- BlockRequest timeoutsCh chan<- string - repeater *RepeatTimer } func NewBlockPool(start int, requestsCh chan<- BlockRequest, timeoutsCh chan<- string) *BlockPool { bp := &BlockPool{ peers: make(map[string]*bpPeer), - requests: make(map[int]*bpRequest), - height: start, - numUnassigned: 0, - numPending: 0, + requests: make(map[int]*bpRequester), + height: start, + numPending: 0, requestsCh: requestsCh, timeoutsCh: timeoutsCh, - repeater: nil, } - bp.BaseService = *NewBaseService(log, "BlockPool", bp) + bp.QuitService = *NewQuitService(log, "BlockPool", bp) return bp } func (pool *BlockPool) OnStart() error { - pool.BaseService.OnStart() - pool.repeater = NewRepeatTimer("", requestIntervalMS*time.Millisecond) - go pool.run() + pool.QuitService.OnStart() + go pool.makeRequestsRoutine() return nil } func (pool *BlockPool) OnStop() { - pool.BaseService.OnStop() - pool.repeater.Stop() + pool.QuitService.OnStop() } // Run spawns requests as needed. -func (pool *BlockPool) run() { -RUN_LOOP: +func (pool *BlockPool) makeRequestsRoutine() { for { if !pool.IsRunning() { - break RUN_LOOP + break } - _, numPending, _ := pool.GetStatus() + _, numPending := pool.GetStatus() if numPending >= maxPendingRequests { // sleep for a bit. time.Sleep(requestIntervalMS * time.Millisecond) @@ -101,18 +92,35 @@ RUN_LOOP: } } -func (pool *BlockPool) GetStatus() (height int, numPending int32, numUnssigned int32) { - pool.requestsMtx.Lock() // Lock - defer pool.requestsMtx.Unlock() +func (pool *BlockPool) GetStatus() (height int, numPending int32) { + pool.mtx.Lock() // Lock + defer pool.mtx.Unlock() + + return pool.height, pool.numPending +} + +// TODO: relax conditions, prevent abuse. +func (pool *BlockPool) IsCaughtUp() bool { + pool.mtx.Lock() + height := pool.height + pool.mtx.Unlock() + + pool.peersMtx.Lock() + numPeers := len(pool.peers) + maxPeerHeight := 0 + for _, peer := range pool.peers { + maxPeerHeight = MaxInt(maxPeerHeight, peer.height) + } + pool.peersMtx.Unlock() - return pool.height, pool.numPending, pool.numUnassigned + return numPeers >= 3 && height > 0 && height == maxPeerHeight } // We need to see the second block's Validation to validate the first block. // So we peek two blocks at a time. func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) { - pool.requestsMtx.Lock() // Lock - defer pool.requestsMtx.Unlock() + pool.mtx.Lock() // Lock + defer pool.mtx.Unlock() if r := pool.requests[pool.height]; r != nil { first = r.block @@ -126,8 +134,8 @@ func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) // Pop the first block at pool.height // It must have been validated by 'second'.Validation from PeekTwoBlocks(). func (pool *BlockPool) PopRequest() { - pool.requestsMtx.Lock() // Lock - defer pool.requestsMtx.Unlock() + pool.mtx.Lock() // Lock + defer pool.mtx.Unlock() if r := pool.requests[pool.height]; r == nil || r.block == nil { PanicSanity("PopRequest() requires a valid block") @@ -137,108 +145,77 @@ func (pool *BlockPool) PopRequest() { pool.height++ } -// Invalidates the block at pool.height. -// Remove the peer and request from others. +// Invalidates the block at pool.height, +// Remove the peer and redo request from others. func (pool *BlockPool) RedoRequest(height int) { - pool.requestsMtx.Lock() // Lock - defer pool.requestsMtx.Unlock() + pool.mtx.Lock() // Lock + defer pool.mtx.Unlock() request := pool.requests[height] if request.block == nil { PanicSanity("Expected block to be non-nil") } + // RemovePeer will redo all requests associated with this peer. // TODO: record this malfeasance - // maybe punish peer on switch (an invalid block!) - pool.RemovePeer(request.peerId) // Lock on peersMtx. - request.block = nil - request.peerId = "" - pool.numPending++ - pool.numUnassigned++ - - go requestRoutine(pool, height) + pool.RemovePeer(request.peerID) // Lock on peersMtx. } -func (pool *BlockPool) hasBlock(height int) bool { - pool.requestsMtx.Lock() // Lock - defer pool.requestsMtx.Unlock() - - request := pool.requests[height] - return request != nil && request.block != nil -} +// TODO: ensure that blocks come in order for each peer. +func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int) { + pool.mtx.Lock() // Lock + defer pool.mtx.Unlock() -func (pool *BlockPool) setPeerForRequest(height int, peerId string) { - pool.requestsMtx.Lock() // Lock - defer pool.requestsMtx.Unlock() - - request := pool.requests[height] + request := pool.requests[block.Height] if request == nil { return } - pool.numUnassigned-- - request.peerId = peerId -} -func (pool *BlockPool) removePeerForRequest(height int, peerId string) { - pool.requestsMtx.Lock() // Lock - defer pool.requestsMtx.Unlock() - - request := pool.requests[height] - if request == nil { - return + if request.setBlock(block, peerID) { + pool.numPending-- + peer := pool.getPeer(peerID) + peer.decrPending(blockSize) + } else { + // Bad peer? } - pool.numUnassigned++ - request.peerId = "" } -func (pool *BlockPool) AddBlock(block *types.Block, peerId string) { - pool.requestsMtx.Lock() // Lock - defer pool.requestsMtx.Unlock() +// Sets the peer's alleged blockchain height. +func (pool *BlockPool) SetPeerHeight(peerID string, height int) { + pool.peersMtx.Lock() // Lock + defer pool.peersMtx.Unlock() - request := pool.requests[block.Height] - if request == nil { - return - } - if request.peerId != peerId { - return - } - if request.block != nil { - return + peer := pool.peers[peerID] + if peer != nil { + peer.height = height + } else { + peer = newBPPeer(pool, peerID, height) + pool.peers[peerID] = peer } - request.block = block - pool.numPending-- } -func (pool *BlockPool) getPeer(peerId string) *bpPeer { +func (pool *BlockPool) RemovePeer(peerID string) { pool.peersMtx.Lock() // Lock defer pool.peersMtx.Unlock() - peer := pool.peers[peerId] - return peer + pool.removePeer(peerID) } -// Sets the peer's alleged blockchain height. -func (pool *BlockPool) SetPeerHeight(peerId string, height int) { - pool.peersMtx.Lock() // Lock - defer pool.peersMtx.Unlock() - - peer := pool.peers[peerId] - if peer != nil { - peer.height = height - } else { - peer = &bpPeer{ - height: height, - id: peerId, - numRequests: 0, +func (pool *BlockPool) removePeer(peerID string) { + for _, request := range pool.requests { + if request.getPeerID() == peerID { + pool.numPending++ + request.redo() // pick another peer and ... } - pool.peers[peerId] = peer } + delete(pool.peers, peerID) } -func (pool *BlockPool) RemovePeer(peerId string) { +func (pool *BlockPool) getPeer(peerID string) *bpPeer { pool.peersMtx.Lock() // Lock defer pool.peersMtx.Unlock() - delete(pool.peers, peerId) + peer := pool.peers[peerID] + return peer } // Pick an available peer with at least the given minHeight. @@ -248,64 +225,52 @@ func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer { defer pool.peersMtx.Unlock() for _, peer := range pool.peers { - if peer.numRequests >= maxRequestsPerPeer { + if peer.isBad() { + pool.removePeer(peer.id) + continue + } + if peer.numPending >= maxPendingRequestsPerPeer { continue } if peer.height < minHeight { continue } - peer.numRequests++ + peer.incrPending() return peer } return nil } -func (pool *BlockPool) decrPeer(peerId string) { - pool.peersMtx.Lock() - defer pool.peersMtx.Unlock() - - peer := pool.peers[peerId] - if peer == nil { - return - } - peer.numRequests-- -} - func (pool *BlockPool) makeNextRequest() { - pool.requestsMtx.Lock() // Lock - defer pool.requestsMtx.Unlock() + pool.mtx.Lock() // Lock + defer pool.mtx.Unlock() nextHeight := pool.height + len(pool.requests) - request := &bpRequest{ - height: nextHeight, - peerId: "", - block: nil, - } + request := newBPRequester(pool, nextHeight) pool.requests[nextHeight] = request - pool.numUnassigned++ pool.numPending++ - go requestRoutine(pool, nextHeight) + request.Start() } -func (pool *BlockPool) sendRequest(height int, peerId string) { +func (pool *BlockPool) sendRequest(height int, peerID string) { if !pool.IsRunning() { return } - pool.requestsCh <- BlockRequest{height, peerId} + pool.requestsCh <- BlockRequest{height, peerID} } -func (pool *BlockPool) sendTimeout(peerId string) { +func (pool *BlockPool) sendTimeout(peerID string) { if !pool.IsRunning() { return } - pool.timeoutsCh <- peerId + pool.timeoutsCh <- peerID } func (pool *BlockPool) debug() string { - pool.requestsMtx.Lock() // Lock - defer pool.requestsMtx.Unlock() + pool.mtx.Lock() // Lock + defer pool.mtx.Unlock() str := "" for h := pool.height; h < pool.height+len(pool.requests); h++ { @@ -322,64 +287,189 @@ func (pool *BlockPool) debug() string { //------------------------------------- type bpPeer struct { + pool *BlockPool id string height int - numRequests int32 + numPending int32 + recvMonitor *flow.Monitor + timeout *time.Timer + didTimeout bool } -type bpRequest struct { - height int - peerId string - block *types.Block +func newBPPeer(pool *BlockPool, peerID string, height int) *bpPeer { + peer := &bpPeer{ + pool: pool, + id: peerID, + height: height, + numPending: 0, + } + return peer +} + +func (bpp *bpPeer) resetMonitor() { + bpp.recvMonitor = flow.New(time.Second, time.Second*40) + var initialValue = float64(minRecvRate) * math.E + bpp.recvMonitor.Update(int(initialValue)) +} + +func (bpp *bpPeer) resetTimeout() { + if bpp.timeout == nil { + bpp.timeout = time.AfterFunc(time.Second*peerTimeoutSeconds, bpp.onTimeout) + } else { + bpp.timeout.Reset(time.Second * peerTimeoutSeconds) + } +} + +func (bpp *bpPeer) incrPending() { + if bpp.numPending == 0 { + bpp.resetMonitor() + bpp.resetTimeout() + } + bpp.numPending++ +} + +func (bpp *bpPeer) decrPending(recvSize int) { + bpp.numPending-- + if bpp.numPending == 0 { + bpp.timeout.Stop() + } else { + bpp.recvMonitor.Update(recvSize) + bpp.resetTimeout() + } +} + +func (bpp *bpPeer) onTimeout() { + bpp.didTimeout = true +} + +func (bpp *bpPeer) isBad() bool { + if bpp.didTimeout { + bpp.pool.sendTimeout(bpp.id) + return true + } + if bpp.numPending == 0 { + return false + } else { + bpp.pool.sendTimeout(bpp.id) + return bpp.recvMonitor.Status().CurRate < minRecvRate + } } //------------------------------------- +type bpRequester struct { + QuitService + pool *BlockPool + height int + gotBlockCh chan struct{} + redoCh chan struct{} + + mtx sync.Mutex + peerID string + block *types.Block +} + +func newBPRequester(pool *BlockPool, height int) *bpRequester { + bpr := &bpRequester{ + pool: pool, + height: height, + gotBlockCh: make(chan struct{}), + redoCh: make(chan struct{}), + + peerID: "", + block: nil, + } + bpr.QuitService = *NewQuitService(nil, "bpRequester", bpr) + return bpr +} + +func (bpr *bpRequester) OnStart() error { + bpr.QuitService.OnStart() + go bpr.requestRoutine() + return nil +} + +// Returns true if the peer matches +func (bpr *bpRequester) setBlock(block *types.Block, peerID string) bool { + bpr.mtx.Lock() + if bpr.block != nil || bpr.peerID != peerID { + bpr.mtx.Unlock() + return false + } + bpr.block = block + bpr.mtx.Unlock() + + bpr.gotBlockCh <- struct{}{} + return true +} + +func (bpr *bpRequester) getPeerID() string { + bpr.mtx.Lock() + defer bpr.mtx.Unlock() + return bpr.peerID +} + +func (bpr *bpRequester) reset() { + bpr.mtx.Lock() + bpr.peerID = "" + bpr.block = nil + bpr.mtx.Unlock() +} + +// Tells bpRequester to pick another peer and try again. +func (bpr *bpRequester) redo() { + bpr.redoCh <- struct{}{} +} + // Responsible for making more requests as necessary // Returns only when a block is found (e.g. AddBlock() is called) -func requestRoutine(pool *BlockPool, height int) { +func (bpr *bpRequester) requestRoutine() { +OUTER_LOOP: for { + + // Pick a peer to send request to. var peer *bpPeer = nil - PICK_LOOP: + PICK_PEER_LOOP: for { - if !pool.IsRunning() { - log.Info("BlockPool not running. Stopping requestRoutine", "height", height) + if !bpr.IsRunning() || !bpr.pool.IsRunning() { return } - peer = pool.pickIncrAvailablePeer(height) + peer = bpr.pool.pickIncrAvailablePeer(bpr.height) if peer == nil { //log.Info("No peers available", "height", height) time.Sleep(requestIntervalMS * time.Millisecond) - continue PICK_LOOP + continue PICK_PEER_LOOP } - break PICK_LOOP + break PICK_PEER_LOOP } - - // set the peer, decrement numUnassigned - pool.setPeerForRequest(height, peer.id) - - for try := 0; try < maxTries; try++ { - pool.sendRequest(height, peer.id) - time.Sleep(requestTimeoutSeconds * time.Second) - // if successful the block is either in the pool, - if pool.hasBlock(height) { - pool.decrPeer(peer.id) + bpr.mtx.Lock() + bpr.peerID = peer.id + bpr.mtx.Unlock() + + // Send request and wait. + bpr.pool.sendRequest(bpr.height, peer.id) + select { + case <-bpr.pool.Quit: + bpr.Stop() + return + case <-bpr.Quit: + return + case <-bpr.redoCh: + bpr.reset() + continue OUTER_LOOP // When peer is removed + case <-bpr.gotBlockCh: + // We got the block, now see if it's good. + select { + case <-bpr.pool.Quit: + bpr.Stop() return - } - // or already processed and we've moved past it - bpHeight, _, _ := pool.GetStatus() - if height < bpHeight { - pool.decrPeer(peer.id) + case <-bpr.Quit: return + case <-bpr.redoCh: + bpr.reset() + continue OUTER_LOOP } } - - // unset the peer, increment numUnassigned - pool.removePeerForRequest(height, peer.id) - - // this peer failed us, try again - pool.RemovePeer(peer.id) - pool.sendTimeout(peer.id) } } @@ -387,5 +477,5 @@ func requestRoutine(pool *BlockPool, height int) { type BlockRequest struct { Height int - PeerId string + PeerID string } diff --git a/blockchain/pool_test.go b/blockchain/pool_test.go index ec587cac2..a5b30e4db 100644 --- a/blockchain/pool_test.go +++ b/blockchain/pool_test.go @@ -17,9 +17,9 @@ type testPeer struct { func makePeers(numPeers int, minHeight, maxHeight int) map[string]testPeer { peers := make(map[string]testPeer, numPeers) for i := 0; i < numPeers; i++ { - peerId := RandStr(12) + peerID := RandStr(12) height := minHeight + rand.Intn(maxHeight-minHeight) - peers[peerId] = testPeer{peerId, height} + peers[peerID] = testPeer{peerID, height} } return peers } @@ -57,8 +57,8 @@ func TestBasic(t *testing.T) { // Pull from channels for { select { - case peerId := <-timeoutsCh: - t.Errorf("timeout: %v", peerId) + case peerID := <-timeoutsCh: + t.Errorf("timeout: %v", peerID) case request := <-requestsCh: log.Info("TEST: Pulled new BlockRequest", "request", request) if request.Height == 300 { @@ -67,8 +67,8 @@ func TestBasic(t *testing.T) { // Request desired, pretend like we got the block immediately. go func() { block := &types.Block{Header: &types.Header{Height: request.Height}} - pool.AddBlock(block, request.PeerId) - log.Info("TEST: Added block", "block", request.Height, "peer", request.PeerId) + pool.AddBlock(request.PeerID, block, 123) + log.Info("TEST: Added block", "block", request.Height, "peer", request.PeerID) }() } } @@ -111,9 +111,9 @@ func TestTimeout(t *testing.T) { timedOut := map[string]struct{}{} for { select { - case peerId := <-timeoutsCh: - log.Info("Timeout", "peerId", peerId) - if _, ok := timedOut[peerId]; !ok { + case peerID := <-timeoutsCh: + log.Info("Timeout", "peerID", peerID) + if _, ok := timedOut[peerID]; !ok { counter++ if counter == len(peers) { return // Done! diff --git a/blockchain/reactor.go b/blockchain/reactor.go index fc1a494cb..26d4ebabc 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -27,7 +27,7 @@ const ( // ask for best height every 10s statusUpdateIntervalSeconds = 10 // check if we should switch to consensus reactor - switchToConsensusIntervalSeconds = 10 + switchToConsensusIntervalSeconds = 1 ) type consensusReactor interface { @@ -97,7 +97,7 @@ func (bcR *BlockchainReactor) OnStop() { func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { return []*p2p.ChannelDescriptor{ &p2p.ChannelDescriptor{ - Id: BlockchainChannel, + ID: BlockchainChannel, Priority: 5, SendQueueCapacity: 100, }, @@ -117,7 +117,7 @@ func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { } // Implements Reactor -func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) { +func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { _, msg, err := DecodeMessage(msgBytes) if err != nil { log.Warn("Error decoding message", "error", err) @@ -141,7 +141,7 @@ func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) } case *bcBlockResponseMessage: // Got a block. - bcR.pool.AddBlock(msg.Block, src.Key) + bcR.pool.AddBlock(src.Key, msg.Block, len(msgBytes)) case *bcStatusRequestMessage: // Send peer our state. queued := src.TrySend(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()}) @@ -169,21 +169,20 @@ FOR_LOOP: for { select { case request := <-bcR.requestsCh: // chan BlockRequest - peer := bcR.Switch.Peers().Get(request.PeerId) + peer := bcR.Switch.Peers().Get(request.PeerID) if peer == nil { - // We can't assign the request. - continue FOR_LOOP + continue FOR_LOOP // Peer has since been disconnected. } msg := &bcBlockRequestMessage{request.Height} queued := peer.TrySend(BlockchainChannel, msg) if !queued { // We couldn't make the request, send-queue full. - // The pool handles retries, so just let it go. + // The pool handles timeouts, just let it go. continue FOR_LOOP } - case peerId := <-bcR.timeoutsCh: // chan string + case peerID := <-bcR.timeoutsCh: // chan string // Peer timed out. - peer := bcR.Switch.Peers().Get(peerId) + peer := bcR.Switch.Peers().Get(peerID) if peer != nil { bcR.Switch.StopPeerForError(peer, errors.New("BlockchainReactor Timeout")) } @@ -191,17 +190,11 @@ FOR_LOOP: // ask for status updates go bcR.BroadcastStatusRequest() case _ = <-switchToConsensusTicker.C: - height, numPending, numUnassigned := bcR.pool.GetStatus() + height, numPending := bcR.pool.GetStatus() outbound, inbound, _ := bcR.Switch.NumPeers() - log.Info("Consensus ticker", "numUnassigned", numUnassigned, "numPending", numPending, - "total", len(bcR.pool.requests), "outbound", outbound, "inbound", inbound) - // NOTE: this condition is very strict right now. may need to weaken - // If all `maxPendingRequests` requests are unassigned - // and we have some peers (say >= 3), then we're caught up - maxPending := numPending == maxPendingRequests - allUnassigned := numPending == numUnassigned - enoughPeers := outbound+inbound >= 3 - if maxPending && allUnassigned && enoughPeers { + log.Info("Consensus ticker", "numPending", numPending, "total", len(bcR.pool.requests), + "outbound", outbound, "inbound", inbound) + if bcR.pool.IsCaughtUp() { log.Notice("Time to switch to consensus reactor!", "height", height) bcR.pool.Stop() @@ -283,11 +276,15 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest}, ) +// TODO: ensure that bz is completely read. func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) { msgType = bz[0] - n := new(int64) + n := int64(0) r := bytes.NewReader(bz) - msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, n, &err).(struct{ BlockchainMessage }).BlockchainMessage + msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage + if err != nil && n != int64(len(bz)) { + err = errors.New("DecodeMessage() had bytes left over.") + } return } diff --git a/common/service.go b/common/service.go index a992cf2d7..05b2adedd 100644 --- a/common/service.go +++ b/common/service.go @@ -75,15 +75,21 @@ func NewBaseService(log log15.Logger, name string, impl Service) *BaseService { func (bs *BaseService) Start() (bool, error) { if atomic.CompareAndSwapUint32(&bs.started, 0, 1) { if atomic.LoadUint32(&bs.stopped) == 1 { - bs.log.Warn(Fmt("Not starting %v -- already stopped", bs.name), "impl", bs.impl) + if bs.log != nil { + bs.log.Warn(Fmt("Not starting %v -- already stopped", bs.name), "impl", bs.impl) + } return false, nil } else { - bs.log.Notice(Fmt("Starting %v", bs.name), "impl", bs.impl) + if bs.log != nil { + bs.log.Notice(Fmt("Starting %v", bs.name), "impl", bs.impl) + } } err := bs.impl.OnStart() return true, err } else { - bs.log.Info(Fmt("Not starting %v -- already started", bs.name), "impl", bs.impl) + if bs.log != nil { + bs.log.Info(Fmt("Not starting %v -- already started", bs.name), "impl", bs.impl) + } return false, nil } } @@ -94,11 +100,15 @@ func (bs *BaseService) OnStart() error { return nil } // Implements Service func (bs *BaseService) Stop() bool { if atomic.CompareAndSwapUint32(&bs.stopped, 0, 1) { - bs.log.Notice(Fmt("Stopping %v", bs.name), "impl", bs.impl) + if bs.log != nil { + bs.log.Notice(Fmt("Stopping %v", bs.name), "impl", bs.impl) + } bs.impl.OnStop() return true } else { - bs.log.Notice(Fmt("Not stopping %v", bs.name), "impl", bs.impl) + if bs.log != nil { + bs.log.Notice(Fmt("Not stopping %v", bs.name), "impl", bs.impl) + } return false } } @@ -138,5 +148,7 @@ func (qs *QuitService) OnStart() error { // NOTE: when overriding OnStop, must call .QuitService.OnStop(). func (qs *QuitService) OnStop() { - close(qs.Quit) + if qs.Quit != nil { + close(qs.Quit) + } } diff --git a/config/tendermint/config.go b/config/tendermint/config.go index d624ce7a7..1ebb195e6 100644 --- a/config/tendermint/config.go +++ b/config/tendermint/config.go @@ -106,20 +106,28 @@ var defaultGenesis = `{ "accounts": [ { "address": "9FCBA7F840A0BFEBBE755E853C9947270A912D04", - "amount": 2098999998000000 + "amount": 1997999998000000 + }, + { + "address": "964B1493BBE3312278B7DEB94C39149F7899A345", + "amount": 100000000000000 + }, + { + "address": "B9FA4AB462B9C6BF6A62DB4AE77C9E7087209A04", + "amount": 1000000000000 + }, + { + "address": "F171824590D69386F709E7B6704B369C5A370D60", + "amount": 1000000000000 }, - { - "address": "B9FA4AB462B9C6BF6A62DB4AE77C9E7087209A04", - "amount": 1000000000000 - }, { "address": "A88A61069B6660F30F65E8786AFDD4F1D8F625E9", - "amount": 1000000 - }, + "amount": 1000000 + }, { "address": "EE2EE9247973B4AFC3867CFE5F415410AC251B61", - "amount": 1000000 - } + "amount": 1000000 + } ], "validators": [ { diff --git a/consensus/reactor.go b/consensus/reactor.go index be9cd01e0..e822cbfb2 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -82,17 +82,17 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor { // TODO optimize return []*p2p.ChannelDescriptor{ &p2p.ChannelDescriptor{ - Id: StateChannel, + ID: StateChannel, Priority: 5, SendQueueCapacity: 100, }, &p2p.ChannelDescriptor{ - Id: DataChannel, + ID: DataChannel, Priority: 5, SendQueueCapacity: 2, }, &p2p.ChannelDescriptor{ - Id: VoteChannel, + ID: VoteChannel, Priority: 5, SendQueueCapacity: 40, }, @@ -131,9 +131,9 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { // Implements Reactor // NOTE: We process these messages even when we're fast_syncing. -func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) { +func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte) { if !conR.IsRunning() { - log.Debug("Receive", "channel", chId, "peer", peer, "bytes", msgBytes) + log.Debug("Receive", "channel", chID, "peer", peer, "bytes", msgBytes) return } @@ -142,12 +142,12 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte ps := peer.Data.Get(PeerStateKey).(*PeerState) _, msg, err := DecodeMessage(msgBytes) if err != nil { - log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg, "error", err, "bytes", msgBytes) + log.Warn("Error decoding message", "channel", chID, "peer", peer, "msg", msg, "error", err, "bytes", msgBytes) return } - log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg, "rsHeight", rs.Height) + log.Debug("Receive", "channel", chID, "peer", peer, "msg", msg, "rsHeight", rs.Height) - switch chId { + switch chID { case StateChannel: switch msg := msg.(type) { case *NewRoundStepMessage: @@ -232,7 +232,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) } default: - log.Warn(Fmt("Unknown channel %X", chId)) + log.Warn(Fmt("Unknown channel %X", chID)) } if err != nil { diff --git a/events/events.go b/events/events.go index 818b1f4f1..a4173d242 100644 --- a/events/events.go +++ b/events/events.go @@ -45,7 +45,7 @@ func (evsw *EventSwitch) OnStop() { evsw.listeners = nil } -func (evsw *EventSwitch) AddListenerForEvent(listenerId, event string, cb eventCallback) { +func (evsw *EventSwitch) AddListenerForEvent(listenerID, event string, cb eventCallback) { // Get/Create eventCell and listener evsw.mtx.Lock() eventCell := evsw.eventCells[event] @@ -53,23 +53,23 @@ func (evsw *EventSwitch) AddListenerForEvent(listenerId, event string, cb eventC eventCell = newEventCell() evsw.eventCells[event] = eventCell } - listener := evsw.listeners[listenerId] + listener := evsw.listeners[listenerID] if listener == nil { - listener = newEventListener(listenerId) - evsw.listeners[listenerId] = listener + listener = newEventListener(listenerID) + evsw.listeners[listenerID] = listener } evsw.mtx.Unlock() // Add event and listener - eventCell.AddListener(listenerId, cb) + eventCell.AddListener(listenerID, cb) listener.AddEvent(event) } -func (evsw *EventSwitch) RemoveListener(listenerId string) { +func (evsw *EventSwitch) RemoveListener(listenerID string) { // Get and remove listener evsw.mtx.RLock() - listener := evsw.listeners[listenerId] - delete(evsw.listeners, listenerId) + listener := evsw.listeners[listenerID] + delete(evsw.listeners, listenerID) evsw.mtx.RUnlock() if listener == nil { @@ -79,11 +79,11 @@ func (evsw *EventSwitch) RemoveListener(listenerId string) { // Remove callback for each event. listener.SetRemoved() for _, event := range listener.GetEvents() { - evsw.RemoveListenerForEvent(event, listenerId) + evsw.RemoveListenerForEvent(event, listenerID) } } -func (evsw *EventSwitch) RemoveListenerForEvent(event string, listenerId string) { +func (evsw *EventSwitch) RemoveListenerForEvent(event string, listenerID string) { // Get eventCell evsw.mtx.Lock() eventCell := evsw.eventCells[event] @@ -93,8 +93,8 @@ func (evsw *EventSwitch) RemoveListenerForEvent(event string, listenerId string) return } - // Remove listenerId from eventCell - numListeners := eventCell.RemoveListener(listenerId) + // Remove listenerID from eventCell + numListeners := eventCell.RemoveListener(listenerID) // Maybe garbage collect eventCell. if numListeners == 0 { @@ -137,15 +137,15 @@ func newEventCell() *eventCell { } } -func (cell *eventCell) AddListener(listenerId string, cb eventCallback) { +func (cell *eventCell) AddListener(listenerID string, cb eventCallback) { cell.mtx.Lock() - cell.listeners[listenerId] = cb + cell.listeners[listenerID] = cb cell.mtx.Unlock() } -func (cell *eventCell) RemoveListener(listenerId string) int { +func (cell *eventCell) RemoveListener(listenerID string) int { cell.mtx.Lock() - delete(cell.listeners, listenerId) + delete(cell.listeners, listenerID) numListeners := len(cell.listeners) cell.mtx.Unlock() return numListeners diff --git a/mempool/reactor.go b/mempool/reactor.go index 4f99c4494..664e33467 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -38,7 +38,7 @@ func NewMempoolReactor(mempool *Mempool) *MempoolReactor { func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor { return []*p2p.ChannelDescriptor{ &p2p.ChannelDescriptor{ - Id: MempoolChannel, + ID: MempoolChannel, Priority: 5, }, } @@ -53,7 +53,7 @@ func (pexR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { } // Implements Reactor -func (memR *MempoolReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) { +func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { _, msg, err := DecodeMessage(msgBytes) if err != nil { log.Warn("Error decoding message", "error", err) diff --git a/p2p/README.md b/p2p/README.md index a7324b714..6149d9c0f 100644 --- a/p2p/README.md +++ b/p2p/README.md @@ -16,15 +16,15 @@ initialization of the connection. There are two methods for sending messages: ```go -func (m MConnection) Send(chId byte, msg interface{}) bool {} -func (m MConnection) TrySend(chId byte, msg interface{}) bool {} +func (m MConnection) Send(chID byte, msg interface{}) bool {} +func (m MConnection) TrySend(chID byte, msg interface{}) bool {} ``` -`Send(chId, msg)` is a blocking call that waits until `msg` is successfully queued -for the channel with the given id byte `chId`. The message `msg` is serialized +`Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued +for the channel with the given id byte `chID`. The message `msg` is serialized using the `tendermint/wire` submodule's `WriteBinary()` reflection routine. -`TrySend(chId, msg)` is a nonblocking call that returns false if the channel's +`TrySend(chID, msg)` is a nonblocking call that returns false if the channel's queue is full. `Send()` and `TrySend()` are also exposed for each `Peer`. @@ -37,14 +37,14 @@ or more `Channels`. So while sending outgoing messages is typically performed o incoming messages are received on the reactor. ```go -// Declare a MyReactor reactor that handles messages on MyChannelId. +// Declare a MyReactor reactor that handles messages on MyChannelID. type MyReactor struct{} func (reactor MyReactor) GetChannels() []*ChannelDescriptor { - return []*ChannelDescriptor{ChannelDescriptor{Id:MyChannelId, Priority: 1}} + return []*ChannelDescriptor{ChannelDescriptor{ID:MyChannelID, Priority: 1}} } -func (reactor MyReactor) Receive(chId byte, peer *Peer, msgBytes []byte) { +func (reactor MyReactor) Receive(chID byte, peer *Peer, msgBytes []byte) { r, n, err := bytes.NewBuffer(msgBytes), new(int64), new(error) msgString := ReadString(r, n, err) fmt.Println(msgString) @@ -60,7 +60,7 @@ switch := NewSwitch([]Reactor{MyReactor{}}) // Send a random message to all outbound connections for _, peer := range switch.Peers().List() { if peer.IsOutbound() { - peer.Send(MyChannelId, "Here's a random message") + peer.Send(MyChannelID, "Here's a random message") } } ``` diff --git a/p2p/connection.go b/p2p/connection.go index d1782c1b5..90fafe894 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -22,15 +22,15 @@ const ( idleTimeoutMinutes = 5 updateStatsSeconds = 2 pingTimeoutSeconds = 40 - defaultSendRate = 10240 // 10Kb/s - defaultRecvRate = 10240 // 10Kb/s + defaultSendRate = 51200 // 50Kb/s + defaultRecvRate = 51200 // 50Kb/s flushThrottleMS = 100 defaultSendQueueCapacity = 1 defaultRecvBufferCapacity = 4096 defaultSendTimeoutSeconds = 10 ) -type receiveCbFunc func(chId byte, msgBytes []byte) +type receiveCbFunc func(chID byte, msgBytes []byte) type errorCbFunc func(interface{}) /* @@ -45,15 +45,15 @@ The byte id and the relative priorities of each `Channel` are configured upon initialization of the connection. There are two methods for sending messages: - func (m MConnection) Send(chId byte, msg interface{}) bool {} - func (m MConnection) TrySend(chId byte, msg interface{}) bool {} + func (m MConnection) Send(chID byte, msg interface{}) bool {} + func (m MConnection) TrySend(chID byte, msg interface{}) bool {} -`Send(chId, msg)` is a blocking call that waits until `msg` is successfully queued -for the channel with the given id byte `chId`, or until the request times out. +`Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued +for the channel with the given id byte `chID`, or until the request times out. The message `msg` is serialized using the `tendermint/wire` submodule's `WriteBinary()` reflection routine. -`TrySend(chId, msg)` is a nonblocking call that returns false if the channel's +`TrySend(chID, msg)` is a nonblocking call that returns false if the channel's queue is full. Inbound message bytes are handled with an onReceive callback function. @@ -185,17 +185,17 @@ func (c *MConnection) stopForError(r interface{}) { } // Queues a message to be sent to channel. -func (c *MConnection) Send(chId byte, msg interface{}) bool { +func (c *MConnection) Send(chID byte, msg interface{}) bool { if !c.IsRunning() { return false } - log.Info("Send", "channel", chId, "conn", c, "msg", msg) //, "bytes", wire.BinaryBytes(msg)) + log.Info("Send", "channel", chID, "conn", c, "msg", msg) //, "bytes", wire.BinaryBytes(msg)) // Send message to channel. - channel, ok := c.channelsIdx[chId] + channel, ok := c.channelsIdx[chID] if !ok { - log.Error(Fmt("Cannot send bytes, unknown channel %X", chId)) + log.Error(Fmt("Cannot send bytes, unknown channel %X", chID)) return false } @@ -207,24 +207,24 @@ func (c *MConnection) Send(chId byte, msg interface{}) bool { default: } } else { - log.Warn("Send failed", "channel", chId, "conn", c, "msg", msg) + log.Warn("Send failed", "channel", chID, "conn", c, "msg", msg) } return success } // Queues a message to be sent to channel. // Nonblocking, returns true if successful. -func (c *MConnection) TrySend(chId byte, msg interface{}) bool { +func (c *MConnection) TrySend(chID byte, msg interface{}) bool { if !c.IsRunning() { return false } - log.Info("TrySend", "channel", chId, "conn", c, "msg", msg) + log.Info("TrySend", "channel", chID, "conn", c, "msg", msg) // Send message to channel. - channel, ok := c.channelsIdx[chId] + channel, ok := c.channelsIdx[chID] if !ok { - log.Error(Fmt("Cannot send bytes, unknown channel %X", chId)) + log.Error(Fmt("Cannot send bytes, unknown channel %X", chID)) return false } @@ -240,14 +240,14 @@ func (c *MConnection) TrySend(chId byte, msg interface{}) bool { return ok } -func (c *MConnection) CanSend(chId byte) bool { +func (c *MConnection) CanSend(chID byte) bool { if !c.IsRunning() { return false } - channel, ok := c.channelsIdx[chId] + channel, ok := c.channelsIdx[chID] if !ok { - log.Error(Fmt("Unknown channel %X", chId)) + log.Error(Fmt("Unknown channel %X", chID)) return false } return channel.canSend() @@ -421,9 +421,9 @@ FOR_LOOP: } break FOR_LOOP } - channel, ok := c.channelsIdx[pkt.ChannelId] + channel, ok := c.channelsIdx[pkt.ChannelID] if !ok || channel == nil { - PanicQ(Fmt("Unknown channel %X", pkt.ChannelId)) + PanicQ(Fmt("Unknown channel %X", pkt.ChannelID)) } msgBytes, err := channel.recvMsgPacket(pkt) if err != nil { @@ -434,8 +434,8 @@ FOR_LOOP: break FOR_LOOP } if msgBytes != nil { - log.Debug("Received bytes", "chId", pkt.ChannelId, "msgBytes", msgBytes) - c.onReceive(pkt.ChannelId, msgBytes) + log.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", msgBytes) + c.onReceive(pkt.ChannelID, msgBytes) } default: PanicSanity(Fmt("Unknown message type %X", pktType)) @@ -456,7 +456,7 @@ FOR_LOOP: //----------------------------------------------------------------------------- type ChannelDescriptor struct { - Id byte + ID byte Priority int SendQueueCapacity int RecvBufferCapacity int @@ -493,7 +493,7 @@ func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel { return &Channel{ conn: conn, desc: desc, - id: desc.Id, + id: desc.ID, sendQueue: make(chan []byte, desc.SendQueueCapacity), recving: make([]byte, 0, desc.RecvBufferCapacity), priority: desc.Priority, @@ -556,7 +556,7 @@ func (ch *Channel) isSendPending() bool { // Not goroutine-safe func (ch *Channel) nextMsgPacket() msgPacket { packet := msgPacket{} - packet.ChannelId = byte(ch.id) + packet.ChannelID = byte(ch.id) packet.Bytes = ch.sending[:MinInt(maxMsgPacketSize, len(ch.sending))] if len(ch.sending) <= maxMsgPacketSize { packet.EOF = byte(0x01) @@ -617,13 +617,13 @@ const ( // Messages in channels are chopped into smaller msgPackets for multiplexing. type msgPacket struct { - ChannelId byte + ChannelID byte EOF byte // 1 means message ends here. Bytes []byte } func (p msgPacket) String() string { - return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelId, p.Bytes, p.EOF) + return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF) } //----------------------------------------------------------------------------- diff --git a/p2p/peer.go b/p2p/peer.go index 711a4fe19..ba3ed790d 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -49,12 +49,12 @@ func peerHandshake(conn net.Conn, ourNodeInfo *types.NodeInfo) (*types.NodeInfo, // NOTE: call peerHandshake on conn before calling newPeer(). func newPeer(conn net.Conn, peerNodeInfo *types.NodeInfo, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer { var p *Peer - onReceive := func(chId byte, msgBytes []byte) { - reactor := reactorsByCh[chId] + onReceive := func(chID byte, msgBytes []byte) { + reactor := reactorsByCh[chID] if reactor == nil { - PanicSanity(Fmt("Unknown channel %X", chId)) + PanicSanity(Fmt("Unknown channel %X", chID)) } - reactor.Receive(chId, p, msgBytes) + reactor.Receive(chID, p, msgBytes) } onError := func(r interface{}) { p.Stop() @@ -91,25 +91,25 @@ func (p *Peer) IsOutbound() bool { return p.outbound } -func (p *Peer) Send(chId byte, msg interface{}) bool { +func (p *Peer) Send(chID byte, msg interface{}) bool { if !p.IsRunning() { return false } - return p.mconn.Send(chId, msg) + return p.mconn.Send(chID, msg) } -func (p *Peer) TrySend(chId byte, msg interface{}) bool { +func (p *Peer) TrySend(chID byte, msg interface{}) bool { if !p.IsRunning() { return false } - return p.mconn.TrySend(chId, msg) + return p.mconn.TrySend(chID, msg) } -func (p *Peer) CanSend(chId byte) bool { +func (p *Peer) CanSend(chID byte) bool { if !p.IsRunning() { return false } - return p.mconn.CanSend(chId) + return p.mconn.CanSend(chID) } func (p *Peer) WriteTo(w io.Writer) (n int64, err error) { diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index cce7cc1d6..e203063c6 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -55,7 +55,7 @@ func (pexR *PEXReactor) OnStop() { func (pexR *PEXReactor) GetChannels() []*ChannelDescriptor { return []*ChannelDescriptor{ &ChannelDescriptor{ - Id: PexChannel, + ID: PexChannel, Priority: 1, SendQueueCapacity: 10, }, @@ -84,7 +84,7 @@ func (pexR *PEXReactor) RemovePeer(peer *Peer, reason interface{}) { // Implements Reactor // Handles incoming PEX messages. -func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) { +func (pexR *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) { // decode message _, msg, err := DecodeMessage(msgBytes) diff --git a/p2p/switch.go b/p2p/switch.go index b9daa6405..fe5e10d63 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -20,7 +20,7 @@ type Reactor interface { GetChannels() []*ChannelDescriptor AddPeer(peer *Peer) RemovePeer(peer *Peer, reason interface{}) - Receive(chId byte, peer *Peer, msgBytes []byte) + Receive(chID byte, peer *Peer, msgBytes []byte) } //-------------------------------------- @@ -43,7 +43,7 @@ func (br *BaseReactor) SetSwitch(sw *Switch) { func (_ *BaseReactor) GetChannels() []*ChannelDescriptor { return nil } func (_ *BaseReactor) AddPeer(peer *Peer) {} func (_ *BaseReactor) RemovePeer(peer *Peer, reason interface{}) {} -func (_ *BaseReactor) Receive(chId byte, peer *Peer, msgBytes []byte) {} +func (_ *BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {} //----------------------------------------------------------------------------- @@ -96,12 +96,12 @@ func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor { // No two reactors can share the same channel. reactorChannels := reactor.GetChannels() for _, chDesc := range reactorChannels { - chId := chDesc.Id - if sw.reactorsByCh[chId] != nil { - PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chId, sw.reactorsByCh[chId], reactor)) + chID := chDesc.ID + if sw.reactorsByCh[chID] != nil { + PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor)) } sw.chDescs = append(sw.chDescs, chDesc) - sw.reactorsByCh[chId] = reactor + sw.reactorsByCh[chID] = reactor } sw.reactors[name] = reactor reactor.SetSwitch(sw) @@ -285,12 +285,12 @@ func (sw *Switch) IsDialing(addr *NetAddress) bool { // Broadcast runs a go routine for each attempted send, which will block // trying to send for defaultSendTimeoutSeconds. Returns a channel // which receives success values for each attempted send (false if times out) -func (sw *Switch) Broadcast(chId byte, msg interface{}) chan bool { +func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool { successChan := make(chan bool, len(sw.peers.List())) - log.Info("Broadcast", "channel", chId, "msg", msg) + log.Info("Broadcast", "channel", chID, "msg", msg) for _, peer := range sw.peers.List() { go func(peer *Peer) { - success := peer.Send(chId, msg) + success := peer.Send(chID, msg) successChan <- success }(peer) } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index b03811592..6736f2934 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -57,12 +57,12 @@ func (tr *TestReactor) RemovePeer(peer *Peer, reason interface{}) { tr.peersRemoved = append(tr.peersRemoved, peer) } -func (tr *TestReactor) Receive(chId byte, peer *Peer, msgBytes []byte) { +func (tr *TestReactor) Receive(chID byte, peer *Peer, msgBytes []byte) { if tr.logMessages { tr.mtx.Lock() defer tr.mtx.Unlock() - //fmt.Printf("Received: %X, %X\n", chId, msgBytes) - tr.msgsReceived[chId] = append(tr.msgsReceived[chId], PeerMessage{peer.Key, msgBytes, tr.msgsCounter}) + //fmt.Printf("Received: %X, %X\n", chID, msgBytes) + tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.Key, msgBytes, tr.msgsCounter}) tr.msgsCounter++ } } @@ -129,12 +129,12 @@ func TestSwitches(t *testing.T) { s1, s2 := makeSwitchPair(t, func(sw *Switch) *Switch { // Make two reactors of two channels each sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{ - &ChannelDescriptor{Id: byte(0x00), Priority: 10}, - &ChannelDescriptor{Id: byte(0x01), Priority: 10}, + &ChannelDescriptor{ID: byte(0x00), Priority: 10}, + &ChannelDescriptor{ID: byte(0x01), Priority: 10}, }, true)) sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{ - &ChannelDescriptor{Id: byte(0x02), Priority: 10}, - &ChannelDescriptor{Id: byte(0x03), Priority: 10}, + &ChannelDescriptor{ID: byte(0x02), Priority: 10}, + &ChannelDescriptor{ID: byte(0x03), Priority: 10}, }, true)) return sw }) @@ -196,12 +196,12 @@ func BenchmarkSwitches(b *testing.B) { s1, s2 := makeSwitchPair(b, func(sw *Switch) *Switch { // Make bar reactors of bar channels each sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{ - &ChannelDescriptor{Id: byte(0x00), Priority: 10}, - &ChannelDescriptor{Id: byte(0x01), Priority: 10}, + &ChannelDescriptor{ID: byte(0x00), Priority: 10}, + &ChannelDescriptor{ID: byte(0x01), Priority: 10}, }, false)) sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{ - &ChannelDescriptor{Id: byte(0x02), Priority: 10}, - &ChannelDescriptor{Id: byte(0x03), Priority: 10}, + &ChannelDescriptor{ID: byte(0x02), Priority: 10}, + &ChannelDescriptor{ID: byte(0x03), Priority: 10}, }, false)) return sw }) @@ -216,8 +216,8 @@ func BenchmarkSwitches(b *testing.B) { // Send random message from foo channel to another for i := 0; i < b.N; i++ { - chId := byte(i % 4) - successChan := s1.Broadcast(chId, "test data") + chID := byte(i % 4) + successChan := s1.Broadcast(chID, "test data") for s := range successChan { if s { numSuccess += 1 diff --git a/rpc/client/client.go b/rpc/client/client.go index bf6aa30a7..3b138c5b5 100644 --- a/rpc/client/client.go +++ b/rpc/client/client.go @@ -18,7 +18,7 @@ func Call(remote string, method string, params []interface{}, dest interface{}) JSONRPC: "2.0", Method: method, Params: params, - Id: "", + ID: "", } requestBytes := wire.JSONBytes(request) requestBuf := bytes.NewBuffer(requestBytes) diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index 96210bfad..e756544c7 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -123,7 +123,7 @@ type ResultEvent struct { type Response struct { JSONRPC string `json:"jsonrpc"` - Id string `json:"id"` + ID string `json:"id"` Result Result `json:"result"` Error string `json:"error"` } diff --git a/rpc/core_client/client.go b/rpc/core_client/client.go index 1fb8a85d0..c24172483 100644 --- a/rpc/core_client/client.go +++ b/rpc/core_client/client.go @@ -187,7 +187,7 @@ fmt JSONRPC: "2.0", Method: reverseFuncMap["{{name}}"], Params: []interface{}{ {{args.ident}} }, - Id: "", + ID: "", } body, err := c.RequestResponse(request) if err != nil{ diff --git a/rpc/core_client/client_methods.go b/rpc/core_client/client_methods.go index 6a82e3f80..29c2ac539 100644 --- a/rpc/core_client/client_methods.go +++ b/rpc/core_client/client_methods.go @@ -571,7 +571,7 @@ func (c *ClientJSON) BlockchainInfo(minHeight int, maxHeight int) (*ctypes.Resul JSONRPC: "2.0", Method: reverseFuncMap["BlockchainInfo"], Params: []interface{}{minHeight, maxHeight}, - Id: "", + ID: "", } body, err := c.RequestResponse(request) if err != nil { @@ -596,7 +596,7 @@ func (c *ClientJSON) BroadcastTx(tx types.Tx) (*ctypes.ResultBroadcastTx, error) JSONRPC: "2.0", Method: reverseFuncMap["BroadcastTx"], Params: []interface{}{tx}, - Id: "", + ID: "", } body, err := c.RequestResponse(request) if err != nil { @@ -621,7 +621,7 @@ func (c *ClientJSON) Call(fromAddress []byte, toAddress []byte, data []byte) (*c JSONRPC: "2.0", Method: reverseFuncMap["Call"], Params: []interface{}{fromAddress, toAddress, data}, - Id: "", + ID: "", } body, err := c.RequestResponse(request) if err != nil { @@ -646,7 +646,7 @@ func (c *ClientJSON) CallCode(fromAddress []byte, code []byte, data []byte) (*ct JSONRPC: "2.0", Method: reverseFuncMap["CallCode"], Params: []interface{}{fromAddress, code, data}, - Id: "", + ID: "", } body, err := c.RequestResponse(request) if err != nil { @@ -671,7 +671,7 @@ func (c *ClientJSON) DumpConsensusState() (*ctypes.ResultDumpConsensusState, err JSONRPC: "2.0", Method: reverseFuncMap["DumpConsensusState"], Params: []interface{}{}, - Id: "", + ID: "", } body, err := c.RequestResponse(request) if err != nil { @@ -696,7 +696,7 @@ func (c *ClientJSON) DumpStorage(address []byte) (*ctypes.ResultDumpStorage, err JSONRPC: "2.0", Method: reverseFuncMap["DumpStorage"], Params: []interface{}{address}, - Id: "", + ID: "", } body, err := c.RequestResponse(request) if err != nil { @@ -721,7 +721,7 @@ func (c *ClientJSON) GenPrivAccount() (*ctypes.ResultGenPrivAccount, error) { JSONRPC: "2.0", Method: reverseFuncMap["GenPrivAccount"], Params: []interface{}{}, - Id: "", + ID: "", } body, err := c.RequestResponse(request) if err != nil { @@ -746,7 +746,7 @@ func (c *ClientJSON) Genesis() (*ctypes.ResultGenesis, error) { JSONRPC: "2.0", Method: reverseFuncMap["Genesis"], Params: []interface{}{}, - Id: "", + ID: "", } body, err := c.RequestResponse(request) if err != nil { @@ -771,7 +771,7 @@ func (c *ClientJSON) GetAccount(address []byte) (*ctypes.ResultGetAccount, error JSONRPC: "2.0", Method: reverseFuncMap["GetAccount"], Params: []interface{}{address}, - Id: "", + ID: "", } body, err := c.RequestResponse(request) if err != nil { @@ -796,7 +796,7 @@ func (c *ClientJSON) GetBlock(height int) (*ctypes.ResultGetBlock, error) { JSONRPC: "2.0", Method: reverseFuncMap["GetBlock"], Params: []interface{}{height}, - Id: "", + ID: "", } body, err := c.RequestResponse(request) if err != nil { @@ -821,7 +821,7 @@ func (c *ClientJSON) GetName(name string) (*ctypes.ResultGetName, error) { JSONRPC: "2.0", Method: reverseFuncMap["GetName"], Params: []interface{}{name}, - Id: "", + ID: "", } body, err := c.RequestResponse(request) if err != nil { @@ -846,7 +846,7 @@ func (c *ClientJSON) GetStorage(address []byte, key []byte) (*ctypes.ResultGetSt JSONRPC: "2.0", Method: reverseFuncMap["GetStorage"], Params: []interface{}{address, key}, - Id: "", + ID: "", } body, err := c.RequestResponse(request) if err != nil { @@ -871,7 +871,7 @@ func (c *ClientJSON) ListAccounts() (*ctypes.ResultListAccounts, error) { JSONRPC: "2.0", Method: reverseFuncMap["ListAccounts"], Params: []interface{}{}, - Id: "", + ID: "", } body, err := c.RequestResponse(request) if err != nil { @@ -896,7 +896,7 @@ func (c *ClientJSON) ListNames() (*ctypes.ResultListNames, error) { JSONRPC: "2.0", Method: reverseFuncMap["ListNames"], Params: []interface{}{}, - Id: "", + ID: "", } body, err := c.RequestResponse(request) if err != nil { @@ -921,7 +921,7 @@ func (c *ClientJSON) ListUnconfirmedTxs() (*ctypes.ResultListUnconfirmedTxs, err JSONRPC: "2.0", Method: reverseFuncMap["ListUnconfirmedTxs"], Params: []interface{}{}, - Id: "", + ID: "", } body, err := c.RequestResponse(request) if err != nil { @@ -946,7 +946,7 @@ func (c *ClientJSON) ListValidators() (*ctypes.ResultListValidators, error) { JSONRPC: "2.0", Method: reverseFuncMap["ListValidators"], Params: []interface{}{}, - Id: "", + ID: "", } body, err := c.RequestResponse(request) if err != nil { @@ -971,7 +971,7 @@ func (c *ClientJSON) NetInfo() (*ctypes.ResultNetInfo, error) { JSONRPC: "2.0", Method: reverseFuncMap["NetInfo"], Params: []interface{}{}, - Id: "", + ID: "", } body, err := c.RequestResponse(request) if err != nil { @@ -996,7 +996,7 @@ func (c *ClientJSON) SignTx(tx types.Tx, privAccounts []*acm.PrivAccount) (*ctyp JSONRPC: "2.0", Method: reverseFuncMap["SignTx"], Params: []interface{}{tx, privAccounts}, - Id: "", + ID: "", } body, err := c.RequestResponse(request) if err != nil { @@ -1021,7 +1021,7 @@ func (c *ClientJSON) Status() (*ctypes.ResultStatus, error) { JSONRPC: "2.0", Method: reverseFuncMap["Status"], Params: []interface{}{}, - Id: "", + ID: "", } body, err := c.RequestResponse(request) if err != nil { diff --git a/rpc/core_client/ws_client.go b/rpc/core_client/ws_client.go index 26d7f5ec8..1db273ae0 100644 --- a/rpc/core_client/ws_client.go +++ b/rpc/core_client/ws_client.go @@ -76,7 +76,7 @@ func (wsc *WSClient) receiveEventsRoutine() { wsc.Stop() break } - if strings.HasSuffix(response.Id, "#event") { + if strings.HasSuffix(response.ID, "#event") { wsc.EventsCh <- *response.Result.(*ctypes.ResultEvent) } else { wsc.ResultsCh <- response.Result @@ -89,7 +89,7 @@ func (wsc *WSClient) receiveEventsRoutine() { func (wsc *WSClient) Subscribe(eventid string) error { err := wsc.WriteJSON(rpctypes.RPCRequest{ JSONRPC: "2.0", - Id: "", + ID: "", Method: "subscribe", Params: []interface{}{eventid}, }) @@ -100,7 +100,7 @@ func (wsc *WSClient) Subscribe(eventid string) error { func (wsc *WSClient) Unsubscribe(eventid string) error { err := wsc.WriteJSON(rpctypes.RPCRequest{ JSONRPC: "2.0", - Id: "", + ID: "", Method: "unsubscribe", Params: []interface{}{eventid}, }) diff --git a/rpc/server/handlers.go b/rpc/server/handlers.go index e66574b1d..d0400ec19 100644 --- a/rpc/server/handlers.go +++ b/rpc/server/handlers.go @@ -95,27 +95,27 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc { return } if len(r.URL.Path) > 1 { - WriteRPCResponse(w, NewRPCResponse(request.Id, nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path))) + WriteRPCResponse(w, NewRPCResponse(request.ID, nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path))) return } rpcFunc := funcMap[request.Method] if rpcFunc == nil { - WriteRPCResponse(w, NewRPCResponse(request.Id, nil, "RPC method unknown: "+request.Method)) + WriteRPCResponse(w, NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method)) return } args, err := jsonParamsToArgs(rpcFunc, request.Params) if err != nil { - WriteRPCResponse(w, NewRPCResponse(request.Id, nil, err.Error())) + WriteRPCResponse(w, NewRPCResponse(request.ID, nil, err.Error())) return } returns := rpcFunc.f.Call(args) log.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns) result, err := unreflectResult(returns) if err != nil { - WriteRPCResponse(w, NewRPCResponse(request.Id, nil, err.Error())) + WriteRPCResponse(w, NewRPCResponse(request.ID, nil, err.Error())) return } - WriteRPCResponse(w, NewRPCResponse(request.Id, result, "")) + WriteRPCResponse(w, NewRPCResponse(request.ID, result, "")) } } @@ -324,23 +324,23 @@ func (wsc *WSConnection) readRoutine() { err = json.Unmarshal(in, &request) if err != nil { errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error()) - wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, errStr)) + wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, errStr)) continue } switch request.Method { case "subscribe": if len(request.Params) != 1 { - wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, "subscribe takes 1 event parameter string")) + wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "subscribe takes 1 event parameter string")) continue } if event, ok := request.Params[0].(string); !ok { - wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, "subscribe takes 1 event parameter string")) + wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "subscribe takes 1 event parameter string")) continue } else { log.Notice("Subscribe to event", "id", wsc.id, "event", event) wsc.evsw.AddListenerForEvent(wsc.id, event, func(msg types.EventData) { // NOTE: RPCResponses of subscribed events have id suffix "#event" - wsc.writeRPCResponse(NewRPCResponse(request.Id+"#event", ctypes.ResultEvent{event, msg}, "")) + wsc.writeRPCResponse(NewRPCResponse(request.ID+"#event", ctypes.ResultEvent{event, msg}, "")) }) continue } @@ -348,41 +348,41 @@ func (wsc *WSConnection) readRoutine() { if len(request.Params) == 0 { log.Notice("Unsubscribe from all events", "id", wsc.id) wsc.evsw.RemoveListener(wsc.id) - wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, "")) + wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "")) continue } else if len(request.Params) == 1 { if event, ok := request.Params[0].(string); !ok { - wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, "unsubscribe takes 0 or 1 event parameter strings")) + wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "unsubscribe takes 0 or 1 event parameter strings")) continue } else { log.Notice("Unsubscribe from event", "id", wsc.id, "event", event) wsc.evsw.RemoveListenerForEvent(event, wsc.id) - wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, "")) + wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "")) continue } } else { - wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, "unsubscribe takes 0 or 1 event parameter strings")) + wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "unsubscribe takes 0 or 1 event parameter strings")) continue } default: rpcFunc := wsc.funcMap[request.Method] if rpcFunc == nil { - wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, "RPC method unknown: "+request.Method)) + wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method)) continue } args, err := jsonParamsToArgs(rpcFunc, request.Params) if err != nil { - wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, err.Error())) + wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, err.Error())) continue } returns := rpcFunc.f.Call(args) log.Info("WSJSONRPC", "method", request.Method, "args", args, "returns", returns) result, err := unreflectResult(returns) if err != nil { - wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, err.Error())) + wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, err.Error())) continue } else { - wsc.writeRPCResponse(NewRPCResponse(request.Id, result, "")) + wsc.writeRPCResponse(NewRPCResponse(request.ID, result, "")) continue } } diff --git a/rpc/test/ws_helpers.go b/rpc/test/ws_helpers.go index 3733c8f33..6bf3bab41 100644 --- a/rpc/test/ws_helpers.go +++ b/rpc/test/ws_helpers.go @@ -34,7 +34,7 @@ func newWSCon(t *testing.T) *websocket.Conn { func subscribe(t *testing.T, con *websocket.Conn, eventid string) { err := con.WriteJSON(rpctypes.RPCRequest{ JSONRPC: "2.0", - Id: "", + ID: "", Method: "subscribe", Params: []interface{}{eventid}, }) @@ -47,7 +47,7 @@ func subscribe(t *testing.T, con *websocket.Conn, eventid string) { func unsubscribe(t *testing.T, con *websocket.Conn, eventid string) { err := con.WriteJSON(rpctypes.RPCRequest{ JSONRPC: "2.0", - Id: "", + ID: "", Method: "unsubscribe", Params: []interface{}{eventid}, }) diff --git a/rpc/types/types.go b/rpc/types/types.go index c2bd6f5fe..c93e6735b 100644 --- a/rpc/types/types.go +++ b/rpc/types/types.go @@ -2,14 +2,14 @@ package rpctypes type RPCRequest struct { JSONRPC string `json:"jsonrpc"` - Id string `json:"id"` + ID string `json:"id"` Method string `json:"method"` Params []interface{} `json:"params"` } type RPCResponse struct { JSONRPC string `json:"jsonrpc"` - Id string `json:"id"` + ID string `json:"id"` Result interface{} `json:"result"` Error string `json:"error"` } @@ -17,7 +17,7 @@ type RPCResponse struct { func NewRPCResponse(id string, res interface{}, err string) RPCResponse { return RPCResponse{ JSONRPC: "2.0", - Id: id, + ID: id, Result: res, Error: err, } diff --git a/vm/test/log_event_test.go b/vm/test/log_event_test.go index 2e3f83376..fa598b00e 100644 --- a/vm/test/log_event_test.go +++ b/vm/test/log_event_test.go @@ -40,11 +40,11 @@ func TestLog4(t *testing.T) { if err != nil { t.Errorf("Failed to start eventSwitch: %v", err) } - eventId := types.EventStringLogEvent(account2.Address.Postfix(20)) + eventID := types.EventStringLogEvent(account2.Address.Postfix(20)) doneChan := make(chan struct{}, 1) - eventSwitch.AddListenerForEvent("test", eventId, func(event types.EventData) { + eventSwitch.AddListenerForEvent("test", eventID, func(event types.EventData) { logEvent := event.(types.EventDataLog) // No need to test address as this event would not happen if it wasn't correct if !reflect.DeepEqual(logEvent.Topics, expectedTopics) { diff --git a/vm/vm.go b/vm/vm.go index d0a0906ed..7d60752e3 100644 --- a/vm/vm.go +++ b/vm/vm.go @@ -692,15 +692,15 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value int64, gas return nil, firstErr(err, ErrMemoryOutOfBounds) } if vm.evc != nil { - eventId := types.EventStringLogEvent(callee.Address.Postfix(20)) - fmt.Printf("eventId: %s\n", eventId) + eventID := types.EventStringLogEvent(callee.Address.Postfix(20)) + fmt.Printf("eventID: %s\n", eventID) log := types.EventDataLog{ callee.Address, topics, data, vm.params.BlockHeight, } - vm.evc.FireEvent(eventId, log) + vm.evc.FireEvent(eventID, log) } dbg.Printf(" => T:%X D:%X\n", topics, data)