logging: remove remaining instances of SetLogger interface (#7572)

Branch: pull/7577/head
Commit: 2a348cc1e9, Sam Kleinman, 3 years ago, committed by GitHub
21 changed files with 103 additions and 89 deletions:

 1. abci/cmd/abci-cli/abci-cli.go (+1 -2)
 2. abci/example/kvstore/kvstore_test.go (+9 -3)
 3. abci/example/kvstore/persistent_kvstore.go (+2 -6)
 4. cmd/tendermint/commands/rollback_test.go (+4 -1)
 5. internal/consensus/byzantine_test.go (+1 -1)
 6. internal/consensus/common_test.go (+9 -10)
 7. internal/consensus/reactor_test.go (+1 -1)
 8. internal/consensus/replay_test.go (+3 -3)
 9. internal/consensus/wal_generator.go (+4 -5)
10. internal/consensus/wal_test.go (+7 -4)
11. internal/inspect/rpc/rpc.go (+1 -2)
12. internal/proxy/client.go (+1 -1)
13. light/proxy/proxy.go (+2 -2)
14. node/node.go (+1 -2)
15. rpc/client/local/local.go (+2 -7)
16. rpc/client/main_test.go (+3 -2)
17. rpc/client/rpc_test.go (+41 -18)
18. rpc/jsonrpc/client/integration_test.go (+6 -6)
19. rpc/jsonrpc/jsonrpc_test.go (+2 -4)
20. rpc/jsonrpc/server/ws_handler.go (+2 -6)
21. rpc/jsonrpc/server/ws_handler_test.go (+1 -3)
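
Every file below follows the same pattern: the mutable SetLogger method is deleted and the logger becomes a constructor argument, so a component's logger is fixed at construction time rather than swapped in afterwards. A minimal before/after sketch of that pattern, using the kvstore application from this diff (the dbDir path is a placeholder, and log.NewNopLogger() stands in for whatever logger the caller already has):

```go
package main

import (
	"github.com/tendermint/tendermint/abci/example/kvstore"
	"github.com/tendermint/tendermint/libs/log"
)

func main() {
	logger := log.NewNopLogger() // placeholder; a real caller passes its configured logger
	dbDir := "/tmp/kvstore-data" // placeholder path

	// Before this commit: construct, then mutate.
	//   app := kvstore.NewPersistentKVStoreApplication(dbDir)
	//   app.SetLogger(logger.With("module", "kvstore"))

	// After this commit: the logger is injected once, at construction.
	app := kvstore.NewPersistentKVStoreApplication(logger, dbDir)
	defer func() { _ = app.Close() }()
}
```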

abci/cmd/abci-cli/abci-cli.go (+1 -2)

@@ -582,8 +582,7 @@ func cmdKVStore(cmd *cobra.Command, args []string) error {
 	if flagPersist == "" {
 		app = kvstore.NewApplication()
 	} else {
-		app = kvstore.NewPersistentKVStoreApplication(flagPersist)
-		app.(*kvstore.PersistentKVStoreApplication).SetLogger(logger.With("module", "kvstore"))
+		app = kvstore.NewPersistentKVStoreApplication(logger, flagPersist)
 	}

 	// Start the listener


abci/example/kvstore/kvstore_test.go (+9 -3)

@@ -76,7 +76,9 @@ func TestPersistentKVStoreKV(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	kvstore := NewPersistentKVStoreApplication(dir)
+	logger := log.NewTestingLogger(t)
+	kvstore := NewPersistentKVStoreApplication(logger, dir)
+
 	key := testKey
 	value := key
 	tx := []byte(key)
@@ -92,7 +94,9 @@ func TestPersistentKVStoreInfo(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	kvstore := NewPersistentKVStoreApplication(dir)
+	logger := log.NewTestingLogger(t)
+	kvstore := NewPersistentKVStoreApplication(logger, dir)
+
 	InitKVStore(kvstore)

 	height := int64(0)
@@ -124,7 +128,9 @@ func TestValUpdates(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	kvstore := NewPersistentKVStoreApplication(dir)
+	logger := log.NewTestingLogger(t)
+	kvstore := NewPersistentKVStoreApplication(logger, dir)
+
 	// init with some validators
 	total := 10


abci/example/kvstore/persistent_kvstore.go (+2 -6)

@@ -35,7 +35,7 @@ type PersistentKVStoreApplication struct {
 	logger log.Logger
 }

-func NewPersistentKVStoreApplication(dbDir string) *PersistentKVStoreApplication {
+func NewPersistentKVStoreApplication(logger log.Logger, dbDir string) *PersistentKVStoreApplication {
 	name := "kvstore"
 	db, err := dbm.NewGoLevelDB(name, dbDir)
 	if err != nil {
@@ -47,7 +47,7 @@ func NewPersistentKVStoreApplication(dbDir string) *PersistentKVStoreApplication
 	return &PersistentKVStoreApplication{
 		app:                &Application{state: state},
 		valAddrToPubKeyMap: make(map[string]cryptoproto.PublicKey),
-		logger:             log.NewNopLogger(),
+		logger:             logger,
 	}
 }
@@ -55,10 +55,6 @@ func (app *PersistentKVStoreApplication) Close() error {
 	return app.app.state.db.Close()
 }

-func (app *PersistentKVStoreApplication) SetLogger(l log.Logger) {
-	app.logger = l
-}
-
 func (app *PersistentKVStoreApplication) Info(req types.RequestInfo) types.ResponseInfo {
 	res := app.app.Info(req)
 	res.LastBlockHeight = app.app.state.Height
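
In tests, the new constructor pairs with log.NewTestingLogger(t), which scopes application log output to the test, as the kvstore tests above now do. A self-contained sketch of that pattern (the test name is a placeholder):

```go
package kvstore_test

import (
	"testing"

	"github.com/tendermint/tendermint/abci/example/kvstore"
	"github.com/tendermint/tendermint/libs/log"
)

func TestSomething(t *testing.T) { // hypothetical test name
	dir := t.TempDir() // placeholder for the db directory used by the real tests

	// The logger is now required up front; there is no SetLogger to call later.
	logger := log.NewTestingLogger(t)
	app := kvstore.NewPersistentKVStoreApplication(logger, dir)
	t.Cleanup(func() { _ = app.Close() })
}
```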


cmd/tendermint/commands/rollback_test.go (+4 -1)

@@ -8,6 +8,7 @@ import (
 	"github.com/stretchr/testify/require"

 	"github.com/tendermint/tendermint/cmd/tendermint/commands"
+	"github.com/tendermint/tendermint/libs/log"
 	"github.com/tendermint/tendermint/rpc/client/local"
 	rpctest "github.com/tendermint/tendermint/rpc/test"
 	e2e "github.com/tendermint/tendermint/test/e2e/app"
@@ -49,7 +50,9 @@ func TestRollbackIntegration(t *testing.T) {
 	node2, _, err2 := rpctest.StartTendermint(ctx, cfg, app, rpctest.SuppressStdout)
 	require.NoError(t, err2)

-	client, err := local.New(node2.(local.NodeService))
+	logger := log.NewTestingLogger(t)
+
+	client, err := local.New(logger, node2.(local.NodeService))
 	require.NoError(t, err)

 	ticker := time.NewTicker(200 * time.Millisecond)


internal/consensus/byzantine_test.go (+1 -1)

@@ -59,7 +59,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
 		defer os.RemoveAll(thisConfig.RootDir)
 		ensureDir(t, path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal

-		app := appFunc(t)
+		app := appFunc(t, logger)
 		vals := types.TM2PB.ValidatorUpdates(state.Validators)
 		app.InitChain(abci.RequestInitChain{Validators: vals})


internal/consensus/common_test.go (+9 -10)

@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"io"
 	"os"
-	"path"
 	"path/filepath"
 	"sort"
 	"sync"
@@ -737,7 +736,7 @@ func randConsensusState(
 	nValidators int,
 	testName string,
 	tickerFunc func() TimeoutTicker,
-	appFunc func(t *testing.T) abci.Application,
+	appFunc func(t *testing.T, logger log.Logger) abci.Application,
 	configOpts ...func(*config.Config),
 ) ([]*State, cleanupFunc) {
@@ -764,7 +763,7 @@ func randConsensusState(
 		ensureDir(t, filepath.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal

-		app := appFunc(t)
+		app := appFunc(t, logger)

 		if appCloser, ok := app.(io.Closer); ok {
 			closeFuncs = append(closeFuncs, appCloser.Close)
@@ -797,7 +796,7 @@ func randConsensusNetWithPeers(
 	nPeers int,
 	testName string,
 	tickerFunc func() TimeoutTicker,
-	appFunc func(string) abci.Application,
+	appFunc func(log.Logger, string) abci.Application,
 ) ([]*State, *types.GenesisDoc, *config.Config, cleanupFunc) {
 	t.Helper()
@@ -831,7 +830,7 @@ func randConsensusNetWithPeers(
 			require.NoError(t, err)
 		}

-		app := appFunc(path.Join(cfg.DBDir(), fmt.Sprintf("%s_%d", testName, i)))
+		app := appFunc(logger, filepath.Join(cfg.DBDir(), fmt.Sprintf("%s_%d", testName, i)))

 		vals := types.TM2PB.ValidatorUpdates(state.Validators)
 		if _, ok := app.(*kvstore.PersistentKVStoreApplication); ok {
 			// simulate handshake, receive app version. If don't do this, replay test will fail
@@ -912,21 +911,21 @@ func (m *mockTicker) Chan() <-chan timeoutInfo {

 func (*mockTicker) SetLogger(log.Logger) {}

-func newPersistentKVStore(t *testing.T) abci.Application {
+func newPersistentKVStore(t *testing.T, logger log.Logger) abci.Application {
 	t.Helper()
 	dir, err := os.MkdirTemp("", "persistent-kvstore")
 	require.NoError(t, err)
-	return kvstore.NewPersistentKVStoreApplication(dir)
+	return kvstore.NewPersistentKVStoreApplication(logger, dir)
 }

-func newKVStore(_ *testing.T) abci.Application {
+func newKVStore(_ *testing.T, _ log.Logger) abci.Application {
 	return kvstore.NewApplication()
 }

-func newPersistentKVStoreWithPath(dbDir string) abci.Application {
-	return kvstore.NewPersistentKVStoreApplication(dbDir)
+func newPersistentKVStoreWithPath(logger log.Logger, dbDir string) abci.Application {
+	return kvstore.NewPersistentKVStoreApplication(logger, dbDir)
 }

 func signDataIsEqual(v1 *types.Vote, v2 *tmproto.Vote) bool {
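
The appFunc plumbing above means the test helpers now receive the logger rather than building their own. A minimal sketch of a custom helper matching the new func(*testing.T, log.Logger) abci.Application shape (the helper name is hypothetical; the real ones are newKVStore and newPersistentKVStore above):

```go
package consensus

import (
	"testing"

	"github.com/tendermint/tendermint/abci/example/kvstore"
	abci "github.com/tendermint/tendermint/abci/types"
	"github.com/tendermint/tendermint/libs/log"
)

// newLoggingKVStore is a hypothetical appFunc that uses the injected
// logger before handing back an application, matching the new signature.
func newLoggingKVStore(t *testing.T, logger log.Logger) abci.Application {
	t.Helper()
	logger.Info("constructing kvstore app", "test", t.Name())
	return kvstore.NewApplication()
}
```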


internal/consensus/reactor_test.go (+1 -1)

@@ -398,7 +398,7 @@ func TestReactorWithEvidence(t *testing.T) {
 		defer os.RemoveAll(thisConfig.RootDir)
 		ensureDir(t, path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal

-		app := appFunc(t)
+		app := appFunc(t, logger)
 		vals := types.TM2PB.ValidatorUpdates(state.Validators)
 		app.InitChain(abci.RequestInitChain{Validators: vals})


internal/consensus/replay_test.go (+3 -3)

@@ -763,7 +763,7 @@ func testHandshakeReplay(
 	testConfig, err := ResetConfig(fmt.Sprintf("%s_%v_s", t.Name(), mode))
 	require.NoError(t, err)
 	defer func() { _ = os.RemoveAll(testConfig.RootDir) }()
-	walBody, err := WALWithNBlocks(ctx, t, numBlocks)
+	walBody, err := WALWithNBlocks(ctx, t, logger, numBlocks)
 	require.NoError(t, err)
 	walFile := tempWALWithData(t, walBody)
 	cfg.Consensus.SetWalFile(walFile)
@@ -805,7 +805,7 @@ func testHandshakeReplay(
 	latestAppHash := state.AppHash

 	// make a new client creator
-	kvstoreApp := kvstore.NewPersistentKVStoreApplication(
+	kvstoreApp := kvstore.NewPersistentKVStoreApplication(logger,
 		filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_a_r%d", nBlocks, mode, rand.Int())))
 	t.Cleanup(func() { require.NoError(t, kvstoreApp.Close()) })
@@ -959,7 +959,7 @@ func buildTMStateFromChain(
 	t.Helper()

 	// run the whole chain against this client to build up the tendermint state
-	kvstoreApp := kvstore.NewPersistentKVStoreApplication(
+	kvstoreApp := kvstore.NewPersistentKVStoreApplication(logger,
 		filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_t", nBlocks, mode)))
 	defer kvstoreApp.Close()
 	clientCreator := abciclient.NewLocalCreator(kvstoreApp)


internal/consensus/wal_generator.go (+4 -5)

@@ -31,13 +31,12 @@ import (
 // persistent kvstore application and special consensus wal instance
 // (byteBufferWAL) and waits until numBlocks are created.
 // If the node fails to produce given numBlocks, it returns an error.
-func WALGenerateNBlocks(ctx context.Context, t *testing.T, wr io.Writer, numBlocks int) (err error) {
+func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr io.Writer, numBlocks int) (err error) {
 	cfg := getConfig(t)

-	app := kvstore.NewPersistentKVStoreApplication(filepath.Join(cfg.DBDir(), "wal_generator"))
+	app := kvstore.NewPersistentKVStoreApplication(logger, filepath.Join(cfg.DBDir(), "wal_generator"))
 	t.Cleanup(func() { require.NoError(t, app.Close()) })

-	logger := log.TestingLogger().With("wal_generator", "wal_generator")
 	logger.Info("generating WAL (last height msg excluded)", "numBlocks", numBlocks)

 	// COPY PASTE FROM node.go WITH A FEW MODIFICATIONS
@@ -116,11 +115,11 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, wr io.Writer, numBloc
 }

 // WALWithNBlocks returns a WAL content with numBlocks.
-func WALWithNBlocks(ctx context.Context, t *testing.T, numBlocks int) (data []byte, err error) {
+func WALWithNBlocks(ctx context.Context, t *testing.T, logger log.Logger, numBlocks int) (data []byte, err error) {
 	var b bytes.Buffer
 	wr := bufio.NewWriter(&b)

-	if err := WALGenerateNBlocks(ctx, t, wr, numBlocks); err != nil {
+	if err := WALGenerateNBlocks(ctx, t, logger, wr, numBlocks); err != nil {
 		return []byte{}, err
 	}
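
With the generator no longer creating its own log.TestingLogger(), callers decide where WAL-generation output goes. A sketch of driving the updated helper from a test in the same package (the test name is a placeholder):

```go
package consensus

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/tendermint/tendermint/libs/log"
)

func TestWALFixture(t *testing.T) { // hypothetical test name
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// The caller-supplied logger replaces the one the generator used to build.
	logger := log.NewTestingLogger(t)

	walBody, err := WALWithNBlocks(ctx, t, logger, 6)
	require.NoError(t, err)
	require.NotEmpty(t, walBody)
}
```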


internal/consensus/wal_test.go (+7 -4)

@@ -45,7 +45,7 @@ func TestWALTruncate(t *testing.T) {
 	// 60 block's size nearly 70K, greater than group's headBuf size(4096 * 10),
 	// when headBuf is full, truncate content will Flush to the file. at this
 	// time, RotateFile is called, truncate content exist in each file.
-	err = WALGenerateNBlocks(ctx, t, wal.Group(), 60)
+	err = WALGenerateNBlocks(ctx, t, logger, wal.Group(), 60)
 	require.NoError(t, err)

 	time.Sleep(1 * time.Millisecond) // wait groupCheckDuration, make sure RotateFile run
@@ -136,13 +136,15 @@ func TestWALSearchForEndHeight(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

-	walBody, err := WALWithNBlocks(ctx, t, 6)
+	logger := log.NewTestingLogger(t)
+
+	walBody, err := WALWithNBlocks(ctx, t, logger, 6)
 	if err != nil {
 		t.Fatal(err)
 	}
 	walFile := tempWALWithData(t, walBody)

-	wal, err := NewWAL(log.TestingLogger(), walFile)
+	wal, err := NewWAL(logger, walFile)
 	require.NoError(t, err)

 	h := int64(3)
@@ -171,9 +173,10 @@ func TestWALPeriodicSync(t *testing.T) {
 	require.NoError(t, err)
 	wal.SetFlushInterval(walTestFlushInterval)

+	logger := log.NewTestingLogger(t)
 	// Generate some data
-	err = WALGenerateNBlocks(ctx, t, wal.Group(), 5)
+	err = WALGenerateNBlocks(ctx, t, logger, wal.Group(), 5)
 	require.NoError(t, err)

 	// We should have data in the buffer now


internal/inspect/rpc/rpc.go (+1 -2)

@@ -66,10 +66,9 @@ func Handler(rpcConfig *config.RPCConfig, routes core.RoutesMap, logger log.Logg
 			wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err)
 		}
 	}
-	wm := server.NewWebsocketManager(routes,
+	wm := server.NewWebsocketManager(logger, routes,
 		server.OnDisconnect(websocketDisconnectFn),
 		server.ReadLimit(rpcConfig.MaxBodyBytes))
-	wm.SetLogger(wmLogger)
 	mux.HandleFunc("/websocket", wm.WebsocketHandler)
 	server.RegisterRPCFuncs(mux, routes, logger)


internal/proxy/client.go (+1 -1)

@@ -21,7 +21,7 @@ func DefaultClientCreator(logger log.Logger, addr, transport, dbDir string) (abc
 	case "kvstore":
 		return abciclient.NewLocalCreator(kvstore.NewApplication()), noopCloser{}
 	case "persistent_kvstore":
-		app := kvstore.NewPersistentKVStoreApplication(dbDir)
+		app := kvstore.NewPersistentKVStoreApplication(logger, dbDir)
 		return abciclient.NewLocalCreator(app), app
 	case "e2e":
 		app, err := e2e.NewApplication(e2e.DefaultConfig(dbDir))


light/proxy/proxy.go (+2 -2)

@@ -95,7 +95,7 @@ func (p *Proxy) listen(ctx context.Context) (net.Listener, *http.ServeMux, error
 	// 2) Allow websocket connections.
 	wmLogger := p.Logger.With("protocol", "websocket")
-	wm := rpcserver.NewWebsocketManager(r,
+	wm := rpcserver.NewWebsocketManager(wmLogger, r,
 		rpcserver.OnDisconnect(func(remoteAddr string) {
 			err := p.Client.UnsubscribeAll(context.Background(), remoteAddr)
 			if err != nil && err != tmpubsub.ErrSubscriptionNotFound {
@@ -104,7 +104,7 @@ func (p *Proxy) listen(ctx context.Context) (net.Listener, *http.ServeMux, error
 		}),
 		rpcserver.ReadLimit(p.Config.MaxBodyBytes),
 	)
-	wm.SetLogger(wmLogger)
+
 	mux.HandleFunc("/websocket", wm.WebsocketHandler)

 	// 3) Start a client.


node/node.go (+1 -2)

@@ -666,7 +666,7 @@ func (n *nodeImpl) startRPC(ctx context.Context) ([]net.Listener, error) {
 	mux := http.NewServeMux()
 	rpcLogger := n.logger.With("module", "rpc-server")
 	wmLogger := rpcLogger.With("protocol", "websocket")
-	wm := rpcserver.NewWebsocketManager(routes,
+	wm := rpcserver.NewWebsocketManager(wmLogger, routes,
 		rpcserver.OnDisconnect(func(remoteAddr string) {
 			err := n.rpcEnv.EventBus.UnsubscribeAll(context.Background(), remoteAddr)
 			if err != nil && err != tmpubsub.ErrSubscriptionNotFound {
@@ -675,7 +675,6 @@ func (n *nodeImpl) startRPC(ctx context.Context) ([]net.Listener, error) {
 		}),
 		rpcserver.ReadLimit(cfg.MaxBodyBytes),
 	)
-	wm.SetLogger(wmLogger)
 	mux.HandleFunc("/websocket", wm.WebsocketHandler)
 	rpcserver.RegisterRPCFuncs(mux, routes, rpcLogger)
 	listener, err := rpcserver.Listen(


rpc/client/local/local.go (+2 -7)

@@ -51,25 +51,20 @@ type NodeService interface {
 }

 // New configures a client that calls the Node directly.
-func New(node NodeService) (*Local, error) {
+func New(logger log.Logger, node NodeService) (*Local, error) {
 	env := node.RPCEnvironment()
 	if env == nil {
 		return nil, errors.New("rpc is nil")
 	}
 	return &Local{
 		EventBus: node.EventBus(),
-		Logger:   log.NewNopLogger(),
+		Logger:   logger,
 		env:      env,
 	}, nil
 }

 var _ rpcclient.Client = (*Local)(nil)

-// SetLogger allows to set a logger on the client.
-func (c *Local) SetLogger(l log.Logger) {
-	c.Logger = l
-}
-
 func (c *Local) Status(ctx context.Context) (*coretypes.ResultStatus, error) {
 	return c.env.Status(ctx)
 }
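
The local RPC client follows the same constructor-injection pattern: New now takes the logger, and there is no SetLogger to call afterwards. A minimal sketch of the updated construction (the ns value is assumed to come from a running node, for example via rpctest.StartTendermint as in rollback_test.go above):

```go
package rpcexample // illustrative package; not part of the commit

import (
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/rpc/client/local"
)

// newLocalClient shows the updated construction; ns is assumed to be a
// running node that implements local.NodeService.
func newLocalClient(ns local.NodeService) (*local.Local, error) {
	logger := log.NewNopLogger() // placeholder; a real caller passes its own logger
	return local.New(logger, ns)
}
```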


rpc/client/main_test.go (+3 -2)

@@ -10,11 +10,12 @@ import (
 	"github.com/stretchr/testify/require"

 	"github.com/tendermint/tendermint/abci/example/kvstore"
 	"github.com/tendermint/tendermint/config"
+	"github.com/tendermint/tendermint/libs/log"
 	"github.com/tendermint/tendermint/libs/service"
 	rpctest "github.com/tendermint/tendermint/rpc/test"
 )

-func NodeSuite(t *testing.T) (service.Service, *config.Config) {
+func NodeSuite(t *testing.T, logger log.Logger) (service.Service, *config.Config) {
 	t.Helper()

 	ctx, cancel := context.WithCancel(context.Background())
@@ -26,7 +27,7 @@ func NodeSuite(t *testing.T) (service.Service, *config.Config) {
 	dir, err := os.MkdirTemp("/tmp", fmt.Sprint("rpc-client-test-", t.Name()))
 	require.NoError(t, err)

-	app := kvstore.NewPersistentKVStoreApplication(dir)
+	app := kvstore.NewPersistentKVStoreApplication(logger, dir)
 	node, closer, err := rpctest.StartTendermint(ctx, conf, app, rpctest.SuppressStdout)
 	require.NoError(t, err)


rpc/client/rpc_test.go (+41 -18)

@@ -34,14 +34,14 @@ import (
 	"github.com/tendermint/tendermint/types"
 )

-func getHTTPClient(t *testing.T, conf *config.Config) *rpchttp.HTTP {
+func getHTTPClient(t *testing.T, logger log.Logger, conf *config.Config) *rpchttp.HTTP {
 	t.Helper()

 	rpcAddr := conf.RPC.ListenAddress
 	c, err := rpchttp.NewWithClient(rpcAddr, http.DefaultClient)
 	require.NoError(t, err)

-	c.Logger = log.NewTestingLogger(t)
+	c.Logger = logger
 	t.Cleanup(func() {
 		if c.IsRunning() {
 			require.NoError(t, c.Stop())
@@ -51,7 +51,7 @@ func getHTTPClient(t *testing.T, conf *config.Config) *rpchttp.HTTP {
 	return c
 }

-func getHTTPClientWithTimeout(t *testing.T, conf *config.Config, timeout time.Duration) *rpchttp.HTTP {
+func getHTTPClientWithTimeout(t *testing.T, logger log.Logger, conf *config.Config, timeout time.Duration) *rpchttp.HTTP {
 	t.Helper()

 	rpcAddr := conf.RPC.ListenAddress
@@ -60,7 +60,7 @@ func getHTTPClientWithTimeout(t *testing.T, conf *config.Config, timeout time.Du
 	c, err := rpchttp.NewWithClient(rpcAddr, http.DefaultClient)
 	require.NoError(t, err)

-	c.Logger = log.NewTestingLogger(t)
+	c.Logger = logger
 	t.Cleanup(func() {
 		http.DefaultClient.Timeout = 0
 		if c.IsRunning() {
@@ -78,12 +78,13 @@ func GetClients(t *testing.T, ns service.Service, conf *config.Config) []client.
 	node, ok := ns.(rpclocal.NodeService)
 	require.True(t, ok)

-	ncl, err := rpclocal.New(node)
+	logger := log.NewTestingLogger(t)
+	ncl, err := rpclocal.New(logger, node)
 	require.NoError(t, err)

 	return []client.Client{
 		ncl,
-		getHTTPClient(t, conf),
+		getHTTPClient(t, logger, conf),
 	}
 }

@@ -91,7 +92,9 @@ func TestClientOperations(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

-	_, conf := NodeSuite(t)
+	logger := log.NewTestingLogger(t)
+
+	_, conf := NodeSuite(t, logger)

 	t.Run("NilCustomHTTPClient", func(t *testing.T) {
 		_, err := rpchttp.NewWithClient("http://example.com", nil)
@@ -129,14 +132,16 @@ func TestClientOperations(t *testing.T) {
 	})
 	t.Run("Batching", func(t *testing.T) {
 		t.Run("JSONRPCCalls", func(t *testing.T) {
-			c := getHTTPClient(t, conf)
+			logger := log.NewTestingLogger(t)
+			c := getHTTPClient(t, logger, conf)
 			testBatchedJSONRPCCalls(ctx, t, c)
 		})
 		t.Run("JSONRPCCallsCancellation", func(t *testing.T) {
 			_, _, tx1 := MakeTxKV()
 			_, _, tx2 := MakeTxKV()

-			c := getHTTPClient(t, conf)
+			logger := log.NewTestingLogger(t)
+			c := getHTTPClient(t, logger, conf)
 			batch := c.NewBatch()
 			_, err := batch.BroadcastTxCommit(ctx, tx1)
 			require.NoError(t, err)
@@ -150,19 +155,25 @@ func TestClientOperations(t *testing.T) {
 			require.Equal(t, 0, batch.Count())
 		})
 		t.Run("SendingEmptyRequest", func(t *testing.T) {
-			c := getHTTPClient(t, conf)
+			logger := log.NewTestingLogger(t)
+
+			c := getHTTPClient(t, logger, conf)
 			batch := c.NewBatch()
 			_, err := batch.Send(ctx)
 			require.Error(t, err, "sending an empty batch of JSON RPC requests should result in an error")
 		})
 		t.Run("ClearingEmptyRequest", func(t *testing.T) {
-			c := getHTTPClient(t, conf)
+			logger := log.NewTestingLogger(t)
+
+			c := getHTTPClient(t, logger, conf)
 			batch := c.NewBatch()
 			require.Zero(t, batch.Clear(), "clearing an empty batch of JSON RPC requests should result in a 0 result")
 		})
 		t.Run("ConcurrentJSONRPC", func(t *testing.T) {
+			logger := log.NewTestingLogger(t)
+
 			var wg sync.WaitGroup
-			c := getHTTPClient(t, conf)
+			c := getHTTPClient(t, logger, conf)
 			for i := 0; i < 50; i++ {
 				wg.Add(1)
 				go func() {
@@ -174,7 +185,9 @@ func TestClientOperations(t *testing.T) {
 		})
 	})
 	t.Run("HTTPReturnsErrorIfClientIsNotRunning", func(t *testing.T) {
-		c := getHTTPClientWithTimeout(t, conf, 100*time.Millisecond)
+		logger := log.NewTestingLogger(t)
+
+		c := getHTTPClientWithTimeout(t, logger, conf, 100*time.Millisecond)

 		// on Subscribe
 		_, err := c.Subscribe(ctx, "TestHeaderEvents",
@@ -196,7 +209,9 @@ func TestClientMethodCalls(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

-	n, conf := NodeSuite(t)
+	logger := log.NewTestingLogger(t)
+
+	n, conf := NodeSuite(t, logger)

 	// for broadcast tx tests
 	pool := getMempool(t, n)
@@ -591,7 +606,9 @@ func TestClientMethodCallsAdvanced(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

-	n, conf := NodeSuite(t)
+	logger := log.NewTestingLogger(t)
+
+	n, conf := NodeSuite(t, logger)
 	pool := getMempool(t, n)

 	t.Run("UnconfirmedTxs", func(t *testing.T) {
@@ -654,7 +671,9 @@ func TestClientMethodCallsAdvanced(t *testing.T) {
 		pool.Flush()
 	})
 	t.Run("Tx", func(t *testing.T) {
-		c := getHTTPClient(t, conf)
+		logger := log.NewTestingLogger(t)
+
+		c := getHTTPClient(t, logger, conf)

 		// first we broadcast a tx
 		_, _, tx := MakeTxKV()
@@ -710,7 +729,9 @@ func TestClientMethodCallsAdvanced(t *testing.T) {
 		}
 	})
 	t.Run("TxSearchWithTimeout", func(t *testing.T) {
-		timeoutClient := getHTTPClientWithTimeout(t, conf, 10*time.Second)
+		logger := log.NewTestingLogger(t)
+
+		timeoutClient := getHTTPClientWithTimeout(t, logger, conf, 10*time.Second)

 		_, _, tx := MakeTxKV()
 		_, err := timeoutClient.BroadcastTxCommit(ctx, tx)
@@ -723,7 +744,9 @@ func TestClientMethodCallsAdvanced(t *testing.T) {
 	})
 	t.Run("TxSearch", func(t *testing.T) {
 		t.Skip("Test Asserts Non-Deterministic Results")
-		c := getHTTPClient(t, conf)
+		logger := log.NewTestingLogger(t)
+
+		c := getHTTPClient(t, logger, conf)

 		// first we broadcast a few txs
 		for i := 0; i < 10; i++ {
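
The recurring shape in the tests above: build one test-scoped logger, hand it to the node fixture, and reuse it for every client, so log output is attributable to the test that produced it. Sketched under the assumption it lives in the same test package as NodeSuite and getHTTPClient (the test name and package placement are placeholders):

```go
package client_test // assumed package of the tests above

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/tendermint/tendermint/libs/log"
)

func TestExample(t *testing.T) { // hypothetical test name
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	logger := log.NewTestingLogger(t)

	// Fixture and client share the same test-scoped logger.
	_, conf := NodeSuite(t, logger)
	c := getHTTPClient(t, logger, conf)

	_, err := c.Status(ctx)
	require.NoError(t, err)
}
```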


rpc/jsonrpc/client/integration_test.go (+6 -6)

@@ -8,6 +8,7 @@ package client
 import (
 	"bytes"
+	"context"
 	"errors"
 	"net"
 	"regexp"
@@ -15,34 +16,33 @@ import (
 	"time"

 	"github.com/stretchr/testify/require"
-
-	"github.com/tendermint/tendermint/libs/log"
 )

 func TestWSClientReconnectWithJitter(t *testing.T) {
 	n := 8
-	maxReconnectAttempts := 3
+	var maxReconnectAttempts uint = 3
 	// Max wait time is ceil(1+0.999) + ceil(2+0.999) + ceil(4+0.999) + ceil(...) = 2 + 3 + 5 = 10s + ...
 	maxSleepTime := time.Second * time.Duration(((1<<uint(maxReconnectAttempts))-1)+maxReconnectAttempts)

+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
 	var errNotConnected = errors.New("not connected")
 	clientMap := make(map[int]*WSClient)
 	buf := new(bytes.Buffer)
-	logger := log.NewTMLogger(buf)
 	for i := 0; i < n; i++ {
 		c, err := NewWS("tcp://foo", "/websocket")
 		require.NoError(t, err)
 		c.Dialer = func(string, string) (net.Conn, error) {
 			return nil, errNotConnected
 		}
-		c.SetLogger(logger)
 		c.maxReconnectAttempts = maxReconnectAttempts
 		// Not invoking defer c.Stop() because
 		// after all the reconnect attempts have been
 		// exhausted, c.Stop is implicitly invoked.
 		clientMap[i] = c
 		// Trigger the reconnect routine that performs exponential backoff.
-		go c.reconnect()
+		go c.reconnect(ctx)
 	}

 	stopCount := 0

rpc/jsonrpc/jsonrpc_test.go (+2 -4)

@@ -107,8 +107,7 @@ func setup(ctx context.Context) error {
 	tcpLogger := logger.With("socket", "tcp")
 	mux := http.NewServeMux()
 	server.RegisterRPCFuncs(mux, Routes, tcpLogger)
-	wm := server.NewWebsocketManager(Routes, server.ReadWait(5*time.Second), server.PingPeriod(1*time.Second))
-	wm.SetLogger(tcpLogger)
+	wm := server.NewWebsocketManager(tcpLogger, Routes, server.ReadWait(5*time.Second), server.PingPeriod(1*time.Second))
 	mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler)
 	config := server.DefaultConfig()
 	listener1, err := server.Listen(tcpAddr, config.MaxOpenConnections)
@@ -124,8 +123,7 @@ func setup(ctx context.Context) error {
 	unixLogger := logger.With("socket", "unix")
 	mux2 := http.NewServeMux()
 	server.RegisterRPCFuncs(mux2, Routes, unixLogger)
-	wm = server.NewWebsocketManager(Routes)
-	wm.SetLogger(unixLogger)
+	wm = server.NewWebsocketManager(unixLogger, Routes)
 	mux2.HandleFunc(websocketEndpoint, wm.WebsocketHandler)
 	listener2, err := server.Listen(unixAddr, config.MaxOpenConnections)
 	if err != nil {


rpc/jsonrpc/server/ws_handler.go (+2 -6)

@@ -41,6 +41,7 @@ type WebsocketManager struct {
 // NewWebsocketManager returns a new WebsocketManager that passes a map of
 // functions, connection options and logger to new WS connections.
 func NewWebsocketManager(
+	logger log.Logger,
 	funcMap map[string]*RPCFunc,
 	wsConnOptions ...func(*wsConnection),
 ) *WebsocketManager {
@@ -60,16 +61,11 @@ func NewWebsocketManager(
 				return true
 			},
 		},
-		logger:        log.NewNopLogger(),
+		logger:        logger,
 		wsConnOptions: wsConnOptions,
 	}
 }

-// SetLogger sets the logger.
-func (wm *WebsocketManager) SetLogger(l log.Logger) {
-	wm.logger = l
-}
-
 // WebsocketHandler upgrades the request/response (via http.Hijack) and starts
 // the wsConnection.
 func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) {
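
Callers that used to construct the manager and then call SetLogger now pass the logger as the first argument, as the server setups above were changed to do. A minimal sketch of wiring the manager into a mux (the empty routes map and the nop logger are placeholders; server.ReadWait is one of the options shown in jsonrpc_test.go above):

```go
package main

import (
	"net/http"
	"time"

	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/rpc/jsonrpc/server"
)

func main() {
	logger := log.NewNopLogger()            // placeholder; a real server uses its configured logger
	routes := map[string]*server.RPCFunc{}  // placeholder: register real RPC funcs here

	// The logger is now the first constructor argument; SetLogger is gone.
	wm := server.NewWebsocketManager(logger, routes,
		server.ReadWait(5*time.Second))

	mux := http.NewServeMux()
	mux.HandleFunc("/websocket", wm.WebsocketHandler)
}
```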


rpc/jsonrpc/server/ws_handler_test.go (+1 -3)

@@ -49,9 +49,7 @@ func newWSServer(t *testing.T, logger log.Logger) *httptest.Server {
 	funcMap := map[string]*RPCFunc{
 		"c": NewWSRPCFunc(func(ctx context.Context, s string, i int) (string, error) { return "foo", nil }, "s,i"),
 	}
-	wm := NewWebsocketManager(funcMap)
-	wm.SetLogger(logger)
-
+	wm := NewWebsocketManager(logger, funcMap)
 	mux := http.NewServeMux()
 	mux.HandleFunc("/websocket", wm.WebsocketHandler)

