diff --git a/config/tendermint/config.go b/config/tendermint/config.go index 7e8e5b44c..74bf7b2ff 100644 --- a/config/tendermint/config.go +++ b/config/tendermint/config.go @@ -67,6 +67,7 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("prof_laddr", "") mapConfig.SetDefault("revision_file", rootDir+"/revision") mapConfig.SetDefault("cswal", rootDir+"/data/cswal") + mapConfig.SetDefault("cswal_light", false) mapConfig.SetDefault("block_size", 10000) mapConfig.SetDefault("timeout_propose", 3000) @@ -77,6 +78,8 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("timeout_precommit_delta", 500) mapConfig.SetDefault("timeout_commit", 1000) mapConfig.SetDefault("mempool_recheck", true) + mapConfig.SetDefault("mempool_broadcast", true) + mapConfig.SetDefault("mempool_reap", true) return mapConfig } diff --git a/config/tendermint_test/config.go b/config/tendermint_test/config.go index a9153e50c..1263dc135 100644 --- a/config/tendermint_test/config.go +++ b/config/tendermint_test/config.go @@ -91,6 +91,7 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("prof_laddr", "") mapConfig.SetDefault("revision_file", rootDir+"/revision") mapConfig.SetDefault("cswal", rootDir+"/data/cswal") + mapConfig.SetDefault("cswal_light", false) mapConfig.SetDefault("block_size", 10000) mapConfig.SetDefault("timeout_propose", 100) @@ -101,6 +102,8 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("timeout_precommit_delta", 1) mapConfig.SetDefault("timeout_commit", 1) mapConfig.SetDefault("mempool_recheck", true) + mapConfig.SetDefault("mempool_broadcast", true) + mapConfig.SetDefault("mempool_reap", true) return mapConfig } diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 2e93450a5..f18cc0ccb 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -63,7 +63,7 @@ func TestReplayCatchup(t *testing.T) { func openWAL(t *testing.T, cs *ConsensusState, file string) { // open the wal - wal, err := NewWAL(file) + wal, err := NewWAL(file, config.GetBool("cswal_light")) if err != nil { t.Fatal(err) } diff --git a/consensus/state.go b/consensus/state.go index 6fdda4087..49be32ea1 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -328,7 +328,7 @@ func (cs *ConsensusState) OnStop() { func (cs *ConsensusState) OpenWAL(file string) (err error) { cs.mtx.Lock() defer cs.mtx.Unlock() - wal, err := NewWAL(file) + wal, err := NewWAL(file, config.GetBool("cswal_light")) if err != nil { return err } @@ -655,7 +655,6 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) { err = cs.setProposal(msg.Proposal) case *BlockPartMessage: // if the proposal is complete, we'll enterPrevote or tryFinalizeCommit - // if we're the only validator, the enterPrevote may take us through to the next round _, err = cs.addProposalBlockPart(msg.Height, msg.Part) case *VoteMessage: // attempt to add the vote and dupeout the validator if its a duplicate signature @@ -675,7 +674,7 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) { log.Warn("Unknown msg type", reflect.TypeOf(msg)) } if err != nil { - log.Error("Error with msg", "type", reflect.TypeOf(msg), "error", err, "msg", msg) + log.Error("Error with msg", "type", reflect.TypeOf(msg), "peer", peerKey, "error", err, "msg", msg) } } diff --git a/consensus/wal.go b/consensus/wal.go index 00536f9dc..832368e6c 100644 --- a/consensus/wal.go +++ b/consensus/wal.go @@ -39,9 +39,11 @@ type WAL struct { exists bool // if the file already existed (restarted process) done chan struct{} + + light bool // ignore block parts } -func NewWAL(file string) (*WAL, error) { +func NewWAL(file string, light bool) (*WAL, error) { var walExists bool if _, err := os.Stat(file); err == nil { walExists = true @@ -54,12 +56,20 @@ func NewWAL(file string) (*WAL, error) { fp: fp, exists: walExists, done: make(chan struct{}), + light: light, }, nil } // called in newStep and for each pass in receiveRoutine func (wal *WAL) Save(msg ConsensusLogMessageInterface) { if wal != nil { + if wal.light { + if m, ok := msg.(msgInfo); ok { + if _, ok := m.Msg.(*BlockPartMessage); ok { + return + } + } + } var n int var err error wire.WriteJSON(ConsensusLogMessage{time.Now(), msg}, wal.fp, &n, &err) diff --git a/consensus/wal_test.go b/consensus/wal_test.go index 8348ca24d..808a5e414 100644 --- a/consensus/wal_test.go +++ b/consensus/wal_test.go @@ -30,7 +30,7 @@ func TestSeek(t *testing.T) { } f.Close() - wal, err := NewWAL(path.Join(os.TempDir(), name)) + wal, err := NewWAL(path.Join(os.TempDir(), name), config.GetBool("cswal_light")) if err != nil { t.Fatal(err) } diff --git a/mempool/mempool.go b/mempool/mempool.go index 1d0a375d6..a511c2e46 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -184,6 +184,10 @@ func (mem *Mempool) resCbRecheck(req *tmsp.Request, res *tmsp.Response) { // Get the valid transactions remaining func (mem *Mempool) Reap() []types.Tx { + if !config.GetBool("mempool_reap") { + return []types.Tx{} + } + mem.proxyMtx.Lock() defer mem.proxyMtx.Unlock() diff --git a/mempool/reactor.go b/mempool/reactor.go index 9a8649fd2..dc4c9fede 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -101,6 +101,10 @@ type Peer interface { // TODO: Handle mempool or reactor shutdown? // As is this routine may block forever if no new txs come in. func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) { + if !config.GetBool("mempool_broadcast") { + return + } + var next *clist.CElement for { if !memR.IsRunning() { diff --git a/node/node.go b/node/node.go index 53805e211..8beafcee8 100644 --- a/node/node.go +++ b/node/node.go @@ -58,8 +58,9 @@ func NewNode(privValidator *types.PrivValidator) *Node { proxyAppConnMempool := getProxyApp(proxyAddr, state.AppHash) proxyAppConnConsensus := getProxyApp(proxyAddr, state.AppHash) - // add the chainid to the global config + // add the chainid and number of validators to the global config config.Set("chain_id", state.ChainID) + config.Set("num_vals", state.Validators.Size()) // Generate node PrivKey privKey := crypto.GenPrivKeyEd25519() diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 61bf08a56..8eca1883e 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -39,3 +39,8 @@ func UnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) { txs := mempoolReactor.Mempool.Reap() return &ctypes.ResultUnconfirmedTxs{len(txs), txs}, nil } + +func TestStartMempool() (*ctypes.ResultTestStartMempool, error) { + config.Set("mempool_reap", true) + return &ctypes.ResultTestStartMempool{}, nil +} diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 23cd56163..1d6eaac17 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -20,6 +20,7 @@ var Routes = map[string]*rpc.RPCFunc{ "broadcast_tx_sync": rpc.NewRPCFunc(BroadcastTxSyncResult, "tx"), "broadcast_tx_async": rpc.NewRPCFunc(BroadcastTxAsyncResult, "tx"), "unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxsResult, ""), + "test_start_mempool": rpc.NewRPCFunc(TestStartMempoolResult, ""), // move to test server ? // subscribe/unsubscribe are reserved for websocket events. } @@ -126,3 +127,11 @@ func BroadcastTxAsyncResult(tx []byte) (ctypes.TMResult, error) { return r, nil } } + +func TestStartMempoolResult() (ctypes.TMResult, error) { + if r, err := TestStartMempool(); err != nil { + return nil, err + } else { + return r, nil + } +} diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index b56d701b9..78e722e38 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -68,6 +68,8 @@ type ResultUnconfirmedTxs struct { Txs []types.Tx `json:"txs"` } +type ResultTestStartMempool struct{} + type ResultSubscribe struct { } @@ -105,6 +107,9 @@ const ( ResultTypeSubscribe = byte(0x80) ResultTypeUnsubscribe = byte(0x81) ResultTypeEvent = byte(0x82) + + // 0xa bytes for testing + ResultTypeTestStartMempool = byte(0xa0) ) type TMResult interface { @@ -127,4 +132,5 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&ResultSubscribe{}, ResultTypeSubscribe}, wire.ConcreteType{&ResultUnsubscribe{}, ResultTypeUnsubscribe}, wire.ConcreteType{&ResultEvent{}, ResultTypeEvent}, + wire.ConcreteType{&ResultTestStartMempool{}, ResultTypeTestStartMempool}, )