Browse Source

service#Start, service#Stop signatures were changed

See https://github.com/tendermint/tmlibs/issues/45
pull/807/head
Anton Kaliaev 7 years ago
parent
commit
69b5da766c
No known key found for this signature in database GPG Key ID: 7B6881D965918214
38 changed files with 99 additions and 104 deletions
  1. +1
    -1
      benchmarks/simu/counter.go
  2. +1
    -1
      blockchain/pool.go
  3. +2
    -2
      blockchain/pool_test.go
  4. +1
    -1
      blockchain/reactor.go
  5. +1
    -1
      cmd/tendermint/commands/run_node.go
  6. +1
    -1
      consensus/byzantine_test.go
  7. +4
    -4
      consensus/common_test.go
  8. +2
    -2
      consensus/reactor.go
  9. +1
    -1
      consensus/reactor_test.go
  10. +1
    -2
      consensus/replay.go
  11. +2
    -2
      consensus/replay_file.go
  12. +9
    -9
      consensus/replay_test.go
  13. +3
    -3
      consensus/state.go
  14. +2
    -2
      consensus/ticker.go
  15. +6
    -6
      consensus/wal.go
  16. +1
    -1
      glide.lock
  17. +1
    -1
      glide.yaml
  18. +2
    -2
      mempool/mempool_test.go
  19. +3
    -3
      node/node.go
  20. +1
    -1
      node/node_test.go
  21. +8
    -8
      p2p/connection_test.go
  22. +2
    -2
      p2p/listener.go
  23. +1
    -1
      p2p/peer.go
  24. +3
    -3
      p2p/peer_test.go
  25. +1
    -1
      p2p/pex_reactor.go
  26. +1
    -1
      p2p/pex_reactor_test.go
  27. +3
    -3
      p2p/switch.go
  28. +2
    -2
      p2p/switch_test.go
  29. +6
    -6
      proxy/app_conn_test.go
  30. +3
    -3
      proxy/multi_app_conn.go
  31. +4
    -8
      rpc/client/event_test.go
  32. +5
    -5
      rpc/client/httpclient.go
  33. +7
    -7
      rpc/lib/client/ws_client.go
  34. +1
    -1
      rpc/lib/client/ws_client_test.go
  35. +4
    -4
      rpc/lib/rpc_test.go
  36. +1
    -1
      rpc/lib/server/handlers.go
  37. +1
    -1
      rpc/test/helpers.go
  38. +1
    -1
      state/execution_test.go

+ 1
- 1
benchmarks/simu/counter.go View File

