
Merge remote-tracking branch 'origin/develop' into consensus_refactor

pull/169/head
Jae Kwon, 9 years ago
commit 855cb0f906
27 changed files with 1957 additions and 95 deletions
  1. +110 -0 benchmarks/codec_test.go
  2. +2 -7 benchmarks/map_test.go
  3. +2 -0 benchmarks/proto/README
  4. +1503 -0 benchmarks/proto/test.pb.go
  5. +25 -0 benchmarks/proto/test.proto
  6. +56 -0 benchmarks/simu/counter.go
  7. +20 -18 blockchain/reactor.go
  8. +1 -1 consensus/common_test.go
  9. +0 -1 consensus/height_vote_set.go
  10. +10 -1 consensus/log.go
  11. +14 -12 consensus/reactor.go
  12. +25 -2 mempool/mempool.go
  13. +4 -5 mempool/mempool_test.go
  14. +1 -1 mempool/reactor.go
  15. +0 -14 node/node.go
  16. +7 -0 proxy/log.go
  17. +8 -4 proxy/remote_app_context.go
  18. +4 -4 rpc/client/http_client.go
  19. +0 -1 rpc/client/log.go
  20. +126 -0 rpc/client/ws_client.go
  21. +8 -6 rpc/server/handlers.go
  22. +9 -0 rpc/types/types.go
  23. +1 -4 state/execution.go
  24. +5 -1 types/block.go
  25. +14 -11 types/part_set.go
  26. +1 -1 types/part_set_test.go
  27. +1 -1 types/validator.go

+110 -0 benchmarks/codec_test.go

@ -0,0 +1,110 @@
package benchmarks
import (
"testing"
"github.com/tendermint/go-crypto"
"github.com/tendermint/go-p2p"
"github.com/tendermint/go-wire"
proto "github.com/tendermint/tendermint/benchmarks/proto"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
)
func BenchmarkEncodeStatusWire(b *testing.B) {
b.StopTimer()
pubKey := crypto.GenPrivKeyEd25519().PubKey().(crypto.PubKeyEd25519)
status := &ctypes.ResultStatus{
NodeInfo: &p2p.NodeInfo{
PubKey: pubKey,
Moniker: "SOMENAME",
Network: "SOMENAME",
RemoteAddr: "SOMEADDR",
ListenAddr: "SOMEADDR",
Version: "SOMEVER",
Other: []string{"SOMESTRING", "OTHERSTRING"},
},
PubKey: pubKey,
LatestBlockHash: []byte("SOMEBYTES"),
LatestBlockHeight: 123,
LatestBlockTime: 1234,
}
b.StartTimer()
counter := 0
for i := 0; i < b.N; i++ {
jsonBytes := wire.JSONBytes(status)
counter += len(jsonBytes)
}
}
func BenchmarkEncodeNodeInfoWire(b *testing.B) {
b.StopTimer()
pubKey := crypto.GenPrivKeyEd25519().PubKey().(crypto.PubKeyEd25519)
nodeInfo := &p2p.NodeInfo{
PubKey: pubKey,
Moniker: "SOMENAME",
Network: "SOMENAME",
RemoteAddr: "SOMEADDR",
ListenAddr: "SOMEADDR",
Version: "SOMEVER",
Other: []string{"SOMESTRING", "OTHERSTRING"},
}
b.StartTimer()
counter := 0
for i := 0; i < b.N; i++ {
jsonBytes := wire.JSONBytes(nodeInfo)
counter += len(jsonBytes)
}
}
func BenchmarkEncodeNodeInfoBinary(b *testing.B) {
b.StopTimer()
pubKey := crypto.GenPrivKeyEd25519().PubKey().(crypto.PubKeyEd25519)
nodeInfo := &p2p.NodeInfo{
PubKey: pubKey,
Moniker: "SOMENAME",
Network: "SOMENAME",
RemoteAddr: "SOMEADDR",
ListenAddr: "SOMEADDR",
Version: "SOMEVER",
Other: []string{"SOMESTRING", "OTHERSTRING"},
}
b.StartTimer()
counter := 0
for i := 0; i < b.N; i++ {
jsonBytes := wire.BinaryBytes(nodeInfo)
counter += len(jsonBytes)
}
}
func BenchmarkEncodeNodeInfoProto(b *testing.B) {
b.StopTimer()
pubKey := crypto.GenPrivKeyEd25519().PubKey().(crypto.PubKeyEd25519)
pubKey2 := &proto.PubKey{Ed25519: &proto.PubKeyEd25519{Bytes: pubKey[:]}}
nodeInfo := &proto.NodeInfo{
PubKey: pubKey2,
Moniker: "SOMENAME",
Network: "SOMENAME",
RemoteAddr: "SOMEADDR",
ListenAddr: "SOMEADDR",
Version: "SOMEVER",
Other: []string{"SOMESTRING", "OTHERSTRING"},
}
b.StartTimer()
counter := 0
for i := 0; i < b.N; i++ {
bytes, err := nodeInfo.Marshal()
if err != nil {
b.Fatal(err)
return
}
//jsonBytes := wire.JSONBytes(nodeInfo)
counter += len(bytes)
}
}
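
These benchmarks compare go-wire JSON, go-wire binary, and gogoprotobuf encodings of the same status/node-info data; they are normally driven with `go test -bench=.`. As a small illustrative sketch (hypothetical helper, same package), a single benchmark can also be invoked programmatically via the standard library:

package benchmarks

import (
	"fmt"
	"testing"
)

// Illustrative helper (would live in a _test.go file of this package):
// runs one benchmark function directly and prints its result.
func runNodeInfoBinaryBench() {
	res := testing.Benchmark(BenchmarkEncodeNodeInfoBinary)
	fmt.Println(res.String())
}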

+2 -7 benchmarks/map_test.go

@ -11,11 +11,11 @@ func BenchmarkSomething(b *testing.B) {
numChecks := 100000
keys := make([]string, numItems)
for i := 0; i < numItems; i++ {
keys[i] = RandStr(32)
keys[i] = RandStr(100)
}
txs := make([]string, numChecks)
for i := 0; i < numChecks; i++ {
txs[i] = RandStr(32)
txs[i] = RandStr(100)
}
b.StartTimer()
@ -30,10 +30,5 @@ func BenchmarkSomething(b *testing.B) {
counter++
}
}
for _, tx := range txs {
if _, ok := foo[tx]; ok {
counter++
}
}
}
}

+2 -0 benchmarks/proto/README

@ -0,0 +1,2 @@
Doing some protobuf tests here.
Using gogoprotobuf.

+1503 -0 benchmarks/proto/test.pb.go (diff suppressed because the file is too large)


+25 -0 benchmarks/proto/test.proto

@ -0,0 +1,25 @@
message ResultStatus {
optional NodeInfo nodeInfo = 1;
required PubKey pubKey = 2;
required bytes latestBlockHash = 3;
required int64 latestBlockHeight = 4;
required int64 latestBlocktime = 5;
}
message NodeInfo {
required PubKey pubKey = 1;
required string moniker = 2;
required string network = 3;
required string remoteAddr = 4;
required string listenAddr = 5;
required string version = 6;
repeated string other = 7;
}
message PubKey {
optional PubKeyEd25519 ed25519 = 1;
}
message PubKeyEd25519 {
required bytes bytes = 1;
}

+56 -0 benchmarks/simu/counter.go

@ -0,0 +1,56 @@
package main
import (
"encoding/binary"
"time"
//"encoding/hex"
"fmt"
"github.com/gorilla/websocket"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/rpc/client"
// ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/rpc/types"
)
func main() {
ws := rpcclient.NewWSClient("ws://127.0.0.1:46657/websocket")
// ws := rpcclient.NewWSClient("ws://104.236.69.128:46657/websocket")
_, err := ws.Start()
if err != nil {
Exit(err.Error())
}
// Read a bunch of responses
go func() {
for {
_, ok := <-ws.ResultsCh
if !ok {
break
}
//fmt.Println("Received response", string(wire.JSONBytes(res)))
}
}()
// Make a bunch of requests
buf := make([]byte, 32)
for i := 0; ; i++ {
binary.LittleEndian.PutUint64(buf, uint64(i))
//txBytes := hex.EncodeToString(buf[:n])
request := rpctypes.NewRPCRequest("fakeid", "broadcast_tx", Arr(buf[:8]))
reqBytes := wire.JSONBytes(request)
//fmt.Println("!!", string(reqBytes))
fmt.Print(".")
err := ws.WriteMessage(websocket.TextMessage, reqBytes)
if err != nil {
Exit(err.Error())
}
if i%1000 == 0 {
fmt.Println(i)
}
time.Sleep(time.Microsecond * 250)
}
ws.Stop()
}

+20 -18 blockchain/reactor.go

@ -42,22 +42,24 @@ type consensusReactor interface {
type BlockchainReactor struct {
p2p.BaseReactor
sw *p2p.Switch
state *sm.State
proxyAppCtx proxy.AppContext // same as consensus.proxyAppCtx
store *BlockStore
pool *BlockPool
sync bool
requestsCh chan BlockRequest
timeoutsCh chan string
lastBlock *types.Block
sw *p2p.Switch
state *sm.State
proxyAppCtx proxy.AppContext // same as consensus.proxyAppCtx
store *BlockStore
pool *BlockPool
sync bool
requestsCh chan BlockRequest
timeoutsCh chan string
lastBlock *types.Block
evsw events.Fireable
}
func NewBlockchainReactor(state *sm.State, proxyAppCtx proxy.AppContext, store *BlockStore, sync bool) *BlockchainReactor {
if state.LastBlockHeight != store.Height() &&
state.LastBlockHeight != store.Height()-1 { // XXX double check this logic.
if state.LastBlockHeight == store.Height()-1 {
store.height -= 1 // XXX HACK, make this better
}
if state.LastBlockHeight != store.Height() {
PanicSanity(Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
}
requestsCh := make(chan BlockRequest, defaultChannelCapacity)
@ -68,13 +70,13 @@ func NewBlockchainReactor(state *sm.State, proxyAppCtx proxy.AppContext, store *
timeoutsCh,
)
bcR := &BlockchainReactor{
state: state,
proxyAppCtx: proxyAppCtx,
store: store,
pool: pool,
sync: sync,
requestsCh: requestsCh,
timeoutsCh: timeoutsCh,
state: state,
proxyAppCtx: proxyAppCtx,
store: store,
pool: pool,
sync: sync,
requestsCh: requestsCh,
timeoutsCh: timeoutsCh,
}
bcR.BaseReactor = *p2p.NewBaseReactor(log, "BlockchainReactor", bcR)
return bcR


+1 -1 consensus/common_test.go

@ -285,7 +285,7 @@ func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) {
blockStore := bc.NewBlockStore(blockDB)
// one for mempool, one for consensus
app := example.NewCounterApplication()
app := example.NewCounterApplication(false)
appCMem := app.Open()
appCCon := app.Open()
proxyAppCtxMem := proxy.NewLocalAppContext(appCMem)


+0 -1 consensus/height_vote_set.go

@ -134,7 +134,6 @@ func (hvs *HeightVoteSet) POLRound() int {
}
func (hvs *HeightVoteSet) getVoteSet(round int, type_ byte) *types.VoteSet {
log.Debug("getVoteSet(round)", "round", round, "type", type_)
rvs, ok := hvs.roundVoteSets[round]
if !ok {
return nil


+10 -1 consensus/log.go

@ -4,4 +4,13 @@ import (
"github.com/tendermint/go-logger"
)
var log = logger.New("module", "consensus")
var log = logger.NewBypass("module", "consensus")
func init() {
log.SetHandler(
logger.LvlFilterHandler(
logger.LvlDebug,
logger.BypassHandler(),
),
)
}

+14 -12 consensus/reactor.go

@ -86,14 +86,16 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
SendQueueCapacity: 100,
},
&p2p.ChannelDescriptor{
ID: DataChannel,
Priority: 5,
SendQueueCapacity: 2,
ID: DataChannel,
Priority: 2,
SendQueueCapacity: 50,
RecvBufferCapacity: 50 * 4096,
},
&p2p.ChannelDescriptor{
ID: VoteChannel,
Priority: 5,
SendQueueCapacity: 40,
ID: VoteChannel,
Priority: 5,
SendQueueCapacity: 100,
RecvBufferCapacity: 100 * 100,
},
}
}
@ -175,7 +177,7 @@ func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte
case *ProposalPOLMessage:
ps.ApplyProposalPOLMessage(msg)
case *BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Proof.Index)
ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
conR.conS.peerMsgQueue <- msgInfo{msg, peer.Key}
default:
log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
@ -302,7 +304,7 @@ func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) {
}
func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
log := log.New("peer", peer.Key)
log := log.New("peer", peer)
OUTER_LOOP:
for {
@ -408,7 +410,7 @@ OUTER_LOOP:
}
func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
log := log.New("peer", peer.Key)
log := log.New("peer", peer)
// Simple hack to throttle logs upon sleep.
var sleeping = 0
@ -430,8 +432,8 @@ OUTER_LOOP:
sleeping = 0
}
log.Debug("gossipVotesRoutine", "rsHeight", rs.Height, "rsRound", rs.Round,
"prsHeight", prs.Height, "prsRound", prs.Round, "prsStep", prs.Step)
//log.Debug("gossipVotesRoutine", "rsHeight", rs.Height, "rsRound", rs.Round,
// "prsHeight", prs.Height, "prsRound", prs.Round, "prsStep", prs.Step)
// If height matches, then send LastCommit, Prevotes, Precommits.
if rs.Height == prs.Height {
@ -738,7 +740,7 @@ func (ps *PeerState) SetHasVote(vote *types.Vote, index int) {
}
func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
log := log.New("peer", ps.Peer.Key, "peerRound", ps.Round, "height", height, "round", round)
log := log.New("peer", ps.Peer, "peerRound", ps.Round, "height", height, "round", round)
if type_ != types.VoteTypePrevote && type_ != types.VoteTypePrecommit {
PanicSanity("Invalid vote type")
}


+25 -2 mempool/mempool.go

@ -2,6 +2,7 @@ package mempool
import (
"bytes"
"container/list"
"sync"
"sync/atomic"
@ -36,10 +37,10 @@ Garbage collection of old elements from mempool.txs is handled via
the DetachPrev() call, which makes old elements unreachable by the
peer broadcastTxRoutine() and thus automatically garbage collected.
*/
const cacheSize = 100000
type Mempool struct {
proxyMtx sync.Mutex
proxyAppCtx proxy.AppContext
@ -47,6 +48,11 @@ type Mempool struct {
counter int64 // simple incrementing counter
height int // the last block Update()'d to
expected *clist.CElement // pointer to .txs for next response
// Keep a cache of already-seen txs.
// This reduces the pressure on the proxyApp.
cacheMap map[string]struct{}
cacheList *list.List
}
func NewMempool(proxyAppCtx proxy.AppContext) *Mempool {
@ -56,6 +62,9 @@ func NewMempool(proxyAppCtx proxy.AppContext) *Mempool {
counter: 0,
height: 0,
expected: nil,
cacheMap: make(map[string]struct{}, cacheSize),
cacheList: list.New(),
}
proxyAppCtx.SetResponseCallback(mempool.resCb)
return mempool
@ -73,6 +82,20 @@ func (mem *Mempool) AppendTx(tx types.Tx) (err error) {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
// CACHE
if _, exists := mem.cacheMap[string(tx)]; exists {
return nil
}
if mem.cacheList.Len() >= cacheSize {
popped := mem.cacheList.Front()
poppedTx := popped.Value.(types.Tx)
delete(mem.cacheMap, string(poppedTx))
mem.cacheList.Remove(popped)
}
mem.cacheMap[string(tx)] = struct{}{}
mem.cacheList.PushBack(tx)
// END CACHE
if err = mem.proxyAppCtx.Error(); err != nil {
return err
}
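
The cache added here is a bounded first-in-first-out dedupe set: cacheMap gives O(1) membership checks while cacheList remembers insertion order, so the oldest transaction is evicted once cacheSize (100000) entries are reached, and a repeated tx returns from AppendTx before ever touching the proxy app. A minimal standalone sketch of the same pattern, with illustrative names rather than the Mempool's fields:

package main

import (
	"container/list"
	"fmt"
)

// txCache is an illustrative bounded FIFO set of byte strings, mirroring
// the cacheMap/cacheList pair added to Mempool in this commit.
type txCache struct {
	size int
	seen map[string]struct{}
	fifo *list.List // keys in insertion order, oldest at the front
}

func newTxCache(size int) *txCache {
	return &txCache{size: size, seen: make(map[string]struct{}, size), fifo: list.New()}
}

// Push returns false if tx was already cached; otherwise it records tx,
// evicting the oldest entry when the cache is full.
func (c *txCache) Push(tx []byte) bool {
	key := string(tx)
	if _, ok := c.seen[key]; ok {
		return false
	}
	if c.fifo.Len() >= c.size {
		oldest := c.fifo.Front()
		delete(c.seen, oldest.Value.(string))
		c.fifo.Remove(oldest)
	}
	c.seen[key] = struct{}{}
	c.fifo.PushBack(key)
	return true
}

func main() {
	c := newTxCache(2)
	fmt.Println(c.Push([]byte("a")), c.Push([]byte("a"))) // true false
	c.Push([]byte("b"))
	c.Push([]byte("c"))              // evicts "a"
	fmt.Println(c.Push([]byte("a"))) // true again: "a" was evicted
}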


+4 -5 mempool/mempool_test.go

@ -12,9 +12,8 @@ import (
func TestSerialReap(t *testing.T) {
app := example.NewCounterApplication()
app := example.NewCounterApplication(true)
appCtxMempool := app.Open()
appCtxMempool.SetOption("serial", "on")
proxyAppCtx := proxy.NewLocalAppContext(appCtxMempool)
mempool := NewMempool(proxyAppCtx)
@ -28,7 +27,7 @@ func TestSerialReap(t *testing.T) {
// This will succeed
txBytes := make([]byte, 32)
_ = binary.PutVarint(txBytes, int64(i))
binary.LittleEndian.PutUint64(txBytes, uint64(i))
err := mempool.AppendTx(txBytes)
if err != nil {
t.Fatal("Error after AppendTx: %v", err)
@ -59,7 +58,7 @@ func TestSerialReap(t *testing.T) {
txs := make([]types.Tx, 0)
for i := start; i < end; i++ {
txBytes := make([]byte, 32)
_ = binary.PutVarint(txBytes, int64(i))
binary.LittleEndian.PutUint64(txBytes, uint64(i))
txs = append(txs, txBytes)
}
blockHeader := &types.Header{Height: 0}
@ -75,7 +74,7 @@ func TestSerialReap(t *testing.T) {
// Append some txs.
for i := start; i < end; i++ {
txBytes := make([]byte, 32)
_ = binary.PutVarint(txBytes, int64(i))
binary.LittleEndian.PutUint64(txBytes, uint64(i))
_, retCode := appCtxConsensus.AppendTx(txBytes)
if retCode != tmsp.RetCodeOK {
t.Error("Error committing tx", retCode)
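
The txs are now written as fixed 8-byte little-endian values instead of signed varints; the two standard-library encodings produce different bytes for the same counter value, which a short sketch makes concrete (illustrative only, not part of the commit):

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	i := int64(300)

	// Fixed-width: always 8 bytes, little-endian, as in the updated test.
	fixed := make([]byte, 8)
	binary.LittleEndian.PutUint64(fixed, uint64(i))

	// Varint: variable length, zig-zag encoded, as the old test used.
	varint := make([]byte, binary.MaxVarintLen64)
	n := binary.PutVarint(varint, i)

	fmt.Printf("fixed  (%d bytes): %x\n", len(fixed), fixed)
	fmt.Printf("varint (%d bytes): %x\n", n, varint[:n])
}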


+1 -1 mempool/reactor.go

@ -63,7 +63,7 @@ func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
log.Warn("Error decoding message", "error", err)
return
}
log.Notice("MempoolReactor received message", "msg", msg)
log.Info("MempoolReactor received message", "msg", msg)
switch msg := msg.(type) {
case *TxMessage:


+0 -14 node/node.go

@ -31,9 +31,7 @@ import _ "net/http/pprof"
type Node struct {
sw *p2p.Switch
evsw *events.EventSwitch
book *p2p.AddrBook
blockStore *bc.BlockStore
pexReactor *p2p.PEXReactor
bcReactor *bc.BlockchainReactor
mempoolReactor *mempl.MempoolReactor
consensusState *consensus.ConsensusState
@ -74,10 +72,6 @@ func NewNode() *Node {
Exit(Fmt("Failed to start switch: %v", err))
}
// Make PEXReactor
book := p2p.NewAddrBook(config.GetString("addrbook_file"))
pexReactor := p2p.NewPEXReactor(book)
// Make BlockchainReactor
bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyAppCtxConsensus, blockStore, config.GetBool("fast_sync"))
@ -94,7 +88,6 @@ func NewNode() *Node {
// Make p2p network switch
sw := p2p.NewSwitch()
sw.AddReactor("PEX", pexReactor)
sw.AddReactor("MEMPOOL", mempoolReactor)
sw.AddReactor("BLOCKCHAIN", bcReactor)
sw.AddReactor("CONSENSUS", consensusReactor)
@ -114,9 +107,7 @@ func NewNode() *Node {
return &Node{
sw: sw,
evsw: eventSwitch,
book: book,
blockStore: blockStore,
pexReactor: pexReactor,
bcReactor: bcReactor,
mempoolReactor: mempoolReactor,
consensusState: consensusState,
@ -129,7 +120,6 @@ func NewNode() *Node {
// Call Start() after adding the listeners.
func (n *Node) Start() error {
n.book.Start()
n.sw.SetNodeInfo(makeNodeInfo(n.sw, n.privKey))
n.sw.SetNodePrivKey(n.privKey)
_, err := n.sw.Start()
@ -140,7 +130,6 @@ func (n *Node) Stop() {
log.Notice("Stopping Node")
// TODO: gracefully disconnect from peers.
n.sw.Stop()
n.book.Stop()
}
// Add the event switch to reactors, mempool, etc.
@ -156,7 +145,6 @@ func SetFireable(evsw *events.EventSwitch, eventables ...events.Eventable) {
func (n *Node) AddListener(l p2p.Listener) {
log.Notice(Fmt("Added %v", l))
n.sw.AddListener(l)
n.book.AddOurAddress(l.ExternalAddress())
}
// Dial a list of seeds in random order
@ -179,11 +167,9 @@ func (n *Node) dialSeed(addr *p2p.NetAddress) {
peer, err := n.sw.DialPeerWithAddress(addr)
if err != nil {
log.Error("Error dialing seed", "error", err)
//n.book.MarkAttempt(addr)
return
} else {
log.Notice("Connected to seed", "peer", peer)
n.book.AddAddress(addr, addr)
}
}


+7 -0 proxy/log.go

@ -0,0 +1,7 @@
package proxy
import (
"github.com/tendermint/go-logger"
)
var log = logger.New("module", "proxy")

+8 -4 proxy/remote_app_context.go

@ -21,7 +21,7 @@ const maxResponseSize = 1048576 // 1MB
// with concurrent callers.
type remoteAppContext struct {
QuitService
sync.Mutex
sync.Mutex // [EB]: is this even used?
reqQueue chan *reqRes
@ -65,7 +65,7 @@ func (app *remoteAppContext) SetResponseCallback(resCb Callback) {
func (app *remoteAppContext) StopForError(err error) {
app.mtx.Lock()
fmt.Println("Stopping remoteAppContext for error:", err)
log.Error("Stopping remoteAppContext for error.", "error", err)
if app.err == nil {
app.err = err
}
@ -89,11 +89,15 @@ func (app *remoteAppContext) sendRequestsRoutine() {
case <-app.QuitService.Quit:
return
case reqres := <-app.reqQueue:
app.willSendReq(reqres)
wire.WriteBinary(reqres.Request, app.bufWriter, &n, &err)
if err != nil {
app.StopForError(err)
return
}
log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request)
if _, ok := reqres.Request.(tmsp.RequestFlush); ok {
err = app.bufWriter.Flush()
if err != nil {
@ -101,7 +105,6 @@ func (app *remoteAppContext) sendRequestsRoutine() {
return
}
}
app.didSendReq(reqres)
}
}
}
@ -121,6 +124,7 @@ func (app *remoteAppContext) recvResponseRoutine() {
case tmsp.ResponseException:
app.StopForError(errors.New(res.Error))
default:
log.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res)
err := app.didRecvResponse(res)
if err != nil {
app.StopForError(err)
@ -129,7 +133,7 @@ func (app *remoteAppContext) recvResponseRoutine() {
}
}
func (app *remoteAppContext) didSendReq(reqres *reqRes) {
func (app *remoteAppContext) willSendReq(reqres *reqRes) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.reqSent.PushBack(reqres)
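
Renaming didSendReq to willSendReq and invoking it before wire.WriteBinary means each request is recorded in the pending queue before its bytes can reach the remote process, so even an immediate response finds a queued request to match against. A stripped-down sketch of that ordering, with illustrative types rather than the proxy package's own:

package main

import (
	"container/list"
	"fmt"
	"sync"
)

// pipeline sketches the willSendReq/didRecvResponse ordering: the request is
// queued under the mutex before it is handed to the transport, so the receive
// side always finds a matching pending entry.
type pipeline struct {
	mtx     sync.Mutex
	pending *list.List // requests sent but not yet answered, in order
}

func newPipeline() *pipeline { return &pipeline{pending: list.New()} }

func (p *pipeline) willSendReq(req string) {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	p.pending.PushBack(req)
}

// didRecvResponse pops the oldest pending request and checks it matches.
func (p *pipeline) didRecvResponse(resp string) error {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	front := p.pending.Front()
	if front == nil {
		return fmt.Errorf("unexpected response %q with no pending request", resp)
	}
	if req := front.Value.(string); req != resp {
		return fmt.Errorf("response %q does not match pending request %q", resp, req)
	}
	p.pending.Remove(front)
	return nil
}

func main() {
	p := newPipeline()
	p.willSendReq("echo") // record first...
	// ...then write to the wire; even an instant reply now finds the entry.
	fmt.Println(p.didRecvResponse("echo")) // <nil>
}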


rpc/client/client.go → rpc/client/http_client.go (renamed)


+0 -1 rpc/client/log.go

@ -1,4 +1,3 @@
package rpcclient
import (


+126 -0 rpc/client/ws_client.go

@ -0,0 +1,126 @@
package rpcclient
import (
"net/http"
"strings"
"time"
"github.com/gorilla/websocket"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-wire"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/rpc/types"
)
const (
wsEventsChannelCapacity = 10
wsResultsChannelCapacity = 10
wsWriteTimeoutSeconds = 10
)
type WSClient struct {
QuitService
Address string
*websocket.Conn
EventsCh chan ctypes.ResultEvent // closes upon WSClient.Stop()
ResultsCh chan ctypes.Result // closes upon WSClient.Stop()
}
// create a new connection
func NewWSClient(addr string) *WSClient {
wsClient := &WSClient{
Address: addr,
Conn: nil,
EventsCh: make(chan ctypes.ResultEvent, wsEventsChannelCapacity),
ResultsCh: make(chan ctypes.Result, wsResultsChannelCapacity),
}
wsClient.QuitService = *NewQuitService(log, "WSClient", wsClient)
return wsClient
}
func (wsc *WSClient) OnStart() error {
wsc.QuitService.OnStart()
err := wsc.dial()
if err != nil {
return err
}
go wsc.receiveEventsRoutine()
return nil
}
func (wsc *WSClient) dial() error {
// Dial
dialer := websocket.DefaultDialer
rHeader := http.Header{}
con, _, err := dialer.Dial(wsc.Address, rHeader)
if err != nil {
return err
}
// Set the ping/pong handlers
con.SetPingHandler(func(m string) error {
// NOTE: https://github.com/gorilla/websocket/issues/97
go con.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds))
return nil
})
con.SetPongHandler(func(m string) error {
// NOTE: https://github.com/gorilla/websocket/issues/97
return nil
})
wsc.Conn = con
return nil
}
func (wsc *WSClient) OnStop() {
wsc.QuitService.OnStop()
// EventsCh and ResultsCh are closed in receiveEventsRoutine.
}
func (wsc *WSClient) receiveEventsRoutine() {
for {
_, data, err := wsc.ReadMessage()
if err != nil {
log.Info("WSClient failed to read message", "error", err, "data", string(data))
wsc.Stop()
break
} else {
var response ctypes.Response
wire.ReadJSON(&response, data, &err)
if err != nil {
log.Info("WSClient failed to parse message", "error", err)
wsc.Stop()
break
}
if strings.HasSuffix(response.ID, "#event") {
wsc.EventsCh <- *response.Result.(*ctypes.ResultEvent)
} else {
wsc.ResultsCh <- response.Result
}
}
}
// Cleanup
close(wsc.EventsCh)
close(wsc.ResultsCh)
}
// subscribe to an event
func (wsc *WSClient) Subscribe(eventid string) error {
err := wsc.WriteJSON(rpctypes.RPCRequest{
JSONRPC: "2.0",
ID: "",
Method: "subscribe",
Params: []interface{}{eventid},
})
return err
}
// unsubscribe from an event
func (wsc *WSClient) Unsubscribe(eventid string) error {
err := wsc.WriteJSON(rpctypes.RPCRequest{
JSONRPC: "2.0",
ID: "",
Method: "unsubscribe",
Params: []interface{}{eventid},
})
return err
}
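
A rough usage sketch for the new client, assuming a node is serving the websocket endpoint locally; the address and event name are placeholders, and the pattern mirrors benchmarks/simu/counter.go plus the Subscribe helper above:

package main

import (
	"fmt"

	. "github.com/tendermint/go-common"
	"github.com/tendermint/tendermint/rpc/client"
)

func main() {
	// Address and event name are illustrative; adjust to the running node.
	ws := rpcclient.NewWSClient("ws://127.0.0.1:46657/websocket")
	if _, err := ws.Start(); err != nil {
		Exit(err.Error())
	}
	defer ws.Stop()

	if err := ws.Subscribe("NewBlock"); err != nil {
		Exit(err.Error())
	}

	// Events and plain RPC results arrive on separate channels,
	// both closed in receiveEventsRoutine when the client stops.
	for event := range ws.EventsCh {
		fmt.Println("got event:", event)
	}
}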

+8 -6 rpc/server/handlers.go

@ -252,11 +252,13 @@ func (wsc *WSConnection) OnStart() error {
wsc.readTimeout = time.NewTimer(time.Second * wsReadTimeoutSeconds)
wsc.pingTicker = time.NewTicker(time.Second * wsPingTickerSeconds)
wsc.baseConn.SetPingHandler(func(m string) error {
wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds))
// NOTE: https://github.com/gorilla/websocket/issues/97
go wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds))
wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds)
return nil
})
wsc.baseConn.SetPongHandler(func(m string) error {
// NOTE: https://github.com/gorilla/websocket/issues/97
wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds)
return nil
})
@ -287,13 +289,12 @@ func (wsc *WSConnection) readTimeoutRoutine() {
}
}
// Attempt to write response to writeChan and record failures
// Block trying to write to writeChan until service stops.
func (wsc *WSConnection) writeRPCResponse(resp RPCResponse) {
select {
case <-wsc.Quit:
return
case wsc.writeChan <- resp:
default:
log.Notice("Stopping connection due to writeChan overflow", "id", wsc.id)
wsc.Stop() // writeChan capacity exceeded, error.
}
}
@ -412,7 +413,8 @@ func (wsc *WSConnection) writeRoutine() {
log.Error("Failed to marshal RPCResponse to JSON", "error", err)
} else {
wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds))
if err = wsc.baseConn.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil {
bufBytes := buf.Bytes()
if err = wsc.baseConn.WriteMessage(websocket.TextMessage, bufBytes); err != nil {
log.Warn("Failed to write response on websocket", "error", err)
wsc.Stop()
return
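
writeRPCResponse previously fell through to a default: case and stopped the connection whenever writeChan was full; it now blocks until either the response is queued or the service quits. The two select shapes, as a minimal illustrative sketch:

package main

import "fmt"

// Illustrative only: resp/writeChan/quit stand in for the WSConnection fields.
func writeNonBlocking(writeChan chan string, quit chan struct{}, resp string) {
	select {
	case <-quit:
	case writeChan <- resp:
	default:
		// old behaviour: capacity exceeded, stop the connection
		fmt.Println("writeChan overflow, stopping")
	}
}

func writeBlocking(writeChan chan string, quit chan struct{}, resp string) {
	select {
	case <-quit: // give up only when the service stops
	case writeChan <- resp: // otherwise wait for room
	}
}

func main() {
	writeChan := make(chan string, 1)
	quit := make(chan struct{})
	writeBlocking(writeChan, quit, "ok")
	fmt.Println(<-writeChan)
	writeNonBlocking(writeChan, quit, "also ok") // room available, so it queues
	fmt.Println(<-writeChan)
}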


+9 -0 rpc/types/types.go

@ -7,6 +7,15 @@ type RPCRequest struct {
Params []interface{} `json:"params"`
}
func NewRPCRequest(id string, method string, params []interface{}) RPCRequest {
return RPCRequest{
JSONRPC: "2.0",
ID: id,
Method: method,
Params: params,
}
}
type RPCResponse struct {
JSONRPC string `json:"jsonrpc"`
ID string `json:"id"`


+1 -4 state/execution.go

@ -31,10 +31,7 @@ func (s *State) ExecBlock(proxyAppCtx proxy.AppContext, block *types.Block, bloc
nextValSet := valSet.Copy()
// First, rollback.
if err != nil {
proxyAppCtx.RollbackSync()
return err
}
proxyAppCtx.RollbackSync()
// Execute, or rollback. (Does not commit)
err = s.execBlockOnProxyApp(proxyAppCtx, block)


+5 -1 types/block.go

@ -343,8 +343,12 @@ func (data *Data) StringIndented(indent string) string {
if data == nil {
return "nil-Data"
}
txStrings := make([]string, len(data.Txs))
txStrings := make([]string, MinInt(len(data.Txs), 21))
for i, tx := range data.Txs {
if i == 20 {
txStrings[i] = fmt.Sprintf("... (%v total)", len(data.Txs))
break
}
txStrings[i] = fmt.Sprintf("Tx:%v", tx)
}
return fmt.Sprintf(`Data{
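
Data.StringIndented now prints at most 20 transactions plus a "... (N total)" summary entry, keeping log output bounded for large blocks (the slice is sized MinInt(len, 21): 20 entries plus the marker). A generic sketch of the same truncation, outside the types package:

package main

import "fmt"

// truncatedStrings renders at most max items, appending a summary entry
// when the input is longer; mirrors the pattern added to Data.StringIndented.
func truncatedStrings(items []string, max int) []string {
	n := len(items)
	if n <= max {
		out := make([]string, n)
		copy(out, items)
		return out
	}
	out := make([]string, max+1)
	copy(out, items[:max])
	out[max] = fmt.Sprintf("... (%v total)", n)
	return out
}

func main() {
	items := make([]string, 25)
	for i := range items {
		items[i] = fmt.Sprintf("Tx:%d", i)
	}
	short := truncatedStrings(items, 20)
	fmt.Println(len(short), short[20]) // 21 ... (25 total)
}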


+14 -11 types/part_set.go

@ -24,8 +24,9 @@ var (
)
type Part struct {
Proof merkle.SimpleProof `json:"proof"`
Index int `json:"index"`
Bytes []byte `json:"bytes"`
Proof merkle.SimpleProof `json:"proof"`
// Cache
hash []byte
@ -47,12 +48,13 @@ func (part *Part) String() string {
}
func (part *Part) StringIndented(indent string) string {
return fmt.Sprintf(`Part{
return fmt.Sprintf(`Part{#%v
%s Bytes: %X...
%s Proof: %v
%s Bytes: %X
%s}`,
part.Index,
indent, Fingerprint(part.Bytes),
indent, part.Proof.StringIndented(indent+" "),
indent, part.Bytes,
indent)
}
@ -101,6 +103,7 @@ func NewPartSetFromData(data []byte) *PartSet {
partsBitArray := NewBitArray(total)
for i := 0; i < total; i++ {
part := &Part{
Index: i,
Bytes: data[i*partSize : MinInt(len(data), (i+1)*partSize)],
}
parts[i] = part
@ -108,13 +111,13 @@ func NewPartSetFromData(data []byte) *PartSet {
partsBitArray.SetIndex(i, true)
}
// Compute merkle proofs
proofs := merkle.SimpleProofsFromHashables(parts_)
root, proofs := merkle.SimpleProofsFromHashables(parts_)
for i := 0; i < total; i++ {
parts[i].Proof = *proofs[i]
}
return &PartSet{
total: total,
hash: proofs[0].RootHash,
hash: root,
parts: parts,
partsBitArray: partsBitArray,
count: total,
@ -190,23 +193,23 @@ func (ps *PartSet) AddPart(part *Part) (bool, error) {
defer ps.mtx.Unlock()
// Invalid part index
if part.Proof.Index >= ps.total {
if part.Index >= ps.total {
return false, ErrPartSetUnexpectedIndex
}
// If part already exists, return false.
if ps.parts[part.Proof.Index] != nil {
if ps.parts[part.Index] != nil {
return false, nil
}
// Check hash proof
if !part.Proof.Verify(part.Hash(), ps.Hash()) {
if !part.Proof.Verify(part.Index, ps.total, part.Hash(), ps.Hash()) {
return false, ErrPartSetInvalidProof
}
// Add part
ps.parts[part.Proof.Index] = part
ps.partsBitArray.SetIndex(part.Proof.Index, true)
ps.parts[part.Index] = part
ps.partsBitArray.SetIndex(part.Index, true)
ps.count++
return true, nil
}
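
With Index now stored on the Part itself, the Merkle proof only needs to carry the chain of sibling ("aunt") hashes, and Verify recomputes the root by walking that chain, branching left or right according to the index at each level. A self-contained sketch of that recomputation, using SHA-256 instead of go-merkle's actual leaf and inner hashing, so the digests differ from Tendermint's but the index/aunts walk is the same shape:

package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
)

func leafHash(data []byte) []byte {
	h := sha256.Sum256(data)
	return h[:]
}

func innerHash(left, right []byte) []byte {
	h := sha256.Sum256(append(append([]byte{}, left...), right...))
	return h[:]
}

// rootFromAunts recomputes the Merkle root for the leaf at `index` out of
// `total` leaves, given the sibling hashes ordered from the leaf up to the root.
func rootFromAunts(index, total int, leaf []byte, aunts [][]byte) []byte {
	if index < 0 || index >= total || total <= 0 {
		return nil
	}
	if total == 1 {
		if len(aunts) != 0 {
			return nil
		}
		return leaf
	}
	if len(aunts) == 0 {
		return nil
	}
	numLeft := (total + 1) / 2
	last := aunts[len(aunts)-1]
	if index < numLeft {
		left := rootFromAunts(index, numLeft, leaf, aunts[:len(aunts)-1])
		if left == nil {
			return nil
		}
		return innerHash(left, last)
	}
	right := rootFromAunts(index-numLeft, total-numLeft, leaf, aunts[:len(aunts)-1])
	if right == nil {
		return nil
	}
	return innerHash(last, right)
}

func main() {
	// Three leaves: the tree is inner(inner(l0, l1), l2).
	l0, l1, l2 := leafHash([]byte("a")), leafHash([]byte("b")), leafHash([]byte("c"))
	root := innerHash(innerHash(l0, l1), l2)

	// Proof for leaf 1: its aunts, leaf-to-root, are [l0, l2].
	ok := bytes.Equal(rootFromAunts(1, 3, l1, [][]byte{l0, l2}), root)
	fmt.Println(ok) // true
}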


+1 -1 types/part_set_test.go

@ -69,7 +69,7 @@ func TestWrongProof(t *testing.T) {
// Test adding a part with wrong trail.
part := partSet.GetPart(0)
part.Proof.InnerHashes[0][0] += byte(0x01)
part.Proof.Aunts[0][0] += byte(0x01)
added, err := partSet2.AddPart(part)
if added || err == nil {
t.Errorf("Expected to fail adding a part with bad trail.")


+1 -1 types/validator.go

@ -53,7 +53,7 @@ func (v *Validator) String() string {
if v == nil {
return "nil-Validator"
}
return fmt.Sprintf("Validator{%X %v %v-%v-%v VP:%v A:%v}",
return fmt.Sprintf("Validator{%X %v %v VP:%v A:%v}",
v.Address,
v.PubKey,
v.LastCommitHeight,

