
Blockpool better timeouts, download rate observation, quicker switching to consensus; Id -> ID (sorry, this doesn't belong here)

pull/139/merge
Jae Kwon committed 9 years ago (parent commit 625f23af13)
24 changed files with 476 additions and 369 deletions
 1. blockchain/pool.go (+262 -172)
 2. blockchain/pool_test.go (+9 -9)
 3. blockchain/reactor.go (+19 -22)
 4. common/service.go (+18 -6)
 5. config/tendermint/config.go (+17 -9)
 6. consensus/reactor.go (+9 -9)
 7. events/events.go (+16 -16)
 8. mempool/reactor.go (+2 -2)
 9. p2p/README.md (+9 -9)
10. p2p/connection.go (+29 -29)
11. p2p/peer.go (+10 -10)
12. p2p/pex_reactor.go (+2 -2)
13. p2p/switch.go (+9 -9)
14. p2p/switch_test.go (+13 -13)
15. rpc/client/client.go (+1 -1)
16. rpc/core/types/responses.go (+1 -1)
17. rpc/core_client/client.go (+1 -1)
18. rpc/core_client/client_methods.go (+19 -19)
19. rpc/core_client/ws_client.go (+3 -3)
20. rpc/server/handlers.go (+17 -17)
21. rpc/test/ws_helpers.go (+2 -2)
22. rpc/types/types.go (+3 -3)
23. vm/test/log_event_test.go (+2 -2)
24. vm/vm.go (+3 -3)

blockchain/pool.go (+262 -172)

@@ -1,24 +1,22 @@
 package blockchain
 
 import (
+	"math"
 	"sync"
 	"time"
 
+	flow "github.com/tendermint/tendermint/Godeps/_workspace/src/code.google.com/p/mxk/go1/flowcontrol"
 	. "github.com/tendermint/tendermint/common"
 	"github.com/tendermint/tendermint/types"
 )
 
-const (
-	maxTries              = 3
-	inputsChannelCapacity = 200
-	requestIntervalMS     = 500
-	maxPendingRequests    = 200
-	maxTotalRequests      = 300
-	maxRequestsPerPeer    = 300
-)
+var (
+	requestTimeoutSeconds     = time.Duration(3)
+	requestIntervalMS         = 500
+	maxTotalRequests          = 300
+	maxPendingRequests        = maxTotalRequests
+	maxPendingRequestsPerPeer = 30
+	peerTimeoutSeconds        = 10
+	minRecvRate               = 10240 // 10Kb/s
+)
 
 /*
@@ -33,14 +31,13 @@ var (
 */
 
 type BlockPool struct {
-	BaseService
+	QuitService
 
 	// block requests
-	requestsMtx   sync.Mutex
-	requests      map[int]*bpRequest
-	height        int   // the lowest key in requests.
-	numUnassigned int32 // number of requests not yet assigned to a peer
-	numPending    int32 // number of requests pending assignment or block response
+	mtx        sync.Mutex
+	requests   map[int]*bpRequester
+	height     int   // the lowest key in requests.
+	numPending int32 // number of requests pending assignment or block response
 
 	// peers
 	peersMtx sync.Mutex
@@ -48,46 +45,40 @@ type BlockPool struct {
 	requestsCh chan<- BlockRequest
 	timeoutsCh chan<- string
-	repeater   *RepeatTimer
 }
 
 func NewBlockPool(start int, requestsCh chan<- BlockRequest, timeoutsCh chan<- string) *BlockPool {
 	bp := &BlockPool{
 		peers: make(map[string]*bpPeer),
 
-		requests:      make(map[int]*bpRequest),
-		height:        start,
-		numUnassigned: 0,
-		numPending:    0,
+		requests:   make(map[int]*bpRequester),
+		height:     start,
+		numPending: 0,
 
 		requestsCh: requestsCh,
 		timeoutsCh: timeoutsCh,
-		repeater:   nil,
 	}
-	bp.BaseService = *NewBaseService(log, "BlockPool", bp)
+	bp.QuitService = *NewQuitService(log, "BlockPool", bp)
 	return bp
 }
 
 func (pool *BlockPool) OnStart() error {
-	pool.BaseService.OnStart()
-	pool.repeater = NewRepeatTimer("", requestIntervalMS*time.Millisecond)
-	go pool.run()
+	pool.QuitService.OnStart()
+	go pool.makeRequestsRoutine()
 	return nil
 }
 
 func (pool *BlockPool) OnStop() {
-	pool.BaseService.OnStop()
-	pool.repeater.Stop()
+	pool.QuitService.OnStop()
 }
 
 // Run spawns requests as needed.
-func (pool *BlockPool) run() {
-RUN_LOOP:
+func (pool *BlockPool) makeRequestsRoutine() {
 	for {
 		if !pool.IsRunning() {
-			break RUN_LOOP
+			break
 		}
-		_, numPending, _ := pool.GetStatus()
+		_, numPending := pool.GetStatus()
 		if numPending >= maxPendingRequests {
 			// sleep for a bit.
 			time.Sleep(requestIntervalMS * time.Millisecond)
@@ -101,18 +92,35 @@ RUN_LOOP:
 		}
 	}
 }
 
-func (pool *BlockPool) GetStatus() (height int, numPending int32, numUnssigned int32) {
-	pool.requestsMtx.Lock() // Lock
-	defer pool.requestsMtx.Unlock()
-
-	return pool.height, pool.numPending, pool.numUnassigned
+func (pool *BlockPool) GetStatus() (height int, numPending int32) {
+	pool.mtx.Lock() // Lock
+	defer pool.mtx.Unlock()
+
+	return pool.height, pool.numPending
+}
+
+// TODO: relax conditions, prevent abuse.
+func (pool *BlockPool) IsCaughtUp() bool {
+	pool.mtx.Lock()
+	height := pool.height
+	pool.mtx.Unlock()
+
+	pool.peersMtx.Lock()
+	numPeers := len(pool.peers)
+	maxPeerHeight := 0
+	for _, peer := range pool.peers {
+		maxPeerHeight = MaxInt(maxPeerHeight, peer.height)
+	}
+	pool.peersMtx.Unlock()
+
+	return numPeers >= 3 && height > 0 && height == maxPeerHeight
 }
 
 // We need to see the second block's Validation to validate the first block.
 // So we peek two blocks at a time.
 func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
-	pool.requestsMtx.Lock() // Lock
-	defer pool.requestsMtx.Unlock()
+	pool.mtx.Lock() // Lock
+	defer pool.mtx.Unlock()
 
 	if r := pool.requests[pool.height]; r != nil {
 		first = r.block
@@ -126,8 +134,8 @@ func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block)
 // Pop the first block at pool.height
 // It must have been validated by 'second'.Validation from PeekTwoBlocks().
 func (pool *BlockPool) PopRequest() {
-	pool.requestsMtx.Lock() // Lock
-	defer pool.requestsMtx.Unlock()
+	pool.mtx.Lock() // Lock
+	defer pool.mtx.Unlock()
 
 	if r := pool.requests[pool.height]; r == nil || r.block == nil {
 		PanicSanity("PopRequest() requires a valid block")
@@ -137,108 +145,77 @@ func (pool *BlockPool) PopRequest() {
 	pool.height++
 }
 
-// Invalidates the block at pool.height.
-// Remove the peer and request from others.
+// Invalidates the block at pool.height,
+// Remove the peer and redo request from others.
 func (pool *BlockPool) RedoRequest(height int) {
-	pool.requestsMtx.Lock() // Lock
-	defer pool.requestsMtx.Unlock()
+	pool.mtx.Lock() // Lock
+	defer pool.mtx.Unlock()
 
 	request := pool.requests[height]
 	if request.block == nil {
 		PanicSanity("Expected block to be non-nil")
 	}
+	// RemovePeer will redo all requests associated with this peer.
 	// TODO: record this malfeasance
-	// maybe punish peer on switch (an invalid block!)
-	pool.RemovePeer(request.peerId) // Lock on peersMtx.
-	request.block = nil
-	request.peerId = ""
-	pool.numPending++
-	pool.numUnassigned++
-
-	go requestRoutine(pool, height)
-}
-
-func (pool *BlockPool) hasBlock(height int) bool {
-	pool.requestsMtx.Lock() // Lock
-	defer pool.requestsMtx.Unlock()
-
-	request := pool.requests[height]
-	return request != nil && request.block != nil
+	pool.RemovePeer(request.peerID) // Lock on peersMtx.
 }
 
-func (pool *BlockPool) setPeerForRequest(height int, peerId string) {
-	pool.requestsMtx.Lock() // Lock
-	defer pool.requestsMtx.Unlock()
+// TODO: ensure that blocks come in order for each peer.
+func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int) {
+	pool.mtx.Lock() // Lock
+	defer pool.mtx.Unlock()
 
-	request := pool.requests[height]
+	request := pool.requests[block.Height]
 	if request == nil {
 		return
 	}
-	pool.numUnassigned--
-	request.peerId = peerId
-}
 
-func (pool *BlockPool) removePeerForRequest(height int, peerId string) {
-	pool.requestsMtx.Lock() // Lock
-	defer pool.requestsMtx.Unlock()
-
-	request := pool.requests[height]
-	if request == nil {
-		return
+	if request.setBlock(block, peerID) {
+		pool.numPending--
+		peer := pool.getPeer(peerID)
+		peer.decrPending(blockSize)
+	} else {
+		// Bad peer?
 	}
-	pool.numUnassigned++
-	request.peerId = ""
 }
 
-func (pool *BlockPool) AddBlock(block *types.Block, peerId string) {
-	pool.requestsMtx.Lock() // Lock
-	defer pool.requestsMtx.Unlock()
+// Sets the peer's alleged blockchain height.
+func (pool *BlockPool) SetPeerHeight(peerID string, height int) {
+	pool.peersMtx.Lock() // Lock
+	defer pool.peersMtx.Unlock()
 
-	request := pool.requests[block.Height]
-	if request == nil {
-		return
-	}
-	if request.peerId != peerId {
-		return
-	}
-	if request.block != nil {
-		return
+	peer := pool.peers[peerID]
+	if peer != nil {
+		peer.height = height
+	} else {
+		peer = newBPPeer(pool, peerID, height)
+		pool.peers[peerID] = peer
 	}
-	request.block = block
-	pool.numPending--
 }
 
-func (pool *BlockPool) getPeer(peerId string) *bpPeer {
+func (pool *BlockPool) RemovePeer(peerID string) {
 	pool.peersMtx.Lock() // Lock
 	defer pool.peersMtx.Unlock()
 
-	peer := pool.peers[peerId]
-	return peer
+	pool.removePeer(peerID)
 }
 
-// Sets the peer's alleged blockchain height.
-func (pool *BlockPool) SetPeerHeight(peerId string, height int) {
-	pool.peersMtx.Lock() // Lock
-	defer pool.peersMtx.Unlock()
-
-	peer := pool.peers[peerId]
-	if peer != nil {
-		peer.height = height
-	} else {
-		peer = &bpPeer{
-			height:      height,
-			id:          peerId,
-			numRequests: 0,
+func (pool *BlockPool) removePeer(peerID string) {
+	for _, request := range pool.requests {
+		if request.getPeerID() == peerID {
+			pool.numPending++
+			request.redo() // pick another peer and ...
 		}
-		pool.peers[peerId] = peer
 	}
+	delete(pool.peers, peerID)
 }
 
-func (pool *BlockPool) RemovePeer(peerId string) {
+func (pool *BlockPool) getPeer(peerID string) *bpPeer {
 	pool.peersMtx.Lock() // Lock
 	defer pool.peersMtx.Unlock()
 
-	delete(pool.peers, peerId)
+	peer := pool.peers[peerID]
+	return peer
 }
 
 // Pick an available peer with at least the given minHeight.
@@ -248,64 +225,52 @@ func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer {
 	defer pool.peersMtx.Unlock()
 
 	for _, peer := range pool.peers {
-		if peer.numRequests >= maxRequestsPerPeer {
+		if peer.isBad() {
+			pool.removePeer(peer.id)
+			continue
+		}
+		if peer.numPending >= maxPendingRequestsPerPeer {
 			continue
 		}
 		if peer.height < minHeight {
 			continue
 		}
-		peer.numRequests++
+		peer.incrPending()
 		return peer
 	}
 	return nil
 }
 
-func (pool *BlockPool) decrPeer(peerId string) {
-	pool.peersMtx.Lock()
-	defer pool.peersMtx.Unlock()
-
-	peer := pool.peers[peerId]
-	if peer == nil {
-		return
-	}
-	peer.numRequests--
-}
-
 func (pool *BlockPool) makeNextRequest() {
-	pool.requestsMtx.Lock() // Lock
-	defer pool.requestsMtx.Unlock()
+	pool.mtx.Lock() // Lock
+	defer pool.mtx.Unlock()
 
 	nextHeight := pool.height + len(pool.requests)
-	request := &bpRequest{
-		height: nextHeight,
-		peerId: "",
-		block:  nil,
-	}
+	request := newBPRequester(pool, nextHeight)
 
 	pool.requests[nextHeight] = request
-	pool.numUnassigned++
 	pool.numPending++
 
-	go requestRoutine(pool, nextHeight)
+	request.Start()
 }
 
-func (pool *BlockPool) sendRequest(height int, peerId string) {
+func (pool *BlockPool) sendRequest(height int, peerID string) {
 	if !pool.IsRunning() {
 		return
 	}
-	pool.requestsCh <- BlockRequest{height, peerId}
+	pool.requestsCh <- BlockRequest{height, peerID}
 }
 
-func (pool *BlockPool) sendTimeout(peerId string) {
+func (pool *BlockPool) sendTimeout(peerID string) {
 	if !pool.IsRunning() {
 		return
 	}
-	pool.timeoutsCh <- peerId
+	pool.timeoutsCh <- peerID
 }
 
 func (pool *BlockPool) debug() string {
-	pool.requestsMtx.Lock() // Lock
-	defer pool.requestsMtx.Unlock()
+	pool.mtx.Lock() // Lock
+	defer pool.mtx.Unlock()
 
 	str := ""
 	for h := pool.height; h < pool.height+len(pool.requests); h++ {
@@ -322,64 +287,189 @@ func (pool *BlockPool) debug() string {
 //-------------------------------------
 
 type bpPeer struct {
+	pool        *BlockPool
 	id          string
 	height      int
-	numRequests int32
+	numPending  int32
+	recvMonitor *flow.Monitor
+	timeout     *time.Timer
+	didTimeout  bool
 }
 
-type bpRequest struct {
-	height int
-	peerId string
-	block  *types.Block
+func newBPPeer(pool *BlockPool, peerID string, height int) *bpPeer {
+	peer := &bpPeer{
+		pool:       pool,
+		id:         peerID,
+		height:     height,
+		numPending: 0,
+	}
+	return peer
+}
+
+func (bpp *bpPeer) resetMonitor() {
+	bpp.recvMonitor = flow.New(time.Second, time.Second*40)
+	var initialValue = float64(minRecvRate) * math.E
+	bpp.recvMonitor.Update(int(initialValue))
+}
+
+func (bpp *bpPeer) resetTimeout() {
+	if bpp.timeout == nil {
+		bpp.timeout = time.AfterFunc(time.Second*peerTimeoutSeconds, bpp.onTimeout)
+	} else {
+		bpp.timeout.Reset(time.Second * peerTimeoutSeconds)
+	}
+}
+
+func (bpp *bpPeer) incrPending() {
+	if bpp.numPending == 0 {
+		bpp.resetMonitor()
+		bpp.resetTimeout()
+	}
+	bpp.numPending++
+}
+
+func (bpp *bpPeer) decrPending(recvSize int) {
+	bpp.numPending--
+	if bpp.numPending == 0 {
+		bpp.timeout.Stop()
+	} else {
+		bpp.recvMonitor.Update(recvSize)
+		bpp.resetTimeout()
+	}
+}
+
+func (bpp *bpPeer) onTimeout() {
+	bpp.didTimeout = true
+}
+
+func (bpp *bpPeer) isBad() bool {
+	if bpp.didTimeout {
+		bpp.pool.sendTimeout(bpp.id)
+		return true
+	}
+	if bpp.numPending == 0 {
+		return false
+	} else {
+		bpp.pool.sendTimeout(bpp.id)
+		return bpp.recvMonitor.Status().CurRate < minRecvRate
+	}
 }
 
 //-------------------------------------
 
+type bpRequester struct {
+	QuitService
+	pool       *BlockPool
+	height     int
+	gotBlockCh chan struct{}
+	redoCh     chan struct{}
+
+	mtx    sync.Mutex
+	peerID string
+	block  *types.Block
+}
+
+func newBPRequester(pool *BlockPool, height int) *bpRequester {
+	bpr := &bpRequester{
+		pool:       pool,
+		height:     height,
+		gotBlockCh: make(chan struct{}),
+		redoCh:     make(chan struct{}),
+
+		peerID: "",
+		block:  nil,
+	}
+	bpr.QuitService = *NewQuitService(nil, "bpRequester", bpr)
+	return bpr
+}
+
+func (bpr *bpRequester) OnStart() error {
+	bpr.QuitService.OnStart()
+	go bpr.requestRoutine()
+	return nil
+}
+
+// Returns true if the peer matches
+func (bpr *bpRequester) setBlock(block *types.Block, peerID string) bool {
+	bpr.mtx.Lock()
+	if bpr.block != nil || bpr.peerID != peerID {
+		bpr.mtx.Unlock()
+		return false
+	}
+	bpr.block = block
+	bpr.mtx.Unlock()
+
+	bpr.gotBlockCh <- struct{}{}
+	return true
+}
+
+func (bpr *bpRequester) getPeerID() string {
+	bpr.mtx.Lock()
+	defer bpr.mtx.Unlock()
+	return bpr.peerID
+}
+
+func (bpr *bpRequester) reset() {
+	bpr.mtx.Lock()
+	bpr.peerID = ""
+	bpr.block = nil
+	bpr.mtx.Unlock()
+}
+
+// Tells bpRequester to pick another peer and try again.
+func (bpr *bpRequester) redo() {
+	bpr.redoCh <- struct{}{}
+}
+
 // Responsible for making more requests as necessary
 // Returns only when a block is found (e.g. AddBlock() is called)
-func requestRoutine(pool *BlockPool, height int) {
+func (bpr *bpRequester) requestRoutine() {
+OUTER_LOOP:
 	for {
+		// Pick a peer to send request to.
 		var peer *bpPeer = nil
-	PICK_LOOP:
+	PICK_PEER_LOOP:
 		for {
-			if !pool.IsRunning() {
-				log.Info("BlockPool not running. Stopping requestRoutine", "height", height)
+			if !bpr.IsRunning() || !bpr.pool.IsRunning() {
 				return
 			}
-			peer = pool.pickIncrAvailablePeer(height)
+			peer = bpr.pool.pickIncrAvailablePeer(bpr.height)
 			if peer == nil {
 				//log.Info("No peers available", "height", height)
 				time.Sleep(requestIntervalMS * time.Millisecond)
-				continue PICK_LOOP
+				continue PICK_PEER_LOOP
 			}
-			break PICK_LOOP
+			break PICK_PEER_LOOP
 		}
 
-		// set the peer, decrement numUnassigned
-		pool.setPeerForRequest(height, peer.id)
-
-		for try := 0; try < maxTries; try++ {
-			pool.sendRequest(height, peer.id)
-			time.Sleep(requestTimeoutSeconds * time.Second)
-			// if successful the block is either in the pool,
-			if pool.hasBlock(height) {
-				pool.decrPeer(peer.id)
-				return
-			}
-			// or already processed and we've moved past it
-			bpHeight, _, _ := pool.GetStatus()
-			if height < bpHeight {
-				pool.decrPeer(peer.id)
-				return
+		bpr.mtx.Lock()
+		bpr.peerID = peer.id
+		bpr.mtx.Unlock()
+
+		// Send request and wait.
+		bpr.pool.sendRequest(bpr.height, peer.id)
+		select {
+		case <-bpr.pool.Quit:
+			bpr.Stop()
+			return
+		case <-bpr.Quit:
+			return
+		case <-bpr.redoCh:
+			bpr.reset()
+			continue OUTER_LOOP // When peer is removed
+		case <-bpr.gotBlockCh:
+			// We got the block, now see if it's good.
+			select {
+			case <-bpr.pool.Quit:
+				bpr.Stop()
+				return
+			case <-bpr.Quit:
+				return
+			case <-bpr.redoCh:
+				bpr.reset()
+				continue OUTER_LOOP
 			}
 		}
-
-		// unset the peer, increment numUnassigned
-		pool.removePeerForRequest(height, peer.id)
-
-		// this peer failed us, try again
-		pool.RemovePeer(peer.id)
-		pool.sendTimeout(peer.id)
 	}
 }
@@ -387,5 +477,5 @@ func requestRoutine(pool *BlockPool, height int) {
 type BlockRequest struct {
 	Height int
-	PeerId string
+	PeerID string
 }
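
The heart of the change above is the bpPeer timer lifecycle: the deadline is armed by the first pending request, pushed forward each time a block arrives, and a missed deadline marks the peer bad so that pickIncrAvailablePeer() drops it. Below is a minimal, self-contained sketch of that pattern; the demoPeer type and the durations are illustrative only, and the real code additionally feeds received byte counts into a flow.Monitor and compares its CurRate against minRecvRate.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// demoPeer mirrors bpPeer's timeout bookkeeping (names are illustrative).
type demoPeer struct {
	mtx        sync.Mutex
	numPending int
	timeout    *time.Timer
	didTimeout bool
}

func (p *demoPeer) resetTimeout(d time.Duration) {
	if p.timeout == nil {
		p.timeout = time.AfterFunc(d, func() {
			p.mtx.Lock()
			p.didTimeout = true // peer would be reported and removed
			p.mtx.Unlock()
		})
	} else {
		p.timeout.Reset(d)
	}
}

func (p *demoPeer) incrPending(d time.Duration) {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	if p.numPending == 0 {
		p.resetTimeout(d) // first outstanding request arms the timer
	}
	p.numPending++
}

func (p *demoPeer) decrPending(d time.Duration) {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	p.numPending--
	if p.numPending == 0 {
		p.timeout.Stop() // nothing outstanding, no deadline to miss
	} else {
		p.resetTimeout(d) // progress was made, push the deadline out
	}
}

func main() {
	p := &demoPeer{}
	p.incrPending(50 * time.Millisecond)
	time.Sleep(100 * time.Millisecond) // no block arrives in time
	p.mtx.Lock()
	fmt.Println("didTimeout:", p.didTimeout) // true
	p.mtx.Unlock()
}
```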

blockchain/pool_test.go (+9 -9)

@@ -17,9 +17,9 @@ type testPeer struct {
 func makePeers(numPeers int, minHeight, maxHeight int) map[string]testPeer {
 	peers := make(map[string]testPeer, numPeers)
 	for i := 0; i < numPeers; i++ {
-		peerId := RandStr(12)
+		peerID := RandStr(12)
 		height := minHeight + rand.Intn(maxHeight-minHeight)
-		peers[peerId] = testPeer{peerId, height}
+		peers[peerID] = testPeer{peerID, height}
 	}
 	return peers
 }

@@ -57,8 +57,8 @@ func TestBasic(t *testing.T) {
 	// Pull from channels
 	for {
 		select {
-		case peerId := <-timeoutsCh:
-			t.Errorf("timeout: %v", peerId)
+		case peerID := <-timeoutsCh:
+			t.Errorf("timeout: %v", peerID)
 		case request := <-requestsCh:
 			log.Info("TEST: Pulled new BlockRequest", "request", request)
 			if request.Height == 300 {

@@ -67,8 +67,8 @@ func TestBasic(t *testing.T) {
 			// Request desired, pretend like we got the block immediately.
 			go func() {
 				block := &types.Block{Header: &types.Header{Height: request.Height}}
-				pool.AddBlock(block, request.PeerId)
-				log.Info("TEST: Added block", "block", request.Height, "peer", request.PeerId)
+				pool.AddBlock(request.PeerID, block, 123)
+				log.Info("TEST: Added block", "block", request.Height, "peer", request.PeerID)
 			}()
 		}
 	}

@@ -111,9 +111,9 @@ func TestTimeout(t *testing.T) {
 	timedOut := map[string]struct{}{}
 	for {
 		select {
-		case peerId := <-timeoutsCh:
-			log.Info("Timeout", "peerId", peerId)
-			if _, ok := timedOut[peerId]; !ok {
+		case peerID := <-timeoutsCh:
+			log.Info("Timeout", "peerID", peerID)
+			if _, ok := timedOut[peerID]; !ok {
 				counter++
 				if counter == len(peers) {
 					return // Done!


blockchain/reactor.go (+19 -22)

@@ -27,7 +27,7 @@ const (
 	// ask for best height every 10s
 	statusUpdateIntervalSeconds = 10
 	// check if we should switch to consensus reactor
-	switchToConsensusIntervalSeconds = 10
+	switchToConsensusIntervalSeconds = 1
 )
 
 type consensusReactor interface {

@@ -97,7 +97,7 @@ func (bcR *BlockchainReactor) OnStop() {
 func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
 	return []*p2p.ChannelDescriptor{
 		&p2p.ChannelDescriptor{
-			Id:                BlockchainChannel,
+			ID:                BlockchainChannel,
 			Priority:          5,
 			SendQueueCapacity: 100,
 		},

@@ -117,7 +117,7 @@ func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
 }
 
 // Implements Reactor
-func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) {
+func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
 	_, msg, err := DecodeMessage(msgBytes)
 	if err != nil {
 		log.Warn("Error decoding message", "error", err)

@@ -141,7 +141,7 @@ func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte)
 		}
 	case *bcBlockResponseMessage:
 		// Got a block.
-		bcR.pool.AddBlock(msg.Block, src.Key)
+		bcR.pool.AddBlock(src.Key, msg.Block, len(msgBytes))
 	case *bcStatusRequestMessage:
 		// Send peer our state.
 		queued := src.TrySend(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()})

@@ -169,21 +169,20 @@ FOR_LOOP:
 	for {
 		select {
 		case request := <-bcR.requestsCh: // chan BlockRequest
-			peer := bcR.Switch.Peers().Get(request.PeerId)
+			peer := bcR.Switch.Peers().Get(request.PeerID)
 			if peer == nil {
-				// We can't assign the request.
-				continue FOR_LOOP
+				continue FOR_LOOP // Peer has since been disconnected.
 			}
 			msg := &bcBlockRequestMessage{request.Height}
 			queued := peer.TrySend(BlockchainChannel, msg)
 			if !queued {
 				// We couldn't make the request, send-queue full.
-				// The pool handles retries, so just let it go.
+				// The pool handles timeouts, just let it go.
 				continue FOR_LOOP
 			}
-		case peerId := <-bcR.timeoutsCh: // chan string
+		case peerID := <-bcR.timeoutsCh: // chan string
 			// Peer timed out.
-			peer := bcR.Switch.Peers().Get(peerId)
+			peer := bcR.Switch.Peers().Get(peerID)
 			if peer != nil {
 				bcR.Switch.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
 			}

@@ -191,17 +190,11 @@ FOR_LOOP:
 			// ask for status updates
 			go bcR.BroadcastStatusRequest()
 		case _ = <-switchToConsensusTicker.C:
-			height, numPending, numUnassigned := bcR.pool.GetStatus()
+			height, numPending := bcR.pool.GetStatus()
 			outbound, inbound, _ := bcR.Switch.NumPeers()
-			log.Info("Consensus ticker", "numUnassigned", numUnassigned, "numPending", numPending,
-				"total", len(bcR.pool.requests), "outbound", outbound, "inbound", inbound)
-			// NOTE: this condition is very strict right now. may need to weaken
-			// If all `maxPendingRequests` requests are unassigned
-			// and we have some peers (say >= 3), then we're caught up
-			maxPending := numPending == maxPendingRequests
-			allUnassigned := numPending == numUnassigned
-			enoughPeers := outbound+inbound >= 3
-			if maxPending && allUnassigned && enoughPeers {
+			log.Info("Consensus ticker", "numPending", numPending, "total", len(bcR.pool.requests),
+				"outbound", outbound, "inbound", inbound)
+			if bcR.pool.IsCaughtUp() {
 				log.Notice("Time to switch to consensus reactor!", "height", height)
 				bcR.pool.Stop()

@@ -283,11 +276,15 @@ var _ = wire.RegisterInterface(
 	wire.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest},
 )
 
+// TODO: ensure that bz is completely read.
 func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
 	msgType = bz[0]
-	n := new(int64)
+	n := int64(0)
 	r := bytes.NewReader(bz)
-	msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, n, &err).(struct{ BlockchainMessage }).BlockchainMessage
+	msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage
+	if err != nil && n != int64(len(bz)) {
+		err = errors.New("DecodeMessage() had bytes left over.")
+	}
 	return
 }
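
With switchToConsensusIntervalSeconds lowered from 10 to 1, the reactor now polls roughly once a second and defers to the pool's new IsCaughtUp() instead of the old "all requests unassigned" heuristic. A compact, runnable restatement of that predicate, with a plain slice standing in for the peer map (an illustrative sketch, not the commit's code):

```go
package main

import "fmt"

// caughtUp restates IsCaughtUp: at least 3 peers, a nonzero height, and our
// next height equal to the best height any peer has advertised.
func caughtUp(height int, peerHeights []int) bool {
	maxPeerHeight := 0
	for _, h := range peerHeights {
		if h > maxPeerHeight {
			maxPeerHeight = h
		}
	}
	return len(peerHeights) >= 3 && height > 0 && height == maxPeerHeight
}

func main() {
	fmt.Println(caughtUp(100, []int{100, 100, 98})) // true: we match the best peer
	fmt.Println(caughtUp(95, []int{100, 100, 98}))  // false: still behind
	fmt.Println(caughtUp(100, []int{100, 100}))     // false: fewer than 3 peers
}
```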


common/service.go (+18 -6)

@@ -75,15 +75,21 @@ func NewBaseService(log log15.Logger, name string, impl Service) *BaseService {
 func (bs *BaseService) Start() (bool, error) {
 	if atomic.CompareAndSwapUint32(&bs.started, 0, 1) {
 		if atomic.LoadUint32(&bs.stopped) == 1 {
-			bs.log.Warn(Fmt("Not starting %v -- already stopped", bs.name), "impl", bs.impl)
+			if bs.log != nil {
+				bs.log.Warn(Fmt("Not starting %v -- already stopped", bs.name), "impl", bs.impl)
+			}
 			return false, nil
 		} else {
-			bs.log.Notice(Fmt("Starting %v", bs.name), "impl", bs.impl)
+			if bs.log != nil {
+				bs.log.Notice(Fmt("Starting %v", bs.name), "impl", bs.impl)
+			}
 		}
 		err := bs.impl.OnStart()
 		return true, err
 	} else {
-		bs.log.Info(Fmt("Not starting %v -- already started", bs.name), "impl", bs.impl)
+		if bs.log != nil {
+			bs.log.Info(Fmt("Not starting %v -- already started", bs.name), "impl", bs.impl)
+		}
 		return false, nil
 	}
 }

@@ -94,11 +100,15 @@ func (bs *BaseService) OnStart() error { return nil }
 // Implements Service
 func (bs *BaseService) Stop() bool {
 	if atomic.CompareAndSwapUint32(&bs.stopped, 0, 1) {
-		bs.log.Notice(Fmt("Stopping %v", bs.name), "impl", bs.impl)
+		if bs.log != nil {
+			bs.log.Notice(Fmt("Stopping %v", bs.name), "impl", bs.impl)
+		}
 		bs.impl.OnStop()
 		return true
 	} else {
-		bs.log.Notice(Fmt("Not stopping %v", bs.name), "impl", bs.impl)
+		if bs.log != nil {
+			bs.log.Notice(Fmt("Not stopping %v", bs.name), "impl", bs.impl)
+		}
 		return false
 	}
 }

@@ -138,5 +148,7 @@ func (qs *QuitService) OnStart() error {
 // NOTE: when overriding OnStop, must call .QuitService.OnStop().
 func (qs *QuitService) OnStop() {
-	close(qs.Quit)
+	if qs.Quit != nil {
+		close(qs.Quit)
+	}
 }
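
These guards exist because the new bpRequester in blockchain/pool.go constructs its QuitService without a logger (NewQuitService(nil, "bpRequester", bpr)), so every bs.log call must now tolerate nil. A minimal sketch of the resulting pattern, with the standard library's logger standing in for log15 and all names illustrative:

```go
package main

import "log"

// guardedService mimics BaseService's new behavior: the logger is optional,
// so each call site checks for nil instead of assuming one is attached.
type guardedService struct {
	log  *log.Logger // nil means "run silently"
	name string
}

func (s *guardedService) Start() {
	if s.log != nil { // the same guard the diff adds around bs.log calls
		s.log.Printf("Starting %v", s.name)
	}
	// ... start the service ...
}

func main() {
	quiet := &guardedService{name: "bpRequester"} // nil logger, as in newBPRequester
	quiet.Start()                                 // no panic, no output
}
```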

config/tendermint/config.go (+17 -9)

@@ -106,20 +106,28 @@ var defaultGenesis = `{
 	"accounts": [
 		{
 			"address": "9FCBA7F840A0BFEBBE755E853C9947270A912D04",
-			"amount": 2098999998000000
+			"amount": 1997999998000000
+		},
+		{
+			"address": "964B1493BBE3312278B7DEB94C39149F7899A345",
+			"amount": 100000000000000
+		},
+		{
+			"address": "B9FA4AB462B9C6BF6A62DB4AE77C9E7087209A04",
+			"amount": 1000000000000
+		},
+		{
+			"address": "F171824590D69386F709E7B6704B369C5A370D60",
+			"amount": 1000000000000
 		},
-		{
-			"address": "B9FA4AB462B9C6BF6A62DB4AE77C9E7087209A04",
-			"amount": 1000000000000
-		},
 		{
 			"address": "A88A61069B6660F30F65E8786AFDD4F1D8F625E9",
 			"amount": 1000000
 		},
 		{
 			"address": "EE2EE9247973B4AFC3867CFE5F415410AC251B61",
 			"amount": 1000000
 		}
 	],
 	"validators": [
 		{


consensus/reactor.go (+9 -9)

@@ -82,17 +82,17 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
 	// TODO optimize
 	return []*p2p.ChannelDescriptor{
 		&p2p.ChannelDescriptor{
-			Id:                StateChannel,
+			ID:                StateChannel,
 			Priority:          5,
 			SendQueueCapacity: 100,
 		},
 		&p2p.ChannelDescriptor{
-			Id:                DataChannel,
+			ID:                DataChannel,
 			Priority:          5,
 			SendQueueCapacity: 2,
 		},
 		&p2p.ChannelDescriptor{
-			Id:                VoteChannel,
+			ID:                VoteChannel,
 			Priority:          5,
 			SendQueueCapacity: 40,
 		},

@@ -131,9 +131,9 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
 // Implements Reactor
 // NOTE: We process these messages even when we're fast_syncing.
-func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) {
+func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte) {
 	if !conR.IsRunning() {
-		log.Debug("Receive", "channel", chId, "peer", peer, "bytes", msgBytes)
+		log.Debug("Receive", "channel", chID, "peer", peer, "bytes", msgBytes)
 		return
 	}

@@ -142,12 +142,12 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
 	ps := peer.Data.Get(PeerStateKey).(*PeerState)
 	_, msg, err := DecodeMessage(msgBytes)
 	if err != nil {
-		log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg, "error", err, "bytes", msgBytes)
+		log.Warn("Error decoding message", "channel", chID, "peer", peer, "msg", msg, "error", err, "bytes", msgBytes)
 		return
 	}
-	log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg, "rsHeight", rs.Height)
+	log.Debug("Receive", "channel", chID, "peer", peer, "msg", msg, "rsHeight", rs.Height)
 
-	switch chId {
+	switch chID {
 	case StateChannel:
 		switch msg := msg.(type) {
 		case *NewRoundStepMessage:

@@ -232,7 +232,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
 			log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
 		}
 	default:
-		log.Warn(Fmt("Unknown channel %X", chId))
+		log.Warn(Fmt("Unknown channel %X", chID))
 	}
 
 	if err != nil {


events/events.go (+16 -16)

@@ -45,7 +45,7 @@ func (evsw *EventSwitch) OnStop() {
 	evsw.listeners = nil
 }
 
-func (evsw *EventSwitch) AddListenerForEvent(listenerId, event string, cb eventCallback) {
+func (evsw *EventSwitch) AddListenerForEvent(listenerID, event string, cb eventCallback) {
 	// Get/Create eventCell and listener
 	evsw.mtx.Lock()
 	eventCell := evsw.eventCells[event]

@@ -53,23 +53,23 @@ func (evsw *EventSwitch) AddListenerForEvent(listenerId, event string, cb eventC
 		eventCell = newEventCell()
 		evsw.eventCells[event] = eventCell
 	}
-	listener := evsw.listeners[listenerId]
+	listener := evsw.listeners[listenerID]
 	if listener == nil {
-		listener = newEventListener(listenerId)
-		evsw.listeners[listenerId] = listener
+		listener = newEventListener(listenerID)
+		evsw.listeners[listenerID] = listener
 	}
 	evsw.mtx.Unlock()
 
 	// Add event and listener
-	eventCell.AddListener(listenerId, cb)
+	eventCell.AddListener(listenerID, cb)
 	listener.AddEvent(event)
 }
 
-func (evsw *EventSwitch) RemoveListener(listenerId string) {
+func (evsw *EventSwitch) RemoveListener(listenerID string) {
 	// Get and remove listener
 	evsw.mtx.RLock()
-	listener := evsw.listeners[listenerId]
-	delete(evsw.listeners, listenerId)
+	listener := evsw.listeners[listenerID]
+	delete(evsw.listeners, listenerID)
 	evsw.mtx.RUnlock()
 
 	if listener == nil {

@@ -79,11 +79,11 @@ func (evsw *EventSwitch) RemoveListener(listenerId string) {
 	// Remove callback for each event.
 	listener.SetRemoved()
 	for _, event := range listener.GetEvents() {
-		evsw.RemoveListenerForEvent(event, listenerId)
+		evsw.RemoveListenerForEvent(event, listenerID)
 	}
 }
 
-func (evsw *EventSwitch) RemoveListenerForEvent(event string, listenerId string) {
+func (evsw *EventSwitch) RemoveListenerForEvent(event string, listenerID string) {
 	// Get eventCell
 	evsw.mtx.Lock()
 	eventCell := evsw.eventCells[event]

@@ -93,8 +93,8 @@ func (evsw *EventSwitch) RemoveListenerForEvent(event string, listenerId string)
 		return
 	}
 
-	// Remove listenerId from eventCell
-	numListeners := eventCell.RemoveListener(listenerId)
+	// Remove listenerID from eventCell
+	numListeners := eventCell.RemoveListener(listenerID)
 
 	// Maybe garbage collect eventCell.
 	if numListeners == 0 {

@@ -137,15 +137,15 @@ func newEventCell() *eventCell {
 	}
 }
 
-func (cell *eventCell) AddListener(listenerId string, cb eventCallback) {
+func (cell *eventCell) AddListener(listenerID string, cb eventCallback) {
 	cell.mtx.Lock()
-	cell.listeners[listenerId] = cb
+	cell.listeners[listenerID] = cb
 	cell.mtx.Unlock()
 }
 
-func (cell *eventCell) RemoveListener(listenerId string) int {
+func (cell *eventCell) RemoveListener(listenerID string) int {
 	cell.mtx.Lock()
-	delete(cell.listeners, listenerId)
+	delete(cell.listeners, listenerID)
 	numListeners := len(cell.listeners)
 	cell.mtx.Unlock()
 	return numListeners
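
The listener bookkeeping in the last hunk is easiest to see end to end: eventCell.RemoveListener(listenerID) reports how many callbacks remain, and the switch garbage-collects the cell once the count reaches zero. A self-contained sketch of that pattern (toy types and names, not the package's code):

```go
package main

import "fmt"

// cell mirrors eventCell: a map of listener IDs to callbacks.
type cell struct {
	listeners map[string]func(interface{})
}

// removeListener deletes a callback and returns how many remain,
// just like eventCell.RemoveListener above.
func (c *cell) removeListener(listenerID string) int {
	delete(c.listeners, listenerID)
	return len(c.listeners)
}

func main() {
	cells := map[string]*cell{
		"newBlock": {listeners: map[string]func(interface{}){
			"miner":    func(interface{}) {},
			"explorer": func(interface{}) {},
		}},
	}
	for _, id := range []string{"miner", "explorer"} {
		if n := cells["newBlock"].removeListener(id); n == 0 {
			delete(cells, "newBlock") // maybe garbage collect eventCell
		}
	}
	fmt.Println(len(cells)) // 0: the empty cell was collected
}
```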


mempool/reactor.go (+2 -2)

@@ -38,7 +38,7 @@ func NewMempoolReactor(mempool *Mempool) *MempoolReactor {
 func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
 	return []*p2p.ChannelDescriptor{
 		&p2p.ChannelDescriptor{
-			Id:       MempoolChannel,
+			ID:       MempoolChannel,
 			Priority: 5,
 		},
 	}

@@ -53,7 +53,7 @@ func (pexR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
 }
 
 // Implements Reactor
-func (memR *MempoolReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) {
+func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
 	_, msg, err := DecodeMessage(msgBytes)
 	if err != nil {
 		log.Warn("Error decoding message", "error", err)


p2p/README.md (+9 -9)

@@ -16,15 +16,15 @@ initialization of the connection.
 There are two methods for sending messages:
 
 ```go
-func (m MConnection) Send(chId byte, msg interface{}) bool {}
-func (m MConnection) TrySend(chId byte, msg interface{}) bool {}
+func (m MConnection) Send(chID byte, msg interface{}) bool {}
+func (m MConnection) TrySend(chID byte, msg interface{}) bool {}
 ```
 
-`Send(chId, msg)` is a blocking call that waits until `msg` is successfully queued
-for the channel with the given id byte `chId`. The message `msg` is serialized
+`Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued
+for the channel with the given id byte `chID`. The message `msg` is serialized
 using the `tendermint/wire` submodule's `WriteBinary()` reflection routine.
 
-`TrySend(chId, msg)` is a nonblocking call that returns false if the channel's
+`TrySend(chID, msg)` is a nonblocking call that returns false if the channel's
 queue is full.
 
 `Send()` and `TrySend()` are also exposed for each `Peer`.

@@ -37,14 +37,14 @@ or more `Channels`. So while sending outgoing messages is typically performed o
 incoming messages are received on the reactor.
 
 ```go
-// Declare a MyReactor reactor that handles messages on MyChannelId.
+// Declare a MyReactor reactor that handles messages on MyChannelID.
 type MyReactor struct{}
 
 func (reactor MyReactor) GetChannels() []*ChannelDescriptor {
-	return []*ChannelDescriptor{ChannelDescriptor{Id:MyChannelId, Priority: 1}}
+	return []*ChannelDescriptor{ChannelDescriptor{ID:MyChannelID, Priority: 1}}
 }
 
-func (reactor MyReactor) Receive(chId byte, peer *Peer, msgBytes []byte) {
+func (reactor MyReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {
 	r, n, err := bytes.NewBuffer(msgBytes), new(int64), new(error)
 	msgString := ReadString(r, n, err)
 	fmt.Println(msgString)

@@ -60,7 +60,7 @@ switch := NewSwitch([]Reactor{MyReactor{}})
 // Send a random message to all outbound connections
 for _, peer := range switch.Peers().List() {
 	if peer.IsOutbound() {
-		peer.Send(MyChannelId, "Here's a random message")
+		peer.Send(MyChannelID, "Here's a random message")
 	}
 }
 ```


p2p/connection.go (+29 -29)

@@ -22,15 +22,15 @@ const (
 	idleTimeoutMinutes        = 5
 	updateStatsSeconds        = 2
 	pingTimeoutSeconds        = 40
-	defaultSendRate           = 10240 // 10Kb/s
-	defaultRecvRate           = 10240 // 10Kb/s
+	defaultSendRate           = 51200 // 50Kb/s
+	defaultRecvRate           = 51200 // 50Kb/s
 	flushThrottleMS           = 100
 	defaultSendQueueCapacity  = 1
 	defaultRecvBufferCapacity = 4096
 	defaultSendTimeoutSeconds = 10
 )
 
-type receiveCbFunc func(chId byte, msgBytes []byte)
+type receiveCbFunc func(chID byte, msgBytes []byte)
 type errorCbFunc func(interface{})
 
 /*

@@ -45,15 +45,15 @@ The byte id and the relative priorities of each `Channel` are configured upon
 initialization of the connection.
 
 There are two methods for sending messages:
-	func (m MConnection) Send(chId byte, msg interface{}) bool {}
-	func (m MConnection) TrySend(chId byte, msg interface{}) bool {}
+	func (m MConnection) Send(chID byte, msg interface{}) bool {}
+	func (m MConnection) TrySend(chID byte, msg interface{}) bool {}
 
-`Send(chId, msg)` is a blocking call that waits until `msg` is successfully queued
-for the channel with the given id byte `chId`, or until the request times out.
+`Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued
+for the channel with the given id byte `chID`, or until the request times out.
 The message `msg` is serialized using the `tendermint/wire` submodule's
 `WriteBinary()` reflection routine.
 
-`TrySend(chId, msg)` is a nonblocking call that returns false if the channel's
+`TrySend(chID, msg)` is a nonblocking call that returns false if the channel's
 queue is full.
 
 Inbound message bytes are handled with an onReceive callback function.

@@ -185,17 +185,17 @@ func (c *MConnection) stopForError(r interface{}) {
 }
 
 // Queues a message to be sent to channel.
-func (c *MConnection) Send(chId byte, msg interface{}) bool {
+func (c *MConnection) Send(chID byte, msg interface{}) bool {
 	if !c.IsRunning() {
 		return false
 	}
 
-	log.Info("Send", "channel", chId, "conn", c, "msg", msg) //, "bytes", wire.BinaryBytes(msg))
+	log.Info("Send", "channel", chID, "conn", c, "msg", msg) //, "bytes", wire.BinaryBytes(msg))
 
 	// Send message to channel.
-	channel, ok := c.channelsIdx[chId]
+	channel, ok := c.channelsIdx[chID]
 	if !ok {
-		log.Error(Fmt("Cannot send bytes, unknown channel %X", chId))
+		log.Error(Fmt("Cannot send bytes, unknown channel %X", chID))
 		return false
 	}

@@ -207,24 +207,24 @@ func (c *MConnection) Send(chId byte, msg interface{}) bool {
 		default:
 		}
 	} else {
-		log.Warn("Send failed", "channel", chId, "conn", c, "msg", msg)
+		log.Warn("Send failed", "channel", chID, "conn", c, "msg", msg)
 	}
 	return success
 }
 
 // Queues a message to be sent to channel.
 // Nonblocking, returns true if successful.
-func (c *MConnection) TrySend(chId byte, msg interface{}) bool {
+func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
 	if !c.IsRunning() {
 		return false
 	}
 
-	log.Info("TrySend", "channel", chId, "conn", c, "msg", msg)
+	log.Info("TrySend", "channel", chID, "conn", c, "msg", msg)
 
 	// Send message to channel.
-	channel, ok := c.channelsIdx[chId]
+	channel, ok := c.channelsIdx[chID]
 	if !ok {
-		log.Error(Fmt("Cannot send bytes, unknown channel %X", chId))
+		log.Error(Fmt("Cannot send bytes, unknown channel %X", chID))
 		return false
 	}

@@ -240,14 +240,14 @@ func (c *MConnection) TrySend(chId byte, msg interface{}) bool {
 	return ok
 }
 
-func (c *MConnection) CanSend(chId byte) bool {
+func (c *MConnection) CanSend(chID byte) bool {
 	if !c.IsRunning() {
 		return false
 	}
 
-	channel, ok := c.channelsIdx[chId]
+	channel, ok := c.channelsIdx[chID]
 	if !ok {
-		log.Error(Fmt("Unknown channel %X", chId))
+		log.Error(Fmt("Unknown channel %X", chID))
 		return false
 	}
 	return channel.canSend()

@@ -421,9 +421,9 @@ FOR_LOOP:
 			}
 			break FOR_LOOP
 		}
-		channel, ok := c.channelsIdx[pkt.ChannelId]
+		channel, ok := c.channelsIdx[pkt.ChannelID]
 		if !ok || channel == nil {
-			PanicQ(Fmt("Unknown channel %X", pkt.ChannelId))
+			PanicQ(Fmt("Unknown channel %X", pkt.ChannelID))
 		}
 		msgBytes, err := channel.recvMsgPacket(pkt)
 		if err != nil {

@@ -434,8 +434,8 @@ FOR_LOOP:
 			break FOR_LOOP
 		}
 		if msgBytes != nil {
-			log.Debug("Received bytes", "chId", pkt.ChannelId, "msgBytes", msgBytes)
-			c.onReceive(pkt.ChannelId, msgBytes)
+			log.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", msgBytes)
+			c.onReceive(pkt.ChannelID, msgBytes)
 		}
 	default:
 		PanicSanity(Fmt("Unknown message type %X", pktType))

@@ -456,7 +456,7 @@ FOR_LOOP:
 //-----------------------------------------------------------------------------
 
 type ChannelDescriptor struct {
-	Id                  byte
+	ID                  byte
 	Priority            int
 	SendQueueCapacity   int
 	RecvBufferCapacity  int

@@ -493,7 +493,7 @@ func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
 	return &Channel{
 		conn:      conn,
 		desc:      desc,
-		id:        desc.Id,
+		id:        desc.ID,
 		sendQueue: make(chan []byte, desc.SendQueueCapacity),
 		recving:   make([]byte, 0, desc.RecvBufferCapacity),
 		priority:  desc.Priority,

@@ -556,7 +556,7 @@ func (ch *Channel) isSendPending() bool {
 // Not goroutine-safe
 func (ch *Channel) nextMsgPacket() msgPacket {
 	packet := msgPacket{}
-	packet.ChannelId = byte(ch.id)
+	packet.ChannelID = byte(ch.id)
 	packet.Bytes = ch.sending[:MinInt(maxMsgPacketSize, len(ch.sending))]
 	if len(ch.sending) <= maxMsgPacketSize {
 		packet.EOF = byte(0x01)

@@ -617,13 +617,13 @@ const (
 // Messages in channels are chopped into smaller msgPackets for multiplexing.
 type msgPacket struct {
-	ChannelId byte
+	ChannelID byte
 	EOF       byte // 1 means message ends here.
 	Bytes     []byte
 }
 
 func (p msgPacket) String() string {
-	return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelId, p.Bytes, p.EOF)
+	return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF)
 }
 
 //-----------------------------------------------------------------------------


p2p/peer.go (+10 -10)

@@ -49,12 +49,12 @@ func peerHandshake(conn net.Conn, ourNodeInfo *types.NodeInfo) (*types.NodeInfo,
 // NOTE: call peerHandshake on conn before calling newPeer().
 func newPeer(conn net.Conn, peerNodeInfo *types.NodeInfo, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
 	var p *Peer
-	onReceive := func(chId byte, msgBytes []byte) {
-		reactor := reactorsByCh[chId]
+	onReceive := func(chID byte, msgBytes []byte) {
+		reactor := reactorsByCh[chID]
 		if reactor == nil {
-			PanicSanity(Fmt("Unknown channel %X", chId))
+			PanicSanity(Fmt("Unknown channel %X", chID))
 		}
-		reactor.Receive(chId, p, msgBytes)
+		reactor.Receive(chID, p, msgBytes)
 	}
 	onError := func(r interface{}) {
 		p.Stop()

@@ -91,25 +91,25 @@ func (p *Peer) IsOutbound() bool {
 	return p.outbound
 }
 
-func (p *Peer) Send(chId byte, msg interface{}) bool {
+func (p *Peer) Send(chID byte, msg interface{}) bool {
 	if !p.IsRunning() {
 		return false
 	}
-	return p.mconn.Send(chId, msg)
+	return p.mconn.Send(chID, msg)
 }
 
-func (p *Peer) TrySend(chId byte, msg interface{}) bool {
+func (p *Peer) TrySend(chID byte, msg interface{}) bool {
 	if !p.IsRunning() {
 		return false
 	}
-	return p.mconn.TrySend(chId, msg)
+	return p.mconn.TrySend(chID, msg)
 }
 
-func (p *Peer) CanSend(chId byte) bool {
+func (p *Peer) CanSend(chID byte) bool {
 	if !p.IsRunning() {
 		return false
 	}
-	return p.mconn.CanSend(chId)
+	return p.mconn.CanSend(chID)
 }
 
 func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {


p2p/pex_reactor.go (+2 -2)

@@ -55,7 +55,7 @@ func (pexR *PEXReactor) OnStop() {
 func (pexR *PEXReactor) GetChannels() []*ChannelDescriptor {
 	return []*ChannelDescriptor{
 		&ChannelDescriptor{
-			Id:                PexChannel,
+			ID:                PexChannel,
 			Priority:          1,
 			SendQueueCapacity: 10,
 		},

@@ -84,7 +84,7 @@ func (pexR *PEXReactor) RemovePeer(peer *Peer, reason interface{}) {
 // Implements Reactor
 // Handles incoming PEX messages.
-func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) {
+func (pexR *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
 	// decode message
 	_, msg, err := DecodeMessage(msgBytes)


p2p/switch.go (+9 -9)

@@ -20,7 +20,7 @@ type Reactor interface {
 	GetChannels() []*ChannelDescriptor
 	AddPeer(peer *Peer)
 	RemovePeer(peer *Peer, reason interface{})
-	Receive(chId byte, peer *Peer, msgBytes []byte)
+	Receive(chID byte, peer *Peer, msgBytes []byte)
 }
 
 //--------------------------------------

@@ -43,7 +43,7 @@ func (br *BaseReactor) SetSwitch(sw *Switch) {
 func (_ *BaseReactor) GetChannels() []*ChannelDescriptor { return nil }
 func (_ *BaseReactor) AddPeer(peer *Peer) {}
 func (_ *BaseReactor) RemovePeer(peer *Peer, reason interface{}) {}
-func (_ *BaseReactor) Receive(chId byte, peer *Peer, msgBytes []byte) {}
+func (_ *BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {}
 
 //-----------------------------------------------------------------------------

@@ -96,12 +96,12 @@ func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
 	// No two reactors can share the same channel.
 	reactorChannels := reactor.GetChannels()
 	for _, chDesc := range reactorChannels {
-		chId := chDesc.Id
-		if sw.reactorsByCh[chId] != nil {
-			PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chId, sw.reactorsByCh[chId], reactor))
+		chID := chDesc.ID
+		if sw.reactorsByCh[chID] != nil {
+			PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
 		}
 		sw.chDescs = append(sw.chDescs, chDesc)
-		sw.reactorsByCh[chId] = reactor
+		sw.reactorsByCh[chID] = reactor
 	}
 	sw.reactors[name] = reactor
 	reactor.SetSwitch(sw)

@@ -285,12 +285,12 @@ func (sw *Switch) IsDialing(addr *NetAddress) bool {
 // Broadcast runs a go routine for each attempted send, which will block
 // trying to send for defaultSendTimeoutSeconds. Returns a channel
 // which receives success values for each attempted send (false if times out)
-func (sw *Switch) Broadcast(chId byte, msg interface{}) chan bool {
+func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
 	successChan := make(chan bool, len(sw.peers.List()))
-	log.Info("Broadcast", "channel", chId, "msg", msg)
+	log.Info("Broadcast", "channel", chID, "msg", msg)
 	for _, peer := range sw.peers.List() {
 		go func(peer *Peer) {
-			success := peer.Send(chId, msg)
+			success := peer.Send(chID, msg)
 			successChan <- success
 		}(peer)
 	}
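
Broadcast stays fire-and-forget: one goroutine per peer, with results reported on a channel buffered to the current peer count. A caller that wants a delivery tally can drain it as sketched below; the helper and its signature are hypothetical, and it receives a fixed number of values rather than ranging, since nothing in the hunk shown closes the channel.

```go
package main

import "fmt"

// countSuccesses drains exactly numPeers results from the channel
// returned by a Broadcast-style call and counts the true values.
func countSuccesses(successChan chan bool, numPeers int) int {
	n := 0
	for i := 0; i < numPeers; i++ {
		if <-successChan {
			n++
		}
	}
	return n
}

func main() {
	// Simulate Broadcast's buffered channel: one result per peer.
	successChan := make(chan bool, 3)
	successChan <- true
	successChan <- false
	successChan <- true
	fmt.Println(countSuccesses(successChan, 3)) // 2
}
```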


p2p/switch_test.go (+13 -13)

@@ -57,12 +57,12 @@ func (tr *TestReactor) RemovePeer(peer *Peer, reason interface{}) {
 	tr.peersRemoved = append(tr.peersRemoved, peer)
 }
 
-func (tr *TestReactor) Receive(chId byte, peer *Peer, msgBytes []byte) {
+func (tr *TestReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {
 	if tr.logMessages {
 		tr.mtx.Lock()
 		defer tr.mtx.Unlock()
-		//fmt.Printf("Received: %X, %X\n", chId, msgBytes)
-		tr.msgsReceived[chId] = append(tr.msgsReceived[chId], PeerMessage{peer.Key, msgBytes, tr.msgsCounter})
+		//fmt.Printf("Received: %X, %X\n", chID, msgBytes)
+		tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.Key, msgBytes, tr.msgsCounter})
 		tr.msgsCounter++
 	}
 }

@@ -129,12 +129,12 @@ func TestSwitches(t *testing.T) {
 	s1, s2 := makeSwitchPair(t, func(sw *Switch) *Switch {
 		// Make two reactors of two channels each
 		sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
-			&ChannelDescriptor{Id: byte(0x00), Priority: 10},
-			&ChannelDescriptor{Id: byte(0x01), Priority: 10},
+			&ChannelDescriptor{ID: byte(0x00), Priority: 10},
+			&ChannelDescriptor{ID: byte(0x01), Priority: 10},
 		}, true))
 		sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
-			&ChannelDescriptor{Id: byte(0x02), Priority: 10},
-			&ChannelDescriptor{Id: byte(0x03), Priority: 10},
+			&ChannelDescriptor{ID: byte(0x02), Priority: 10},
+			&ChannelDescriptor{ID: byte(0x03), Priority: 10},
 		}, true))
 		return sw
 	})

@@ -196,12 +196,12 @@ func BenchmarkSwitches(b *testing.B) {
 	s1, s2 := makeSwitchPair(b, func(sw *Switch) *Switch {
 		// Make bar reactors of bar channels each
 		sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
-			&ChannelDescriptor{Id: byte(0x00), Priority: 10},
-			&ChannelDescriptor{Id: byte(0x01), Priority: 10},
+			&ChannelDescriptor{ID: byte(0x00), Priority: 10},
+			&ChannelDescriptor{ID: byte(0x01), Priority: 10},
 		}, false))
 		sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
-			&ChannelDescriptor{Id: byte(0x02), Priority: 10},
-			&ChannelDescriptor{Id: byte(0x03), Priority: 10},
+			&ChannelDescriptor{ID: byte(0x02), Priority: 10},
+			&ChannelDescriptor{ID: byte(0x03), Priority: 10},
 		}, false))
 		return sw
 	})

@@ -216,8 +216,8 @@ func BenchmarkSwitches(b *testing.B) {
 	// Send random message from foo channel to another
 	for i := 0; i < b.N; i++ {
-		chId := byte(i % 4)
-		successChan := s1.Broadcast(chId, "test data")
+		chID := byte(i % 4)
+		successChan := s1.Broadcast(chID, "test data")
 		for s := range successChan {
 			if s {
 				numSuccess += 1


rpc/client/client.go (+1 -1)

@@ -18,7 +18,7 @@ func Call(remote string, method string, params []interface{}, dest interface{})
 		JSONRPC: "2.0",
 		Method:  method,
 		Params:  params,
-		Id:      "",
+		ID:      "",
 	}
 	requestBytes := wire.JSONBytes(request)
 	requestBuf := bytes.NewBuffer(requestBytes)


rpc/core/types/responses.go (+1 -1)

@@ -123,7 +123,7 @@ type ResultEvent struct {
 type Response struct {
 	JSONRPC string `json:"jsonrpc"`
-	Id      string `json:"id"`
+	ID      string `json:"id"`
 	Result  Result `json:"result"`
 	Error   string `json:"error"`
 }


rpc/core_client/client.go (+1 -1)

@@ -187,7 +187,7 @@ fmt
 		JSONRPC: "2.0",
 		Method:  reverseFuncMap["{{name}}"],
 		Params:  []interface{}{ {{args.ident}} },
-		Id:      "",
+		ID:      "",
 	}
 	body, err := c.RequestResponse(request)
 	if err != nil{


rpc/core_client/client_methods.go (+19 -19)

@@ -571,7 +571,7 @@ func (c *ClientJSON) BlockchainInfo(minHeight int, maxHeight int) (*ctypes.Resul
 		JSONRPC: "2.0",
 		Method:  reverseFuncMap["BlockchainInfo"],
 		Params:  []interface{}{minHeight, maxHeight},
-		Id:      "",
+		ID:      "",
 	}
 	body, err := c.RequestResponse(request)
 	if err != nil {
@@ -596,7 +596,7 @@ func (c *ClientJSON) BroadcastTx(tx types.Tx) (*ctypes.ResultBroadcastTx, error)
 		JSONRPC: "2.0",
 		Method:  reverseFuncMap["BroadcastTx"],
 		Params:  []interface{}{tx},
-		Id:      "",
+		ID:      "",
 	}
 	body, err := c.RequestResponse(request)
 	if err != nil {
@@ -621,7 +621,7 @@ func (c *ClientJSON) Call(fromAddress []byte, toAddress []byte, data []byte) (*c
 		JSONRPC: "2.0",
 		Method:  reverseFuncMap["Call"],
 		Params:  []interface{}{fromAddress, toAddress, data},
-		Id:      "",
+		ID:      "",
 	}
 	body, err := c.RequestResponse(request)
 	if err != nil {
@@ -646,7 +646,7 @@ func (c *ClientJSON) CallCode(fromAddress []byte, code []byte, data []byte) (*ct
 		JSONRPC: "2.0",
 		Method:  reverseFuncMap["CallCode"],
 		Params:  []interface{}{fromAddress, code, data},
-		Id:      "",
+		ID:      "",
 	}
 	body, err := c.RequestResponse(request)
 	if err != nil {
@@ -671,7 +671,7 @@ func (c *ClientJSON) DumpConsensusState() (*ctypes.ResultDumpConsensusState, err
 		JSONRPC: "2.0",
 		Method:  reverseFuncMap["DumpConsensusState"],
 		Params:  []interface{}{},
-		Id:      "",
+		ID:      "",
 	}
 	body, err := c.RequestResponse(request)
 	if err != nil {
@@ -696,7 +696,7 @@ func (c *ClientJSON) DumpStorage(address []byte) (*ctypes.ResultDumpStorage, err
 		JSONRPC: "2.0",
 		Method:  reverseFuncMap["DumpStorage"],
 		Params:  []interface{}{address},
-		Id:      "",
+		ID:      "",
 	}
 	body, err := c.RequestResponse(request)
 	if err != nil {
@@ -721,7 +721,7 @@ func (c *ClientJSON) GenPrivAccount() (*ctypes.ResultGenPrivAccount, error) {
 		JSONRPC: "2.0",
 		Method:  reverseFuncMap["GenPrivAccount"],
 		Params:  []interface{}{},
-		Id:      "",
+		ID:      "",
 	}
 	body, err := c.RequestResponse(request)
 	if err != nil {
@@ -746,7 +746,7 @@ func (c *ClientJSON) Genesis() (*ctypes.ResultGenesis, error) {
 		JSONRPC: "2.0",
 		Method:  reverseFuncMap["Genesis"],
 		Params:  []interface{}{},
-		Id:      "",
+		ID:      "",
 	}
 	body, err := c.RequestResponse(request)
 	if err != nil {
@@ -771,7 +771,7 @@ func (c *ClientJSON) GetAccount(address []byte) (*ctypes.ResultGetAccount, error
 		JSONRPC: "2.0",
 		Method:  reverseFuncMap["GetAccount"],
 		Params:  []interface{}{address},
-		Id:      "",
+		ID:      "",
 	}
 	body, err := c.RequestResponse(request)
 	if err != nil {
@@ -796,7 +796,7 @@ func (c *ClientJSON) GetBlock(height int) (*ctypes.ResultGetBlock, error) {
 		JSONRPC: "2.0",
 		Method:  reverseFuncMap["GetBlock"],
 		Params:  []interface{}{height},
-		Id:      "",
+		ID:      "",
 	}
 	body, err := c.RequestResponse(request)
 	if err != nil {
@@ -821,7 +821,7 @@ func (c *ClientJSON) GetName(name string) (*ctypes.ResultGetName, error) {
 		JSONRPC: "2.0",
 		Method:  reverseFuncMap["GetName"],
 		Params:  []interface{}{name},
-		Id:      "",
+		ID:      "",
 	}
 	body, err := c.RequestResponse(request)
 	if err != nil {
@@ -846,7 +846,7 @@ func (c *ClientJSON) GetStorage(address []byte, key []byte) (*ctypes.ResultGetSt
 		JSONRPC: "2.0",
 		Method:  reverseFuncMap["GetStorage"],
 		Params:  []interface{}{address, key},
-		Id:      "",
+		ID:      "",
 	}
 	body, err := c.RequestResponse(request)
 	if err != nil {
@@ -871,7 +871,7 @@ func (c *ClientJSON) ListAccounts() (*ctypes.ResultListAccounts, error) {
 		JSONRPC: "2.0",
 		Method:  reverseFuncMap["ListAccounts"],
 		Params:  []interface{}{},
-		Id:      "",
+		ID:      "",
 	}
 	body, err := c.RequestResponse(request)
 	if err != nil {
@@ -896,7 +896,7 @@ func (c *ClientJSON) ListNames() (*ctypes.ResultListNames, error) {
 		JSONRPC: "2.0",
 		Method:  reverseFuncMap["ListNames"],
 		Params:  []interface{}{},
-		Id:      "",
+		ID:      "",
 	}
 	body, err := c.RequestResponse(request)
 	if err != nil {
@@ -921,7 +921,7 @@ func (c *ClientJSON) ListUnconfirmedTxs() (*ctypes.ResultListUnconfirmedTxs, err
 		JSONRPC: "2.0",
 		Method:  reverseFuncMap["ListUnconfirmedTxs"],
 		Params:  []interface{}{},
-		Id:      "",
+		ID:      "",
 	}
 	body, err := c.RequestResponse(request)
 	if err != nil {
@@ -946,7 +946,7 @@ func (c *ClientJSON) ListValidators() (*ctypes.ResultListValidators, error) {
 		JSONRPC: "2.0",
 		Method:  reverseFuncMap["ListValidators"],
 		Params:  []interface{}{},
-		Id:      "",
+		ID:      "",
 	}
 	body, err := c.RequestResponse(request)
 	if err != nil {
@@ -971,7 +971,7 @@ func (c *ClientJSON) NetInfo() (*ctypes.ResultNetInfo, error) {
 		JSONRPC: "2.0",
 		Method:  reverseFuncMap["NetInfo"],
 		Params:  []interface{}{},
-		Id:      "",
+		ID:      "",
 	}
 	body, err := c.RequestResponse(request)
 	if err != nil {
@@ -996,7 +996,7 @@ func (c *ClientJSON) SignTx(tx types.Tx, privAccounts []*acm.PrivAccount) (*ctyp
 		JSONRPC: "2.0",
 		Method:  reverseFuncMap["SignTx"],
 		Params:  []interface{}{tx, privAccounts},
-		Id:      "",
+		ID:      "",
 	}
 	body, err := c.RequestResponse(request)
 	if err != nil {
@@ -1021,7 +1021,7 @@ func (c *ClientJSON) Status() (*ctypes.ResultStatus, error) {
 		JSONRPC: "2.0",
 		Method:  reverseFuncMap["Status"],
 		Params:  []interface{}{},
-		Id:      "",
+		ID:      "",
 	}
 	body, err := c.RequestResponse(request)
 	if err != nil {
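All nineteen regenerated methods above build the same request value, varying only the method name and params. A hypothetical helper (not in the repository) makes the shared pattern explicit:

```go
package main

import "fmt"

// Local mirror of rpctypes.RPCRequest (see rpc/types/types.go below).
type RPCRequest struct {
	JSONRPC string        `json:"jsonrpc"`
	ID      string        `json:"id"`
	Method  string        `json:"method"`
	Params  []interface{} `json:"params"`
}

// newRequest is a hypothetical helper: every generated method above
// constructs exactly this value before calling c.RequestResponse.
func newRequest(method string, params []interface{}) RPCRequest {
	return RPCRequest{JSONRPC: "2.0", Method: method, Params: params, ID: ""}
}

func main() {
	// What ClientJSON.BlockchainInfo(0, 10) would send:
	fmt.Printf("%+v\n", newRequest("BlockchainInfo", []interface{}{0, 10}))
}
```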


+3 -3 rpc/core_client/ws_client.go

@@ -76,7 +76,7 @@ func (wsc *WSClient) receiveEventsRoutine() {
 			wsc.Stop()
 			break
 		}
-		if strings.HasSuffix(response.Id, "#event") {
+		if strings.HasSuffix(response.ID, "#event") {
 			wsc.EventsCh <- *response.Result.(*ctypes.ResultEvent)
 		} else {
 			wsc.ResultsCh <- response.Result
@@ -89,7 +89,7 @@ func (wsc *WSClient) receiveEventsRoutine() {
 func (wsc *WSClient) Subscribe(eventid string) error {
 	err := wsc.WriteJSON(rpctypes.RPCRequest{
 		JSONRPC: "2.0",
-		Id:      "",
+		ID:      "",
 		Method:  "subscribe",
 		Params:  []interface{}{eventid},
 	})
@@ -100,7 +100,7 @@ func (wsc *WSClient) Subscribe(eventid string) error {
 func (wsc *WSClient) Unsubscribe(eventid string) error {
 	err := wsc.WriteJSON(rpctypes.RPCRequest{
 		JSONRPC: "2.0",
-		Id:      "",
+		ID:      "",
 		Method:  "unsubscribe",
 		Params:  []interface{}{eventid},
 	})
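The receive loop above separates event pushes from plain RPC results purely by the "#event" suffix on the response ID. A small, self-contained sketch of that routing, using a local stand-in rather than the real rpctypes.RPCResponse:

```go
package main

import (
	"fmt"
	"strings"
)

// Stand-in response type; only the ID and Result fields matter here.
type RPCResponse struct {
	ID     string
	Result interface{}
}

// route mimics receiveEventsRoutine above: responses whose ID carries
// the "#event" suffix are event pushes, everything else is a result.
func route(resp RPCResponse, events, results chan interface{}) {
	if strings.HasSuffix(resp.ID, "#event") {
		events <- resp.Result
	} else {
		results <- resp.Result
	}
}

func main() {
	events := make(chan interface{}, 1)
	results := make(chan interface{}, 1)
	route(RPCResponse{ID: "#event", Result: "NewBlock"}, events, results)
	route(RPCResponse{ID: "", Result: "status"}, events, results)
	fmt.Println(<-events, <-results) // NewBlock status
}
```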


+17 -17 rpc/server/handlers.go

@@ -95,27 +95,27 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc {
 			return
 		}
 		if len(r.URL.Path) > 1 {
-			WriteRPCResponse(w, NewRPCResponse(request.Id, nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path)))
+			WriteRPCResponse(w, NewRPCResponse(request.ID, nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path)))
 			return
 		}
 		rpcFunc := funcMap[request.Method]
 		if rpcFunc == nil {
-			WriteRPCResponse(w, NewRPCResponse(request.Id, nil, "RPC method unknown: "+request.Method))
+			WriteRPCResponse(w, NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method))
 			return
 		}
 		args, err := jsonParamsToArgs(rpcFunc, request.Params)
 		if err != nil {
-			WriteRPCResponse(w, NewRPCResponse(request.Id, nil, err.Error()))
+			WriteRPCResponse(w, NewRPCResponse(request.ID, nil, err.Error()))
 			return
 		}
 		returns := rpcFunc.f.Call(args)
 		log.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns)
 		result, err := unreflectResult(returns)
 		if err != nil {
-			WriteRPCResponse(w, NewRPCResponse(request.Id, nil, err.Error()))
+			WriteRPCResponse(w, NewRPCResponse(request.ID, nil, err.Error()))
 			return
 		}
-		WriteRPCResponse(w, NewRPCResponse(request.Id, result, ""))
+		WriteRPCResponse(w, NewRPCResponse(request.ID, result, ""))
 	}
 }
@@ -324,23 +324,23 @@ func (wsc *WSConnection) readRoutine() {
 			err = json.Unmarshal(in, &request)
 			if err != nil {
 				errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
-				wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, errStr))
+				wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, errStr))
 				continue
 			}
 			switch request.Method {
 			case "subscribe":
 				if len(request.Params) != 1 {
-					wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, "subscribe takes 1 event parameter string"))
+					wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "subscribe takes 1 event parameter string"))
 					continue
 				}
 				if event, ok := request.Params[0].(string); !ok {
-					wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, "subscribe takes 1 event parameter string"))
+					wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "subscribe takes 1 event parameter string"))
 					continue
 				} else {
 					log.Notice("Subscribe to event", "id", wsc.id, "event", event)
 					wsc.evsw.AddListenerForEvent(wsc.id, event, func(msg types.EventData) {
 						// NOTE: RPCResponses of subscribed events have id suffix "#event"
-						wsc.writeRPCResponse(NewRPCResponse(request.Id+"#event", ctypes.ResultEvent{event, msg}, ""))
+						wsc.writeRPCResponse(NewRPCResponse(request.ID+"#event", ctypes.ResultEvent{event, msg}, ""))
 					})
 					continue
 				}
@@ -348,41 +348,41 @@ func (wsc *WSConnection) readRoutine() {
 				if len(request.Params) == 0 {
 					log.Notice("Unsubscribe from all events", "id", wsc.id)
 					wsc.evsw.RemoveListener(wsc.id)
-					wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, ""))
+					wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, ""))
 					continue
 				} else if len(request.Params) == 1 {
 					if event, ok := request.Params[0].(string); !ok {
-						wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, "unsubscribe takes 0 or 1 event parameter strings"))
+						wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "unsubscribe takes 0 or 1 event parameter strings"))
 						continue
 					} else {
 						log.Notice("Unsubscribe from event", "id", wsc.id, "event", event)
 						wsc.evsw.RemoveListenerForEvent(event, wsc.id)
-						wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, ""))
+						wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, ""))
 						continue
 					}
 				} else {
-					wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, "unsubscribe takes 0 or 1 event parameter strings"))
+					wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "unsubscribe takes 0 or 1 event parameter strings"))
 					continue
 				}
 			default:
 				rpcFunc := wsc.funcMap[request.Method]
 				if rpcFunc == nil {
-					wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, "RPC method unknown: "+request.Method))
+					wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method))
 					continue
 				}
 				args, err := jsonParamsToArgs(rpcFunc, request.Params)
 				if err != nil {
-					wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, err.Error()))
+					wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, err.Error()))
 					continue
 				}
 				returns := rpcFunc.f.Call(args)
 				log.Info("WSJSONRPC", "method", request.Method, "args", args, "returns", returns)
 				result, err := unreflectResult(returns)
 				if err != nil {
-					wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, err.Error()))
+					wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, err.Error()))
 					continue
 				} else {
-					wsc.writeRPCResponse(NewRPCResponse(request.Id, result, ""))
+					wsc.writeRPCResponse(NewRPCResponse(request.ID, result, ""))
 					continue
 				}
 			}
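Whether a call succeeds or fails, the handler echoes the request's ID back via NewRPCResponse so clients can correlate responses. A toy version of the funcMap dispatch, with stand-in types; the real handler resolves arguments and invokes methods through reflection (rpcFunc.f.Call):

```go
package main

import "fmt"

// Local mirror of rpctypes.RPCResponse.
type RPCResponse struct {
	JSONRPC string      `json:"jsonrpc"`
	ID      string      `json:"id"`
	Result  interface{} `json:"result"`
	Error   string      `json:"error"`
}

// NewRPCResponse mirrors rpc/types/types.go: the request ID is echoed
// back whether the call produced a result or an error.
func NewRPCResponse(id string, res interface{}, err string) RPCResponse {
	return RPCResponse{JSONRPC: "2.0", ID: id, Result: res, Error: err}
}

// dispatch is a simplified stand-in for the handler's funcMap lookup.
func dispatch(funcMap map[string]func() interface{}, id, method string) RPCResponse {
	f, ok := funcMap[method]
	if !ok {
		return NewRPCResponse(id, nil, "RPC method unknown: "+method)
	}
	return NewRPCResponse(id, f(), "")
}

func main() {
	funcMap := map[string]func() interface{}{
		"status": func() interface{} { return "OK" },
	}
	fmt.Printf("%+v\n", dispatch(funcMap, "1", "status")) // result path
	fmt.Printf("%+v\n", dispatch(funcMap, "2", "nope"))   // error path
}
```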


+2 -2 rpc/test/ws_helpers.go

@@ -34,7 +34,7 @@ func newWSCon(t *testing.T) *websocket.Conn {
 func subscribe(t *testing.T, con *websocket.Conn, eventid string) {
 	err := con.WriteJSON(rpctypes.RPCRequest{
 		JSONRPC: "2.0",
-		Id:      "",
+		ID:      "",
 		Method:  "subscribe",
 		Params:  []interface{}{eventid},
 	})
@@ -47,7 +47,7 @@ func subscribe(t *testing.T, con *websocket.Conn, eventid string) {
 func unsubscribe(t *testing.T, con *websocket.Conn, eventid string) {
 	err := con.WriteJSON(rpctypes.RPCRequest{
 		JSONRPC: "2.0",
-		Id:      "",
+		ID:      "",
 		Method:  "unsubscribe",
 		Params:  []interface{}{eventid},
 	})


+3 -3 rpc/types/types.go

@@ -2,14 +2,14 @@ package rpctypes

 type RPCRequest struct {
 	JSONRPC string        `json:"jsonrpc"`
-	Id      string        `json:"id"`
+	ID      string        `json:"id"`
 	Method  string        `json:"method"`
 	Params  []interface{} `json:"params"`
 }

 type RPCResponse struct {
 	JSONRPC string      `json:"jsonrpc"`
-	Id      string      `json:"id"`
+	ID      string      `json:"id"`
 	Result  interface{} `json:"result"`
 	Error   string      `json:"error"`
 }
@@ -17,7 +17,7 @@ type RPCResponse struct {
 func NewRPCResponse(id string, res interface{}, err string) RPCResponse {
 	return RPCResponse{
 		JSONRPC: "2.0",
-		Id:      id,
+		ID:      id,
 		Result:  res,
 		Error:   err,
 	}
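Note that only the Go field names change here; the `json:"id"` tags stay lowercase, so the rename is invisible on the wire. A quick check, using encoding/json where the repo marshals with the wire package:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirror of rpctypes.RPCResponse, as defined above.
type RPCResponse struct {
	JSONRPC string      `json:"jsonrpc"`
	ID      string      `json:"id"`
	Result  interface{} `json:"result"`
	Error   string      `json:"error"`
}

func NewRPCResponse(id string, res interface{}, err string) RPCResponse {
	return RPCResponse{JSONRPC: "2.0", ID: id, Result: res, Error: err}
}

func main() {
	b, _ := json.Marshal(NewRPCResponse("abc", 42, ""))
	// Field is ID in Go but still lowercase "id" on the wire:
	fmt.Println(string(b)) // {"jsonrpc":"2.0","id":"abc","result":42,"error":""}
}
```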


+2 -2 vm/test/log_event_test.go

@@ -40,11 +40,11 @@ func TestLog4(t *testing.T) {
 	if err != nil {
 		t.Errorf("Failed to start eventSwitch: %v", err)
 	}
-	eventId := types.EventStringLogEvent(account2.Address.Postfix(20))
+	eventID := types.EventStringLogEvent(account2.Address.Postfix(20))
 	doneChan := make(chan struct{}, 1)
-	eventSwitch.AddListenerForEvent("test", eventId, func(event types.EventData) {
+	eventSwitch.AddListenerForEvent("test", eventID, func(event types.EventData) {
 		logEvent := event.(types.EventDataLog)
 		// No need to test address as this event would not happen if it wasn't correct
 		if !reflect.DeepEqual(logEvent.Topics, expectedTopics) {
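Both this test and vm.go below hinge on the same pattern: listeners register under an eventID string, and FireEvent delivers to everyone registered under it. A minimal, self-contained sketch with local stand-ins for the events package; the real eventID comes from types.EventStringLogEvent:

```go
package main

import "fmt"

type EventData interface{}

// EventSwitch is a toy stand-in: listeners keyed by eventID strings.
type EventSwitch struct {
	listeners map[string][]func(EventData)
}

func NewEventSwitch() *EventSwitch {
	return &EventSwitch{listeners: make(map[string][]func(EventData))}
}

// AddListenerForEvent mirrors the signature used in the test above;
// listenerID is kept only to match that shape.
func (evsw *EventSwitch) AddListenerForEvent(listenerID, eventID string, cb func(EventData)) {
	evsw.listeners[eventID] = append(evsw.listeners[eventID], cb)
}

// FireEvent invokes every callback registered under eventID, as
// vm.evc.FireEvent does in the hunk below.
func (evsw *EventSwitch) FireEvent(eventID string, data EventData) {
	for _, cb := range evsw.listeners[eventID] {
		cb(data)
	}
}

func main() {
	evsw := NewEventSwitch()
	// Hypothetical eventID string, for illustration only.
	eventID := "Log/DEADBEEF"
	evsw.AddListenerForEvent("test", eventID, func(ev EventData) {
		fmt.Println("got log event:", ev)
	})
	evsw.FireEvent(eventID, "topics+data")
}
```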


+3 -3 vm/vm.go

@@ -692,15 +692,15 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value int64, gas
 			return nil, firstErr(err, ErrMemoryOutOfBounds)
 		}
 		if vm.evc != nil {
-			eventId := types.EventStringLogEvent(callee.Address.Postfix(20))
-			fmt.Printf("eventId: %s\n", eventId)
+			eventID := types.EventStringLogEvent(callee.Address.Postfix(20))
+			fmt.Printf("eventID: %s\n", eventID)
 			log := types.EventDataLog{
 				callee.Address,
 				topics,
 				data,
 				vm.params.BlockHeight,
 			}
-			vm.evc.FireEvent(eventId, log)
+			vm.evc.FireEvent(eventID, log)
 		}
 		dbg.Printf(" => T:%X D:%X\n", topics, data)