@ -12,7 +12,7 @@ import (
func main() { func main() {
wsc := rpcclient.NewWSClient("127.0.0.1:46657", "/websocket") wsc := rpcclient.NewWSClient("127.0.0.1:46657", "/websocket")
_, err := wsc.Start()
err := wsc.Start()
if err != nil { if err != nil {
cmn.Exit(err.Error()) cmn.Exit(err.Error())
} }


+ 1
- 1
blockchain/pool.go View File

@ -311,7 +311,7 @@ func (pool *BlockPool) makeNextRequester() {
pool.requesters[nextHeight] = request pool.requesters[nextHeight] = request
pool.numPending++ pool.numPending++
_, err := request.Start()
err := request.Start()
if err != nil { if err != nil {
request.Logger.Error("Error starting request", "err", err) request.Logger.Error("Error starting request", "err", err)
} }


+ 2
- 2
blockchain/pool_test.go View File

@ -37,7 +37,7 @@ func TestBasic(t *testing.T) {
pool := NewBlockPool(start, requestsCh, timeoutsCh) pool := NewBlockPool(start, requestsCh, timeoutsCh)
pool.SetLogger(log.TestingLogger()) pool.SetLogger(log.TestingLogger())
_, err := pool.Start()
err := pool.Start()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
@ -93,7 +93,7 @@ func TestTimeout(t *testing.T) {
requestsCh := make(chan BlockRequest, 100) requestsCh := make(chan BlockRequest, 100)
pool := NewBlockPool(start, requestsCh, timeoutsCh) pool := NewBlockPool(start, requestsCh, timeoutsCh)
pool.SetLogger(log.TestingLogger()) pool.SetLogger(log.TestingLogger())
_, err := pool.Start()
err := pool.Start()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }


+ 1
- 1
blockchain/reactor.go View File

@ -92,7 +92,7 @@ func (bcR *BlockchainReactor) OnStart() error {
return err return err
} }
if bcR.fastSync { if bcR.fastSync {
_, err := bcR.pool.Start()
err := bcR.pool.Start()
if err != nil { if err != nil {
return err return err
} }


+ 1
- 1
cmd/tendermint/commands/run_node.go View File

@ -49,7 +49,7 @@ func NewRunNodeCmd(nodeProvider nm.NodeProvider) *cobra.Command {
return fmt.Errorf("Failed to create node: %v", err) return fmt.Errorf("Failed to create node: %v", err)
} }
if _, err := n.Start(); err != nil {
if err := n.Start(); err != nil {
return fmt.Errorf("Failed to start node: %v", err) return fmt.Errorf("Failed to start node: %v", err)
} else { } else {
logger.Info("Started node", "nodeInfo", n.Switch().NodeInfo()) logger.Info("Started node", "nodeInfo", n.Switch().NodeInfo())


+ 1
- 1
consensus/byzantine_test.go View File

@ -58,7 +58,7 @@ func TestByzantine(t *testing.T) {
eventBus := types.NewEventBus() eventBus := types.NewEventBus()
eventBus.SetLogger(logger.With("module", "events", "validator", i)) eventBus.SetLogger(logger.With("module", "events", "validator", i))
_, err := eventBus.Start()
err := eventBus.Start()
require.NoError(t, err) require.NoError(t, err)
defer eventBus.Stop() defer eventBus.Stop()


+ 4
- 4
consensus/common_test.go View File

@ -464,12 +464,12 @@ type mockTicker struct {
fired bool fired bool
} }
func (m *mockTicker) Start() (bool, error) {
return true, nil
func (m *mockTicker) Start() error {
return nil
} }
func (m *mockTicker) Stop() bool {
return true
func (m *mockTicker) Stop() error {
return nil
} }
func (m *mockTicker) ScheduleTimeout(ti timeoutInfo) { func (m *mockTicker) ScheduleTimeout(ti timeoutInfo) {


+ 2
- 2
consensus/reactor.go View File

@ -65,7 +65,7 @@ func (conR *ConsensusReactor) OnStart() error {
} }
if !conR.FastSync() { if !conR.FastSync() {
_, err := conR.conS.Start()
err := conR.conS.Start()
if err != nil { if err != nil {
return err return err
} }
@ -97,7 +97,7 @@ func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State, blocksSynced in
// dont bother with the WAL if we fast synced // dont bother with the WAL if we fast synced
conR.conS.doWALCatchup = false conR.conS.doWALCatchup = false
} }
_, err := conR.conS.Start()
err := conR.conS.Start()
if err != nil { if err != nil {
conR.Logger.Error("Error starting conS", "err", err) conR.Logger.Error("Error starting conS", "err", err)
} }


+ 1
- 1
consensus/reactor_test.go View File

@ -41,7 +41,7 @@ func startConsensusNet(t *testing.T, css []*ConsensusState, N int) ([]*Consensus
eventBuses[i] = types.NewEventBus() eventBuses[i] = types.NewEventBus()
eventBuses[i].SetLogger(thisLogger.With("module", "events", "validator", i)) eventBuses[i].SetLogger(thisLogger.With("module", "events", "validator", i))
_, err := eventBuses[i].Start()
err := eventBuses[i].Start()
require.NoError(t, err) require.NoError(t, err)
reactors[i].SetEventBus(eventBuses[i]) reactors[i].SetEventBus(eventBuses[i])


+ 1
- 2
consensus/replay.go View File

@ -356,7 +356,6 @@ func (h *Handshaker) replayBlock(height int, proxyApp proxy.AppConnConsensus) ([
func (h *Handshaker) checkAppHash(appHash []byte) error { func (h *Handshaker) checkAppHash(appHash []byte) error {
if !bytes.Equal(h.state.AppHash, appHash) { if !bytes.Equal(h.state.AppHash, appHash) {
panic(errors.New(cmn.Fmt("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash)).Error()) panic(errors.New(cmn.Fmt("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash)).Error())
return nil
} }
return nil return nil
} }
@ -371,7 +370,7 @@ func newMockProxyApp(appHash []byte, abciResponses *sm.ABCIResponses) proxy.AppC
abciResponses: abciResponses, abciResponses: abciResponses,
}) })
cli, _ := clientCreator.NewABCIClient() cli, _ := clientCreator.NewABCIClient()
_, err := cli.Start()
err := cli.Start()
if err != nil { if err != nil {
panic(err) panic(err)
} }


+ 2
- 2
consensus/replay_file.go View File

@ -292,13 +292,13 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
// Create proxyAppConn connection (consensus, mempool, query) // Create proxyAppConn connection (consensus, mempool, query)
clientCreator := proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()) clientCreator := proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir())
proxyApp := proxy.NewAppConns(clientCreator, NewHandshaker(state, blockStore)) proxyApp := proxy.NewAppConns(clientCreator, NewHandshaker(state, blockStore))
_, err = proxyApp.Start()
err = proxyApp.Start()
if err != nil { if err != nil {
cmn.Exit(cmn.Fmt("Error starting proxy app conns: %v", err)) cmn.Exit(cmn.Fmt("Error starting proxy app conns: %v", err))
} }
eventBus := types.NewEventBus() eventBus := types.NewEventBus()
if _, err := eventBus.Start(); err != nil {
if err := eventBus.Start(); err != nil {
cmn.Exit(cmn.Fmt("Failed to start event bus: %v", err)) cmn.Exit(cmn.Fmt("Failed to start event bus: %v", err))
} }


+ 9
- 9
consensus/replay_test.go View File

@ -70,7 +70,7 @@ func startNewConsensusStateAndWaitForBlock(t *testing.T, lastBlockHeight int, bl
// fmt.Printf("====== WAL: \n\r%s\n", bytes) // fmt.Printf("====== WAL: \n\r%s\n", bytes)
t.Logf("====== WAL: \n\r%s\n", bytes) t.Logf("====== WAL: \n\r%s\n", bytes)
_, err := cs.Start()
err := cs.Start()
require.NoError(t, err) require.NoError(t, err)
defer func() { defer func() {
cs.Stop() cs.Stop()
@ -171,7 +171,7 @@ LOOP:
cs.wal = crashingWal cs.wal = crashingWal
// start consensus state // start consensus state
_, err = cs.Start()
err = cs.Start()
require.NoError(t, err) require.NoError(t, err)
i++ i++
@ -257,9 +257,9 @@ func (w *crashingWAL) SearchForEndHeight(height uint64) (gr *auto.GroupReader, f
return w.next.SearchForEndHeight(height) return w.next.SearchForEndHeight(height)
} }
func (w *crashingWAL) Start() (bool, error) { return w.next.Start() }
func (w *crashingWAL) Stop() bool { return w.next.Stop() }
func (w *crashingWAL) Wait() { w.next.Wait() }
func (w *crashingWAL) Start() error { return w.next.Start() }
func (w *crashingWAL) Stop() error { return w.next.Stop() }
func (w *crashingWAL) Wait() { w.next.Wait() }
//------------------------------------------------------------------------------------------ //------------------------------------------------------------------------------------------
// Handshake Tests // Handshake Tests
@ -339,7 +339,7 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
t.Fatal(err) t.Fatal(err)
} }
wal.SetLogger(log.TestingLogger()) wal.SetLogger(log.TestingLogger())
if _, err := wal.Start(); err != nil {
if err := wal.Start(); err != nil {
t.Fatal(err) t.Fatal(err)
} }
chain, commits, err := makeBlockchainFromWAL(wal) chain, commits, err := makeBlockchainFromWAL(wal)
@ -368,7 +368,7 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
// now start the app using the handshake - it should sync // now start the app using the handshake - it should sync
handshaker := NewHandshaker(state, store) handshaker := NewHandshaker(state, store)
proxyApp := proxy.NewAppConns(clientCreator2, handshaker) proxyApp := proxy.NewAppConns(clientCreator2, handshaker)
if _, err := proxyApp.Start(); err != nil {
if err := proxyApp.Start(); err != nil {
t.Fatalf("Error starting proxy app connections: %v", err) t.Fatalf("Error starting proxy app connections: %v", err)
} }
@ -406,7 +406,7 @@ func applyBlock(st *sm.State, blk *types.Block, proxyApp proxy.AppConns) {
func buildAppStateFromChain(proxyApp proxy.AppConns, func buildAppStateFromChain(proxyApp proxy.AppConns,
state *sm.State, chain []*types.Block, nBlocks int, mode uint) { state *sm.State, chain []*types.Block, nBlocks int, mode uint) {
// start a new app without handshake, play nBlocks blocks // start a new app without handshake, play nBlocks blocks
if _, err := proxyApp.Start(); err != nil {
if err := proxyApp.Start(); err != nil {
panic(err) panic(err)
} }
@ -441,7 +441,7 @@ func buildTMStateFromChain(config *cfg.Config, state *sm.State, chain []*types.B
// run the whole chain against this client to build up the tendermint state // run the whole chain against this client to build up the tendermint state
clientCreator := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.DBDir(), "1"))) clientCreator := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.DBDir(), "1")))
proxyApp := proxy.NewAppConns(clientCreator, nil) // sm.NewHandshaker(config, state, store, ReplayLastBlock)) proxyApp := proxy.NewAppConns(clientCreator, nil) // sm.NewHandshaker(config, state, store, ReplayLastBlock))
if _, err := proxyApp.Start(); err != nil {
if err := proxyApp.Start(); err != nil {
panic(err) panic(err)
} }
defer proxyApp.Stop() defer proxyApp.Stop()


+ 3
- 3
consensus/state.go View File

@ -229,7 +229,7 @@ func (cs *ConsensusState) OnStart() error {
// NOTE: we will get a build up of garbage go routines // NOTE: we will get a build up of garbage go routines
// firing on the tockChan until the receiveRoutine is started // firing on the tockChan until the receiveRoutine is started
// to deal with them (by that point, at most one will be valid) // to deal with them (by that point, at most one will be valid)
_, err := cs.timeoutTicker.Start()
err := cs.timeoutTicker.Start()
if err != nil { if err != nil {
return err return err
} }
@ -257,7 +257,7 @@ func (cs *ConsensusState) OnStart() error {
// timeoutRoutine: receive requests for timeouts on tickChan and fire timeouts on tockChan // timeoutRoutine: receive requests for timeouts on tickChan and fire timeouts on tockChan
// receiveRoutine: serializes processing of proposoals, block parts, votes; coordinates state transitions // receiveRoutine: serializes processing of proposoals, block parts, votes; coordinates state transitions
func (cs *ConsensusState) startRoutines(maxSteps int) { func (cs *ConsensusState) startRoutines(maxSteps int) {
_, err := cs.timeoutTicker.Start()
err := cs.timeoutTicker.Start()
if err != nil { if err != nil {
cs.Logger.Error("Error starting timeout ticker", "err", err) cs.Logger.Error("Error starting timeout ticker", "err", err)
return return
@ -292,7 +292,7 @@ func (cs *ConsensusState) OpenWAL(walFile string) (WAL, error) {
return nil, err return nil, err
} }
wal.SetLogger(cs.Logger.With("wal", walFile)) wal.SetLogger(cs.Logger.With("wal", walFile))
if _, err := wal.Start(); err != nil {
if err := wal.Start(); err != nil {
return nil, err return nil, err
} }
return wal, nil return wal, nil


+ 2
- 2
consensus/ticker.go View File

@ -15,8 +15,8 @@ var (
// conditional on the height/round/step in the timeoutInfo. // conditional on the height/round/step in the timeoutInfo.
// The timeoutInfo.Duration may be non-positive. // The timeoutInfo.Duration may be non-positive.
type TimeoutTicker interface { type TimeoutTicker interface {
Start() (bool, error)
Stop() bool
Start() error
Stop() error
Chan() <-chan timeoutInfo // on which to receive a timeout Chan() <-chan timeoutInfo // on which to receive a timeout
ScheduleTimeout(ti timeoutInfo) // reset the timer ScheduleTimeout(ti timeoutInfo) // reset the timer


+ 6
- 6
consensus/wal.go View File

@ -54,8 +54,8 @@ type WAL interface {
Group() *auto.Group Group() *auto.Group
SearchForEndHeight(height uint64) (gr *auto.GroupReader, found bool, err error) SearchForEndHeight(height uint64) (gr *auto.GroupReader, found bool, err error)
Start() (bool, error)
Stop() bool
Start() error
Stop() error
Wait() Wait()
} }
@ -102,7 +102,7 @@ func (wal *baseWAL) OnStart() error {
} else if size == 0 { } else if size == 0 {
wal.Save(EndHeightMessage{0}) wal.Save(EndHeightMessage{0})
} }
_, err = wal.group.Start()
err = wal.group.Start()
return err return err
} }
@ -307,6 +307,6 @@ func (nilWAL) Group() *auto.Group { return nil }
func (nilWAL) SearchForEndHeight(height uint64) (gr *auto.GroupReader, found bool, err error) { func (nilWAL) SearchForEndHeight(height uint64) (gr *auto.GroupReader, found bool, err error) {
return nil, false, nil return nil, false, nil
} }
func (nilWAL) Start() (bool, error) { return true, nil }
func (nilWAL) Stop() bool { return true }
func (nilWAL) Wait() {}
func (nilWAL) Start() error { return nil }
func (nilWAL) Stop() error { return nil }
func (nilWAL) Wait() {}

+ 1
- 1
glide.lock View File

@ -123,7 +123,7 @@ imports:
subpackages: subpackages:
- iavl - iavl
- name: github.com/tendermint/tmlibs - name: github.com/tendermint/tmlibs
version: b854baa1fce7101c90b1d301b3359bb412f981c0
version: 1e12754b3a3b5f1c23bf44c2d882faae688fb2e8
subpackages: subpackages:
- autofile - autofile
- cli - cli


+ 1
- 1
glide.yaml View File

@ -34,7 +34,7 @@ import:
subpackages: subpackages:
- iavl - iavl
- package: github.com/tendermint/tmlibs - package: github.com/tendermint/tmlibs
version: ~0.4.1
version: 1e12754b3a3b5f1c23bf44c2d882faae688fb2e8
subpackages: subpackages:
- autofile - autofile
- cli - cli


+ 2
- 2
mempool/mempool_test.go View File

@ -27,7 +27,7 @@ func newMempoolWithApp(cc proxy.ClientCreator) *Mempool {
appConnMem, _ := cc.NewABCIClient() appConnMem, _ := cc.NewABCIClient()
appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool")) appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool"))
_, err := appConnMem.Start()
err := appConnMem.Start()
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -121,7 +121,7 @@ func TestSerialReap(t *testing.T) {
mempool := newMempoolWithApp(cc) mempool := newMempoolWithApp(cc)
appConnCon, _ := cc.NewABCIClient() appConnCon, _ := cc.NewABCIClient()
appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus")) appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
if _, err := appConnCon.Start(); err != nil {
if err := appConnCon.Start(); err != nil {
t.Fatalf("Error starting ABCI client: %v", err.Error()) t.Fatalf("Error starting ABCI client: %v", err.Error())
} }


+ 3
- 3
node/node.go View File

@ -165,7 +165,7 @@ func NewNode(config *cfg.Config,
handshaker.SetLogger(consensusLogger) handshaker.SetLogger(consensusLogger)
proxyApp := proxy.NewAppConns(clientCreator, handshaker) proxyApp := proxy.NewAppConns(clientCreator, handshaker)
proxyApp.SetLogger(logger.With("module", "proxy")) proxyApp.SetLogger(logger.With("module", "proxy"))
if _, err := proxyApp.Start(); err != nil {
if err := proxyApp.Start(); err != nil {
return nil, fmt.Errorf("Error starting proxy app connections: %v", err) return nil, fmt.Errorf("Error starting proxy app connections: %v", err)
} }
@ -326,7 +326,7 @@ func NewNode(config *cfg.Config,
// OnStart starts the Node. It implements cmn.Service. // OnStart starts the Node. It implements cmn.Service.
func (n *Node) OnStart() error { func (n *Node) OnStart() error {
_, err := n.eventBus.Start()
err := n.eventBus.Start()
if err != nil { if err != nil {
return err return err
} }
@ -349,7 +349,7 @@ func (n *Node) OnStart() error {
// Start the switch // Start the switch
n.sw.SetNodeInfo(n.makeNodeInfo()) n.sw.SetNodeInfo(n.makeNodeInfo())
n.sw.SetNodePrivKey(n.privKey) n.sw.SetNodePrivKey(n.privKey)
_, err = n.sw.Start()
err = n.sw.Start()
if err != nil { if err != nil {
return err return err
} }


+ 1
- 1
node/node_test.go View File

@ -19,7 +19,7 @@ func TestNodeStartStop(t *testing.T) {
// create & start node // create & start node
n, err := DefaultNewNode(config, log.TestingLogger()) n, err := DefaultNewNode(config, log.TestingLogger())
assert.NoError(t, err, "expected no err on DefaultNewNode") assert.NoError(t, err, "expected no err on DefaultNewNode")
_, err1 := n.Start()
err1 := n.Start()
if err1 != nil { if err1 != nil {
t.Error(err1) t.Error(err1)
} }


+ 8
- 8
p2p/connection_test.go View File

@ -36,7 +36,7 @@ func TestMConnectionSend(t *testing.T) {
defer client.Close() // nolint: errcheck defer client.Close() // nolint: errcheck
mconn := createTestMConnection(client) mconn := createTestMConnection(client)
_, err := mconn.Start()
err := mconn.Start()
require.Nil(err) require.Nil(err)
defer mconn.Stop() defer mconn.Stop()
@ -77,12 +77,12 @@ func TestMConnectionReceive(t *testing.T) {
errorsCh <- r errorsCh <- r
} }
mconn1 := createMConnectionWithCallbacks(client, onReceive, onError) mconn1 := createMConnectionWithCallbacks(client, onReceive, onError)
_, err := mconn1.Start()
err := mconn1.Start()
require.Nil(err) require.Nil(err)
defer mconn1.Stop() defer mconn1.Stop()
mconn2 := createTestMConnection(server) mconn2 := createTestMConnection(server)
_, err = mconn2.Start()
err = mconn2.Start()
require.Nil(err) require.Nil(err)
defer mconn2.Stop() defer mconn2.Stop()
@ -107,7 +107,7 @@ func TestMConnectionStatus(t *testing.T) {
defer client.Close() // nolint: errcheck defer client.Close() // nolint: errcheck
mconn := createTestMConnection(client) mconn := createTestMConnection(client)
_, err := mconn.Start()
err := mconn.Start()
require.Nil(err) require.Nil(err)
defer mconn.Stop() defer mconn.Stop()
@ -132,7 +132,7 @@ func TestMConnectionStopsAndReturnsError(t *testing.T) {
errorsCh <- r errorsCh <- r
} }
mconn := createMConnectionWithCallbacks(client, onReceive, onError) mconn := createMConnectionWithCallbacks(client, onReceive, onError)
_, err := mconn.Start()
err := mconn.Start()
require.Nil(err) require.Nil(err)
defer mconn.Stop() defer mconn.Stop()
@ -164,7 +164,7 @@ func newClientAndServerConnsForReadErrors(require *require.Assertions, chOnErr c
} }
mconnClient := NewMConnection(client, chDescs, onReceive, onError) mconnClient := NewMConnection(client, chDescs, onReceive, onError)
mconnClient.SetLogger(log.TestingLogger().With("module", "client")) mconnClient.SetLogger(log.TestingLogger().With("module", "client"))
_, err := mconnClient.Start()
err := mconnClient.Start()
require.Nil(err) require.Nil(err)
// create server conn with 1 channel // create server conn with 1 channel
@ -175,7 +175,7 @@ func newClientAndServerConnsForReadErrors(require *require.Assertions, chOnErr c
} }
mconnServer := createMConnectionWithCallbacks(server, onReceive, onError) mconnServer := createMConnectionWithCallbacks(server, onReceive, onError)
mconnServer.SetLogger(serverLogger) mconnServer.SetLogger(serverLogger)
_, err = mconnServer.Start()
err = mconnServer.Start()
require.Nil(err) require.Nil(err)
return mconnClient, mconnServer return mconnClient, mconnServer
} }
@ -288,7 +288,7 @@ func TestMConnectionTrySend(t *testing.T) {
defer client.Close() defer client.Close()
mconn := createTestMConnection(client) mconn := createTestMConnection(client)
_, err := mconn.Start()
err := mconn.Start()
require.Nil(err) require.Nil(err)
defer mconn.Stop() defer mconn.Stop()


+ 2
- 2
p2p/listener.go View File

@ -16,7 +16,7 @@ type Listener interface {
InternalAddress() *NetAddress InternalAddress() *NetAddress
ExternalAddress() *NetAddress ExternalAddress() *NetAddress
String() string String() string
Stop() bool
Stop() error
} }
// Implements Listener // Implements Listener
@ -100,7 +100,7 @@ func NewDefaultListener(protocol string, lAddr string, skipUPNP bool, logger log
connections: make(chan net.Conn, numBufferedConnections), connections: make(chan net.Conn, numBufferedConnections),
} }
dl.BaseService = *cmn.NewBaseService(logger, "DefaultListener", dl) dl.BaseService = *cmn.NewBaseService(logger, "DefaultListener", dl)
_, err = dl.Start() // Started upon construction
err = dl.Start() // Started upon construction
if err != nil { if err != nil {
logger.Error("Error starting base service", "err", err) logger.Error("Error starting base service", "err", err)
} }


+ 1
- 1
p2p/peer.go View File

@ -235,7 +235,7 @@ func (p *peer) OnStart() error {
if err := p.BaseService.OnStart(); err != nil { if err := p.BaseService.OnStart(); err != nil {
return err return err
} }
_, err := p.mconn.Start()
err := p.mconn.Start()
return err return err
} }


+ 3
- 3
p2p/peer_test.go View File

@ -23,7 +23,7 @@ func TestPeerBasic(t *testing.T) {
p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), DefaultPeerConfig()) p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), DefaultPeerConfig())
require.Nil(err) require.Nil(err)
_, err = p.Start()
err = p.Start()
require.Nil(err) require.Nil(err)
defer p.Stop() defer p.Stop()
@ -50,7 +50,7 @@ func TestPeerWithoutAuthEnc(t *testing.T) {
p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), config) p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), config)
require.Nil(err) require.Nil(err)
_, err = p.Start()
err = p.Start()
require.Nil(err) require.Nil(err)
defer p.Stop() defer p.Stop()
@ -71,7 +71,7 @@ func TestPeerSend(t *testing.T) {
p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), config) p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), config)
require.Nil(err) require.Nil(err)
_, err = p.Start()
err = p.Start()
require.Nil(err) require.Nil(err)
defer p.Stop() defer p.Stop()


+ 1
- 1
p2p/pex_reactor.go View File

@ -69,7 +69,7 @@ func (r *PEXReactor) OnStart() error {
if err := r.BaseReactor.OnStart(); err != nil { if err := r.BaseReactor.OnStart(); err != nil {
return err return err
} }
_, err := r.book.Start()
err := r.book.Start()
if err != nil { if err != nil {
return err return err
} }


+ 1
- 1
p2p/pex_reactor_test.go View File

@ -95,7 +95,7 @@ func TestPEXReactorRunning(t *testing.T) {
// start switches // start switches
for _, s := range switches { for _, s := range switches {
_, err := s.Start() // start switch and reactors
err := s.Start() // start switch and reactors
require.Nil(err) require.Nil(err)
} }


+ 3
- 3
p2p/switch.go View File

@ -179,7 +179,7 @@ func (sw *Switch) OnStart() error {
} }
// Start reactors // Start reactors
for _, reactor := range sw.reactors { for _, reactor := range sw.reactors {
_, err := reactor.Start()
err := reactor.Start()
if err != nil { if err != nil {
return err return err
} }
@ -289,7 +289,7 @@ func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) {
} }
func (sw *Switch) startInitPeer(peer *peer) { func (sw *Switch) startInitPeer(peer *peer) {
_, err := peer.Start() // spawn send/recv routines
err := peer.Start() // spawn send/recv routines
if err != nil { if err != nil {
// Should never happen // Should never happen
sw.Logger.Error("Error starting peer", "peer", peer, "err", err) sw.Logger.Error("Error starting peer", "peer", peer, "err", err)
@ -547,7 +547,7 @@ func Connect2Switches(switches []*Switch, i, j int) {
// It returns the first encountered error. // It returns the first encountered error.
func StartSwitches(switches []*Switch) error { func StartSwitches(switches []*Switch) error {
for _, s := range switches { for _, s := range switches {
_, err := s.Start() // start switch and reactors
err := s.Start() // start switch and reactors
if err != nil { if err != nil {
return err return err
} }


+ 2
- 2
p2p/switch_test.go View File

@ -225,7 +225,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
assert, require := assert.New(t), require.New(t) assert, require := assert.New(t), require.New(t)
sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
_, err := sw.Start()
err := sw.Start()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
@ -252,7 +252,7 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
assert, require := assert.New(t), require.New(t) assert, require := assert.New(t), require.New(t)
sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
_, err := sw.Start()
err := sw.Start()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }


+ 6
- 6
proxy/app_conn_test.go View File

@ -51,7 +51,7 @@ func TestEcho(t *testing.T) {
// Start server // Start server
s := server.NewSocketServer(sockPath, dummy.NewDummyApplication()) s := server.NewSocketServer(sockPath, dummy.NewDummyApplication())
s.SetLogger(log.TestingLogger().With("module", "abci-server")) s.SetLogger(log.TestingLogger().With("module", "abci-server"))
if _, err := s.Start(); err != nil {
if err := s.Start(); err != nil {
t.Fatalf("Error starting socket server: %v", err.Error()) t.Fatalf("Error starting socket server: %v", err.Error())
} }
defer s.Stop() defer s.Stop()
@ -62,7 +62,7 @@ func TestEcho(t *testing.T) {
t.Fatalf("Error creating ABCI client: %v", err.Error()) t.Fatalf("Error creating ABCI client: %v", err.Error())
} }
cli.SetLogger(log.TestingLogger().With("module", "abci-client")) cli.SetLogger(log.TestingLogger().With("module", "abci-client"))
if _, err := cli.Start(); err != nil {
if err := cli.Start(); err != nil {
t.Fatalf("Error starting ABCI client: %v", err.Error()) t.Fatalf("Error starting ABCI client: %v", err.Error())
} }
@ -85,7 +85,7 @@ func BenchmarkEcho(b *testing.B) {
// Start server // Start server
s := server.NewSocketServer(sockPath, dummy.NewDummyApplication()) s := server.NewSocketServer(sockPath, dummy.NewDummyApplication())
s.SetLogger(log.TestingLogger().With("module", "abci-server")) s.SetLogger(log.TestingLogger().With("module", "abci-server"))
if _, err := s.Start(); err != nil {
if err := s.Start(); err != nil {
b.Fatalf("Error starting socket server: %v", err.Error()) b.Fatalf("Error starting socket server: %v", err.Error())
} }
defer s.Stop() defer s.Stop()
@ -96,7 +96,7 @@ func BenchmarkEcho(b *testing.B) {
b.Fatalf("Error creating ABCI client: %v", err.Error()) b.Fatalf("Error creating ABCI client: %v", err.Error())
} }
cli.SetLogger(log.TestingLogger().With("module", "abci-client")) cli.SetLogger(log.TestingLogger().With("module", "abci-client"))
if _, err := cli.Start(); err != nil {
if err := cli.Start(); err != nil {
b.Fatalf("Error starting ABCI client: %v", err.Error()) b.Fatalf("Error starting ABCI client: %v", err.Error())
} }
@ -124,7 +124,7 @@ func TestInfo(t *testing.T) {
// Start server // Start server
s := server.NewSocketServer(sockPath, dummy.NewDummyApplication()) s := server.NewSocketServer(sockPath, dummy.NewDummyApplication())
s.SetLogger(log.TestingLogger().With("module", "abci-server")) s.SetLogger(log.TestingLogger().With("module", "abci-server"))
if _, err := s.Start(); err != nil {
if err := s.Start(); err != nil {
t.Fatalf("Error starting socket server: %v", err.Error()) t.Fatalf("Error starting socket server: %v", err.Error())
} }
defer s.Stop() defer s.Stop()
@ -135,7 +135,7 @@ func TestInfo(t *testing.T) {
t.Fatalf("Error creating ABCI client: %v", err.Error()) t.Fatalf("Error creating ABCI client: %v", err.Error())
} }
cli.SetLogger(log.TestingLogger().With("module", "abci-client")) cli.SetLogger(log.TestingLogger().With("module", "abci-client"))
if _, err := cli.Start(); err != nil {
if err := cli.Start(); err != nil {
t.Fatalf("Error starting ABCI client: %v", err.Error()) t.Fatalf("Error starting ABCI client: %v", err.Error())
} }


+ 3
- 3
proxy/multi_app_conn.go View File

@ -76,7 +76,7 @@ func (app *multiAppConn) OnStart() error {
return errors.Wrap(err, "Error creating ABCI client (query connection)") return errors.Wrap(err, "Error creating ABCI client (query connection)")
} }
querycli.SetLogger(app.Logger.With("module", "abci-client", "connection", "query")) querycli.SetLogger(app.Logger.With("module", "abci-client", "connection", "query"))
if _, err := querycli.Start(); err != nil {
if err := querycli.Start(); err != nil {
return errors.Wrap(err, "Error starting ABCI client (query connection)") return errors.Wrap(err, "Error starting ABCI client (query connection)")
} }
app.queryConn = NewAppConnQuery(querycli) app.queryConn = NewAppConnQuery(querycli)
@ -87,7 +87,7 @@ func (app *multiAppConn) OnStart() error {
return errors.Wrap(err, "Error creating ABCI client (mempool connection)") return errors.Wrap(err, "Error creating ABCI client (mempool connection)")
} }
memcli.SetLogger(app.Logger.With("module", "abci-client", "connection", "mempool")) memcli.SetLogger(app.Logger.With("module", "abci-client", "connection", "mempool"))
if _, err := memcli.Start(); err != nil {
if err := memcli.Start(); err != nil {
return errors.Wrap(err, "Error starting ABCI client (mempool connection)") return errors.Wrap(err, "Error starting ABCI client (mempool connection)")
} }
app.mempoolConn = NewAppConnMempool(memcli) app.mempoolConn = NewAppConnMempool(memcli)
@ -98,7 +98,7 @@ func (app *multiAppConn) OnStart() error {
return errors.Wrap(err, "Error creating ABCI client (consensus connection)") return errors.Wrap(err, "Error creating ABCI client (consensus connection)")
} }
concli.SetLogger(app.Logger.With("module", "abci-client", "connection", "consensus")) concli.SetLogger(app.Logger.With("module", "abci-client", "connection", "consensus"))
if _, err := concli.Start(); err != nil {
if err := concli.Start(); err != nil {
return errors.Wrap(err, "Error starting ABCI client (consensus connection)") return errors.Wrap(err, "Error starting ABCI client (consensus connection)")
} }
app.consensusConn = NewAppConnConsensus(concli) app.consensusConn = NewAppConnConsensus(concli)


+ 4
- 8
rpc/client/event_test.go View File

@ -27,9 +27,8 @@ func TestHeaderEvents(t *testing.T) {
// start for this test it if it wasn't already running // start for this test it if it wasn't already running
if !c.IsRunning() { if !c.IsRunning() {
// if so, then we start it, listen, and stop it. // if so, then we start it, listen, and stop it.
st, err := c.Start()
err := c.Start()
require.Nil(err, "%d: %+v", i, err) require.Nil(err, "%d: %+v", i, err)
require.True(st, "%d", i)
defer c.Stop() defer c.Stop()
} }
@ -48,9 +47,8 @@ func TestBlockEvents(t *testing.T) {
// start for this test it if it wasn't already running // start for this test it if it wasn't already running
if !c.IsRunning() { if !c.IsRunning() {
// if so, then we start it, listen, and stop it. // if so, then we start it, listen, and stop it.
st, err := c.Start()
err := c.Start()
require.Nil(err, "%d: %+v", i, err) require.Nil(err, "%d: %+v", i, err)
require.True(st, "%d", i)
defer c.Stop() defer c.Stop()
} }
@ -80,9 +78,8 @@ func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) {
// start for this test it if it wasn't already running // start for this test it if it wasn't already running
if !c.IsRunning() { if !c.IsRunning() {
// if so, then we start it, listen, and stop it. // if so, then we start it, listen, and stop it.
st, err := c.Start()
err := c.Start()
require.Nil(err, "%d: %+v", i, err) require.Nil(err, "%d: %+v", i, err)
require.True(st, "%d", i)
defer c.Stop() defer c.Stop()
} }
@ -113,9 +110,8 @@ func TestTxEventsSentWithBroadcastTxSync(t *testing.T) {
// start for this test it if it wasn't already running // start for this test it if it wasn't already running
if !c.IsRunning() { if !c.IsRunning() {
// if so, then we start it, listen, and stop it. // if so, then we start it, listen, and stop it.
st, err := c.Start()
err := c.Start()
require.Nil(err, "%d: %+v", i, err) require.Nil(err, "%d: %+v", i, err)
require.True(st, "%d", i)
defer c.Stop() defer c.Stop()
} }


+ 5
- 5
rpc/client/httpclient.go View File

@ -215,26 +215,26 @@ func newWSEvents(remote, endpoint string) *WSEvents {
// Start is the only way I could think the extend OnStart from // Start is the only way I could think the extend OnStart from
// events.eventSwitch. If only it wasn't private... // events.eventSwitch. If only it wasn't private...
// BaseService.Start -> eventSwitch.OnStart -> WSEvents.Start // BaseService.Start -> eventSwitch.OnStart -> WSEvents.Start
func (w *WSEvents) Start() (bool, error) {
func (w *WSEvents) Start() error {
ws := rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() { ws := rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() {
w.redoSubscriptions() w.redoSubscriptions()
})) }))
started, err := ws.Start()
err := ws.Start()
if err == nil { if err == nil {
w.ws = ws w.ws = ws
go w.eventListener() go w.eventListener()
} }
return started, errors.Wrap(err, "StartWSEvent")
return err
} }
// Stop wraps the BaseService/eventSwitch actions as Start does // Stop wraps the BaseService/eventSwitch actions as Start does
func (w *WSEvents) Stop() bool {
func (w *WSEvents) Stop() error {
// send a message to quit to stop the eventListener // send a message to quit to stop the eventListener
w.quit <- true w.quit <- true
<-w.done <-w.done
w.ws.Stop() w.ws.Stop()
w.ws = nil w.ws = nil
return true
return nil
} }
func (w *WSEvents) Subscribe(ctx context.Context, query string, out chan<- interface{}) error { func (w *WSEvents) Subscribe(ctx context.Context, query string, out chan<- interface{}) error {


+ 7
- 7
rpc/lib/client/ws_client.go View File

@ -47,10 +47,10 @@ type WSClient struct {
onReconnect func() onReconnect func()
// internal channels // internal channels
send chan types.RPCRequest // user requests
backlog chan types.RPCRequest // stores a single user request received during a conn failure
reconnectAfter chan error // reconnect requests
readRoutineQuit chan struct{} // a way for readRoutine to close writeRoutine
send chan types.RPCRequest // user requests
backlog chan types.RPCRequest // stores a single user request received during a conn failure
reconnectAfter chan error // reconnect requests
readRoutineQuit chan struct{} // a way for readRoutine to close writeRoutine
wg sync.WaitGroup wg sync.WaitGroup
@ -168,12 +168,12 @@ func (c *WSClient) OnStop() {}
// Stop overrides cmn.Service#Stop. There is no other way to wait until Quit // Stop overrides cmn.Service#Stop. There is no other way to wait until Quit
// channel is closed. // channel is closed.
func (c *WSClient) Stop() bool {
success := c.BaseService.Stop()
func (c *WSClient) Stop() error {
err := c.BaseService.Stop()
// only close user-facing channels when we can't write to them // only close user-facing channels when we can't write to them
c.wg.Wait() c.wg.Wait()
close(c.ResponsesCh) close(c.ResponsesCh)
return success
return err
} }
// IsReconnecting returns true if the client is reconnecting right now. // IsReconnecting returns true if the client is reconnecting right now.


+ 1
- 1
rpc/lib/client/ws_client_test.go View File

@ -196,7 +196,7 @@ func TestNotBlockingOnStop(t *testing.T) {
func startClient(t *testing.T, addr net.Addr) *WSClient { func startClient(t *testing.T, addr net.Addr) *WSClient {
c := NewWSClient(addr.String(), "/websocket") c := NewWSClient(addr.String(), "/websocket")
_, err := c.Start()
err := c.Start()
require.Nil(t, err) require.Nil(t, err)
c.SetLogger(log.TestingLogger()) c.SetLogger(log.TestingLogger())
return c return c


+ 4
- 4
rpc/lib/rpc_test.go View File

@ -278,7 +278,7 @@ func TestServersAndClientsBasic(t *testing.T) {
cl3 := client.NewWSClient(addr, websocketEndpoint) cl3 := client.NewWSClient(addr, websocketEndpoint)
cl3.SetLogger(log.TestingLogger()) cl3.SetLogger(log.TestingLogger())
_, err := cl3.Start()
err := cl3.Start()
require.Nil(t, err) require.Nil(t, err)
fmt.Printf("=== testing server on %s using %v client", addr, cl3) fmt.Printf("=== testing server on %s using %v client", addr, cl3)
testWithWSClient(t, cl3) testWithWSClient(t, cl3)
@ -307,7 +307,7 @@ func TestQuotedStringArg(t *testing.T) {
func TestWSNewWSRPCFunc(t *testing.T) { func TestWSNewWSRPCFunc(t *testing.T) {
cl := client.NewWSClient(tcpAddr, websocketEndpoint) cl := client.NewWSClient(tcpAddr, websocketEndpoint)
cl.SetLogger(log.TestingLogger()) cl.SetLogger(log.TestingLogger())
_, err := cl.Start()
err := cl.Start()
require.Nil(t, err) require.Nil(t, err)
defer cl.Stop() defer cl.Stop()
@ -332,7 +332,7 @@ func TestWSNewWSRPCFunc(t *testing.T) {
func TestWSHandlesArrayParams(t *testing.T) { func TestWSHandlesArrayParams(t *testing.T) {
cl := client.NewWSClient(tcpAddr, websocketEndpoint) cl := client.NewWSClient(tcpAddr, websocketEndpoint)
cl.SetLogger(log.TestingLogger()) cl.SetLogger(log.TestingLogger())
_, err := cl.Start()
err := cl.Start()
require.Nil(t, err) require.Nil(t, err)
defer cl.Stop() defer cl.Stop()
@ -357,7 +357,7 @@ func TestWSHandlesArrayParams(t *testing.T) {
func TestWSClientPingPong(t *testing.T) { func TestWSClientPingPong(t *testing.T) {
cl := client.NewWSClient(tcpAddr, websocketEndpoint) cl := client.NewWSClient(tcpAddr, websocketEndpoint)
cl.SetLogger(log.TestingLogger()) cl.SetLogger(log.TestingLogger())
_, err := cl.Start()
err := cl.Start()
require.Nil(t, err) require.Nil(t, err)
defer cl.Stop() defer cl.Stop()


+ 1
- 1
rpc/lib/server/handlers.go View File

@ -723,7 +723,7 @@ func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Requ
con := NewWSConnection(wsConn, wm.funcMap, wm.wsConnOptions...) con := NewWSConnection(wsConn, wm.funcMap, wm.wsConnOptions...)
con.SetLogger(wm.logger.With("remote", wsConn.RemoteAddr())) con.SetLogger(wm.logger.With("remote", wsConn.RemoteAddr()))
wm.logger.Info("New websocket connection", "remote", con.remoteAddr) wm.logger.Info("New websocket connection", "remote", con.remoteAddr)
_, err = con.Start() // Blocking
err = con.Start() // Blocking
if err != nil { if err != nil {
wm.logger.Error("Error starting connection", "err", err) wm.logger.Error("Error starting connection", "err", err)
} }


+ 1
- 1
rpc/test/helpers.go View File

@ -92,7 +92,7 @@ func GetGRPCClient() core_grpc.BroadcastAPIClient {
// StartTendermint starts a test tendermint server in a go routine and returns when it is initialized // StartTendermint starts a test tendermint server in a go routine and returns when it is initialized
func StartTendermint(app abci.Application) *nm.Node { func StartTendermint(app abci.Application) *nm.Node {
node := NewTendermint(app) node := NewTendermint(app)
_, err := node.Start()
err := node.Start()
if err != nil { if err != nil {
panic(err) panic(err)
} }


+ 1
- 1
state/execution_test.go View File

@ -25,7 +25,7 @@ var (
func TestApplyBlock(t *testing.T) { func TestApplyBlock(t *testing.T) {
cc := proxy.NewLocalClientCreator(dummy.NewDummyApplication()) cc := proxy.NewLocalClientCreator(dummy.NewDummyApplication())
proxyApp := proxy.NewAppConns(cc, nil) proxyApp := proxy.NewAppConns(cc, nil)
_, err := proxyApp.Start()
err := proxyApp.Start()
require.Nil(t, err) require.Nil(t, err)
defer proxyApp.Stop() defer proxyApp.Stop()


Loading…
Cancel
Save