From d7606777cf5a8304c41fd8c77951658ff04ef3c5 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Tue, 16 Nov 2021 11:20:56 -0500 Subject: [PATCH] libs/service: pass logger explicitly (#7288) This is a very small change, but removes a method from the `service.Service` interface (a win!) and forces callers to explicitly pass loggers in to objects during construction rather than (later) injecting them. There's not a real need for this kind of lazy construction of loggers, and I think a decent potential for confusion for mutable loggers. The main concern I have is that this changes the constructor API for ABCI clients. I think this is fine, and I suspect that as we plumb contexts through, and make changes to the RPC services there'll be a number of similar sorts of changes to various (quasi) public interfaces, which I think we should welcome. --- CHANGELOG_PENDING.md | 2 + abci/client/client.go | 7 +- abci/client/creators.go | 11 +-- abci/client/grpc_client.go | 5 +- abci/client/socket_client.go | 5 +- abci/client/socket_client_test.go | 20 +++-- abci/cmd/abci-cli/abci-cli.go | 7 +- abci/example/example_test.go | 18 +++-- abci/example/kvstore/kvstore_test.go | 14 ++-- abci/server/grpc_server.go | 5 +- abci/server/server.go | 7 +- abci/server/socket_server.go | 14 +--- abci/tests/client_server_test.go | 7 +- internal/blocksync/pool.go | 10 ++- internal/blocksync/pool_test.go | 9 +-- internal/blocksync/reactor.go | 2 +- internal/blocksync/reactor_test.go | 4 +- internal/consensus/byzantine_test.go | 7 +- internal/consensus/common_test.go | 30 ++++---- internal/consensus/invalid_test.go | 3 +- internal/consensus/mempool_test.go | 16 ++-- internal/consensus/reactor_test.go | 11 +-- internal/consensus/replay.go | 2 +- internal/consensus/replay_file.go | 10 +-- internal/consensus/replay_stubs.go | 5 +- internal/consensus/replay_test.go | 33 ++++---- internal/consensus/state.go | 15 +--- internal/consensus/state_test.go | 75 ++++++++++--------- internal/consensus/ticker.go | 6 +- internal/consensus/wal.go | 9 +-- internal/consensus/wal_generator.go | 9 +-- internal/consensus/wal_test.go | 12 ++- internal/eventbus/event_bus.go | 15 ++-- internal/eventbus/event_bus_test.go | 15 ++-- internal/inspect/inspect.go | 24 +++--- internal/mempool/mempool_test.go | 6 +- internal/p2p/conn/connection.go | 19 ++--- internal/p2p/conn/connection_test.go | 50 ++++++------- internal/p2p/transport_mconn.go | 2 +- internal/p2p/trust/store.go | 5 +- internal/p2p/trust/store_test.go | 24 +++--- internal/proxy/app_conn_test.go | 30 ++++---- internal/proxy/client.go | 5 +- internal/proxy/multi_app_conn.go | 17 +++-- internal/proxy/multi_app_conn_test.go | 10 +-- internal/state/execution_test.go | 19 +++-- internal/state/helpers_test.go | 3 +- internal/state/indexer/indexer_service.go | 9 --- .../state/indexer/indexer_service_test.go | 15 +++- libs/pubsub/pubsub.go | 2 +- libs/pubsub/pubsub_test.go | 12 ++- libs/service/service.go | 8 -- node/node.go | 2 +- node/node_test.go | 6 +- node/setup.go | 11 +-- test/e2e/node/main.go | 2 +- test/fuzz/mempool/checktx.go | 2 +- 57 files changed, 347 insertions(+), 356 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 08f53c6e0..cb1a79cac 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -32,6 +32,8 @@ Special thanks to external contributors on this release: - [blocksync] \#7046 Remove v2 implementation of the blocksync service and recactor, which was disabled in the previous release. (@tychoish) - [p2p] \#7064 Remove WDRR queue implementation. (@tychoish) - [config] \#7169 `WriteConfigFile` now returns an error. (@tychoish) + - [libs/service] \#7288 Remove SetLogger method on `service.Service` interface. (@tychosih) + - Blockchain Protocol diff --git a/abci/client/client.go b/abci/client/client.go index a38c7f81b..1f0017557 100644 --- a/abci/client/client.go +++ b/abci/client/client.go @@ -7,6 +7,7 @@ import ( "github.com/tendermint/tendermint/abci/types" tmsync "github.com/tendermint/tendermint/internal/libs/sync" + "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" ) @@ -68,12 +69,12 @@ type Client interface { // NewClient returns a new ABCI client of the specified transport type. // It returns an error if the transport is not "socket" or "grpc" -func NewClient(addr, transport string, mustConnect bool) (client Client, err error) { +func NewClient(logger log.Logger, addr, transport string, mustConnect bool) (client Client, err error) { switch transport { case "socket": - client = NewSocketClient(addr, mustConnect) + client = NewSocketClient(logger, addr, mustConnect) case "grpc": - client = NewGRPCClient(addr, mustConnect) + client = NewGRPCClient(logger, addr, mustConnect) default: err = fmt.Errorf("unknown abci transport %s", transport) } diff --git a/abci/client/creators.go b/abci/client/creators.go index e17b15eca..7cabb2e43 100644 --- a/abci/client/creators.go +++ b/abci/client/creators.go @@ -5,17 +5,18 @@ import ( "github.com/tendermint/tendermint/abci/types" tmsync "github.com/tendermint/tendermint/internal/libs/sync" + "github.com/tendermint/tendermint/libs/log" ) // Creator creates new ABCI clients. -type Creator func() (Client, error) +type Creator func(log.Logger) (Client, error) // NewLocalCreator returns a Creator for the given app, // which will be running locally. func NewLocalCreator(app types.Application) Creator { mtx := new(tmsync.Mutex) - return func() (Client, error) { + return func(_ log.Logger) (Client, error) { return NewLocalClient(mtx, app), nil } } @@ -23,9 +24,9 @@ func NewLocalCreator(app types.Application) Creator { // NewRemoteCreator returns a Creator for the given address (e.g. // "192.168.0.1") and transport (e.g. "tcp"). Set mustConnect to true if you // want the client to connect before reporting success. -func NewRemoteCreator(addr, transport string, mustConnect bool) Creator { - return func() (Client, error) { - remoteApp, err := NewClient(addr, transport, mustConnect) +func NewRemoteCreator(logger log.Logger, addr, transport string, mustConnect bool) Creator { + return func(log.Logger) (Client, error) { + remoteApp, err := NewClient(logger, addr, transport, mustConnect) if err != nil { return nil, fmt.Errorf("failed to connect to proxy: %w", err) } diff --git a/abci/client/grpc_client.go b/abci/client/grpc_client.go index f1123fab5..f4cd5f3e9 100644 --- a/abci/client/grpc_client.go +++ b/abci/client/grpc_client.go @@ -11,6 +11,7 @@ import ( "github.com/tendermint/tendermint/abci/types" tmsync "github.com/tendermint/tendermint/internal/libs/sync" + "github.com/tendermint/tendermint/libs/log" tmnet "github.com/tendermint/tendermint/libs/net" "github.com/tendermint/tendermint/libs/service" ) @@ -42,7 +43,7 @@ var _ Client = (*grpcClient)(nil) // which is expensive, but easy - if you want something better, use the socket // protocol! maybe one day, if people really want it, we use grpc streams, but // hopefully not :D -func NewGRPCClient(addr string, mustConnect bool) Client { +func NewGRPCClient(logger log.Logger, addr string, mustConnect bool) Client { cli := &grpcClient{ addr: addr, mustConnect: mustConnect, @@ -54,7 +55,7 @@ func NewGRPCClient(addr string, mustConnect bool) Client { // gRPC calls while processing a slow callback at the channel head. chReqRes: make(chan *ReqRes, 64), } - cli.BaseService = *service.NewBaseService(nil, "grpcClient", cli) + cli.BaseService = *service.NewBaseService(logger, "grpcClient", cli) return cli } diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index 726c554d4..00e981123 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -13,6 +13,7 @@ import ( "github.com/tendermint/tendermint/abci/types" tmsync "github.com/tendermint/tendermint/internal/libs/sync" + "github.com/tendermint/tendermint/libs/log" tmnet "github.com/tendermint/tendermint/libs/net" "github.com/tendermint/tendermint/libs/service" ) @@ -50,7 +51,7 @@ var _ Client = (*socketClient)(nil) // NewSocketClient creates a new socket client, which connects to a given // address. If mustConnect is true, the client will return an error upon start // if it fails to connect. -func NewSocketClient(addr string, mustConnect bool) Client { +func NewSocketClient(logger log.Logger, addr string, mustConnect bool) Client { cli := &socketClient{ reqQueue: make(chan *reqResWithContext, reqQueueSize), mustConnect: mustConnect, @@ -59,7 +60,7 @@ func NewSocketClient(addr string, mustConnect bool) Client { reqSent: list.New(), resCb: nil, } - cli.BaseService = *service.NewBaseService(nil, "socketClient", cli) + cli.BaseService = *service.NewBaseService(logger, "socketClient", cli) return cli } diff --git a/abci/client/socket_client_test.go b/abci/client/socket_client_test.go index 53ba7b672..0e2fdffa3 100644 --- a/abci/client/socket_client_test.go +++ b/abci/client/socket_client_test.go @@ -14,6 +14,7 @@ import ( abciclient "github.com/tendermint/tendermint/abci/client" "github.com/tendermint/tendermint/abci/server" "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" ) @@ -21,8 +22,9 @@ var ctx = context.Background() func TestProperSyncCalls(t *testing.T) { app := slowApp{} + logger := log.TestingLogger() - s, c := setupClientServer(t, app) + s, c := setupClientServer(t, logger, app) t.Cleanup(func() { if err := s.Stop(); err != nil { t.Error(err) @@ -57,8 +59,9 @@ func TestProperSyncCalls(t *testing.T) { func TestHangingSyncCalls(t *testing.T) { app := slowApp{} + logger := log.TestingLogger() - s, c := setupClientServer(t, app) + s, c := setupClientServer(t, logger, app) t.Cleanup(func() { if err := s.Stop(); err != nil { t.Log(err) @@ -99,18 +102,23 @@ func TestHangingSyncCalls(t *testing.T) { } } -func setupClientServer(t *testing.T, app types.Application) ( - service.Service, abciclient.Client) { +func setupClientServer( + t *testing.T, + logger log.Logger, + app types.Application, +) (service.Service, abciclient.Client) { + t.Helper() + // some port between 20k and 30k port := 20000 + rand.Int31()%10000 addr := fmt.Sprintf("localhost:%d", port) - s, err := server.NewServer(addr, "socket", app) + s, err := server.NewServer(logger, addr, "socket", app) require.NoError(t, err) err = s.Start() require.NoError(t, err) - c := abciclient.NewSocketClient(addr, true) + c := abciclient.NewSocketClient(logger, addr, true) err = c.Start() require.NoError(t, err) diff --git a/abci/cmd/abci-cli/abci-cli.go b/abci/cmd/abci-cli/abci-cli.go index 9fae6fc05..3ffc7dbfa 100644 --- a/abci/cmd/abci-cli/abci-cli.go +++ b/abci/cmd/abci-cli/abci-cli.go @@ -67,11 +67,10 @@ var RootCmd = &cobra.Command{ if client == nil { var err error - client, err = abciclient.NewClient(flagAddress, flagAbci, false) + client, err = abciclient.NewClient(logger.With("module", "abci-client"), flagAddress, flagAbci, false) if err != nil { return err } - client.SetLogger(logger.With("module", "abci-client")) if err := client.Start(); err != nil { return err } @@ -586,11 +585,11 @@ func cmdKVStore(cmd *cobra.Command, args []string) error { } // Start the listener - srv, err := server.NewServer(flagAddress, flagAbci, app) + srv, err := server.NewServer(logger.With("module", "abci-server"), flagAddress, flagAbci, app) if err != nil { return err } - srv.SetLogger(logger.With("module", "abci-server")) + if err := srv.Start(); err != nil { return err } diff --git a/abci/example/example_test.go b/abci/example/example_test.go index 64705d0e0..c984fa2fb 100644 --- a/abci/example/example_test.go +++ b/abci/example/example_test.go @@ -44,14 +44,16 @@ func TestGRPC(t *testing.T) { } func testStream(t *testing.T, app types.Application) { + t.Helper() + const numDeliverTxs = 20000 socketFile := fmt.Sprintf("test-%08x.sock", rand.Int31n(1<<30)) defer os.Remove(socketFile) socket := fmt.Sprintf("unix://%v", socketFile) - + logger := log.TestingLogger() // Start the listener - server := abciserver.NewSocketServer(socket, app) - server.SetLogger(log.TestingLogger().With("module", "abci-server")) + server := abciserver.NewSocketServer(logger.With("module", "abci-server"), socket, app) + err := server.Start() require.NoError(t, err) t.Cleanup(func() { @@ -61,8 +63,8 @@ func testStream(t *testing.T, app types.Application) { }) // Connect to the socket - client := abciclient.NewSocketClient(socket, false) - client.SetLogger(log.TestingLogger().With("module", "abci-client")) + client := abciclient.NewSocketClient(log.TestingLogger().With("module", "abci-client"), socket, false) + err = client.Start() require.NoError(t, err) t.Cleanup(func() { @@ -132,10 +134,10 @@ func testGRPCSync(t *testing.T, app types.ABCIApplicationServer) { socketFile := fmt.Sprintf("/tmp/test-%08x.sock", rand.Int31n(1<<30)) defer os.Remove(socketFile) socket := fmt.Sprintf("unix://%v", socketFile) - + logger := log.TestingLogger() // Start the listener - server := abciserver.NewGRPCServer(socket, app) - server.SetLogger(log.TestingLogger().With("module", "abci-server")) + server := abciserver.NewGRPCServer(logger.With("module", "abci-server"), socket, app) + if err := server.Start(); err != nil { t.Fatalf("Error starting GRPC server: %v", err.Error()) } diff --git a/abci/example/kvstore/kvstore_test.go b/abci/example/kvstore/kvstore_test.go index 7c89db1ad..e64e0ed9e 100644 --- a/abci/example/kvstore/kvstore_test.go +++ b/abci/example/kvstore/kvstore_test.go @@ -234,15 +234,13 @@ func makeSocketClientServer(app types.Application, name string) (abciclient.Clie socket := fmt.Sprintf("unix://%s.sock", name) logger := log.TestingLogger() - server := abciserver.NewSocketServer(socket, app) - server.SetLogger(logger.With("module", "abci-server")) + server := abciserver.NewSocketServer(logger.With("module", "abci-server"), socket, app) if err := server.Start(); err != nil { return nil, nil, err } // Connect to the socket - client := abciclient.NewSocketClient(socket, false) - client.SetLogger(logger.With("module", "abci-client")) + client := abciclient.NewSocketClient(logger.With("module", "abci-client"), socket, false) if err := client.Start(); err != nil { if err = server.Stop(); err != nil { return nil, nil, err @@ -259,14 +257,14 @@ func makeGRPCClientServer(app types.Application, name string) (abciclient.Client logger := log.TestingLogger() gapp := types.NewGRPCApplication(app) - server := abciserver.NewGRPCServer(socket, gapp) - server.SetLogger(logger.With("module", "abci-server")) + server := abciserver.NewGRPCServer(logger.With("module", "abci-server"), socket, gapp) + if err := server.Start(); err != nil { return nil, nil, err } - client := abciclient.NewGRPCClient(socket, true) - client.SetLogger(logger.With("module", "abci-client")) + client := abciclient.NewGRPCClient(logger.With("module", "abci-client"), socket, true) + if err := client.Start(); err != nil { if err := server.Stop(); err != nil { return nil, nil, err diff --git a/abci/server/grpc_server.go b/abci/server/grpc_server.go index 503f0b64f..6d22f43c1 100644 --- a/abci/server/grpc_server.go +++ b/abci/server/grpc_server.go @@ -6,6 +6,7 @@ import ( "google.golang.org/grpc" "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/libs/log" tmnet "github.com/tendermint/tendermint/libs/net" "github.com/tendermint/tendermint/libs/service" ) @@ -22,7 +23,7 @@ type GRPCServer struct { } // NewGRPCServer returns a new gRPC ABCI server -func NewGRPCServer(protoAddr string, app types.ABCIApplicationServer) service.Service { +func NewGRPCServer(logger log.Logger, protoAddr string, app types.ABCIApplicationServer) service.Service { proto, addr := tmnet.ProtocolAndAddress(protoAddr) s := &GRPCServer{ proto: proto, @@ -30,7 +31,7 @@ func NewGRPCServer(protoAddr string, app types.ABCIApplicationServer) service.Se listener: nil, app: app, } - s.BaseService = *service.NewBaseService(nil, "ABCIServer", s) + s.BaseService = *service.NewBaseService(logger, "ABCIServer", s) return s } diff --git a/abci/server/server.go b/abci/server/server.go index 6dd13ad02..2a6d50fd2 100644 --- a/abci/server/server.go +++ b/abci/server/server.go @@ -12,17 +12,18 @@ import ( "fmt" "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" ) -func NewServer(protoAddr, transport string, app types.Application) (service.Service, error) { +func NewServer(logger log.Logger, protoAddr, transport string, app types.Application) (service.Service, error) { var s service.Service var err error switch transport { case "socket": - s = NewSocketServer(protoAddr, app) + s = NewSocketServer(logger, protoAddr, app) case "grpc": - s = NewGRPCServer(protoAddr, types.NewGRPCApplication(app)) + s = NewGRPCServer(logger, protoAddr, types.NewGRPCApplication(app)) default: err = fmt.Errorf("unknown server type %s", transport) } diff --git a/abci/server/socket_server.go b/abci/server/socket_server.go index 85539645b..b24b38e38 100644 --- a/abci/server/socket_server.go +++ b/abci/server/socket_server.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "net" - "os" "runtime" "github.com/tendermint/tendermint/abci/types" @@ -19,7 +18,6 @@ import ( type SocketServer struct { service.BaseService - isLoggerSet bool proto string addr string @@ -33,7 +31,7 @@ type SocketServer struct { app types.Application } -func NewSocketServer(protoAddr string, app types.Application) service.Service { +func NewSocketServer(logger tmlog.Logger, protoAddr string, app types.Application) service.Service { proto, addr := tmnet.ProtocolAndAddress(protoAddr) s := &SocketServer{ proto: proto, @@ -42,15 +40,10 @@ func NewSocketServer(protoAddr string, app types.Application) service.Service { app: app, conns: make(map[int]net.Conn), } - s.BaseService = *service.NewBaseService(nil, "ABCIServer", s) + s.BaseService = *service.NewBaseService(logger, "ABCIServer", s) return s } -func (s *SocketServer) SetLogger(l tmlog.Logger) { - s.BaseService.SetLogger(l) - s.isLoggerSet = true -} - func (s *SocketServer) OnStart() error { ln, err := net.Listen(s.proto, s.addr) if err != nil { @@ -164,9 +157,6 @@ func (s *SocketServer) handleRequests(closeConn chan error, conn io.Reader, resp buf := make([]byte, size) buf = buf[:runtime.Stack(buf, false)] err := fmt.Errorf("recovered from panic: %v\n%s", r, buf) - if !s.isLoggerSet { - fmt.Fprintln(os.Stderr, err) - } closeConn <- err s.appMtx.Unlock() } diff --git a/abci/tests/client_server_test.go b/abci/tests/client_server_test.go index 62dc6e07e..6b4750b33 100644 --- a/abci/tests/client_server_test.go +++ b/abci/tests/client_server_test.go @@ -8,6 +8,7 @@ import ( abciclientent "github.com/tendermint/tendermint/abci/client" "github.com/tendermint/tendermint/abci/example/kvstore" abciserver "github.com/tendermint/tendermint/abci/server" + "github.com/tendermint/tendermint/libs/log" ) func TestClientServerNoAddrPrefix(t *testing.T) { @@ -15,12 +16,14 @@ func TestClientServerNoAddrPrefix(t *testing.T) { transport := "socket" app := kvstore.NewApplication() - server, err := abciserver.NewServer(addr, transport, app) + logger := log.TestingLogger() + + server, err := abciserver.NewServer(logger, addr, transport, app) assert.NoError(t, err, "expected no error on NewServer") err = server.Start() assert.NoError(t, err, "expected no error on server.Start") - client, err := abciclientent.NewClient(addr, transport, true) + client, err := abciclientent.NewClient(logger, addr, transport, true) assert.NoError(t, err, "expected no error on NewClient") err = client.Start() assert.NoError(t, err, "expected no error on client.Start") diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go index 8117fe2d4..66ed24a79 100644 --- a/internal/blocksync/pool.go +++ b/internal/blocksync/pool.go @@ -91,7 +91,13 @@ type BlockPool struct { // NewBlockPool returns a new BlockPool with the height equal to start. Block // requests and errors will be sent to requestsCh and errorsCh accordingly. -func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- peerError) *BlockPool { +func NewBlockPool( + logger log.Logger, + start int64, + requestsCh chan<- BlockRequest, + errorsCh chan<- peerError, +) *BlockPool { + bp := &BlockPool{ peers: make(map[types.NodeID]*bpPeer), @@ -104,7 +110,7 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p errorsCh: errorsCh, lastSyncRate: 0, } - bp.BaseService = *service.NewBaseService(nil, "BlockPool", bp) + bp.BaseService = *service.NewBaseService(logger, "BlockPool", bp) return bp } diff --git a/internal/blocksync/pool_test.go b/internal/blocksync/pool_test.go index cbe19acbe..b53699f97 100644 --- a/internal/blocksync/pool_test.go +++ b/internal/blocksync/pool_test.go @@ -82,8 +82,7 @@ func TestBlockPoolBasic(t *testing.T) { peers := makePeers(10, start+1, 1000) errorsCh := make(chan peerError, 1000) requestsCh := make(chan BlockRequest, 1000) - pool := NewBlockPool(start, requestsCh, errorsCh) - pool.SetLogger(log.TestingLogger()) + pool := NewBlockPool(log.TestingLogger(), start, requestsCh, errorsCh) err := pool.Start() if err != nil { @@ -142,8 +141,7 @@ func TestBlockPoolTimeout(t *testing.T) { peers := makePeers(10, start+1, 1000) errorsCh := make(chan peerError, 1000) requestsCh := make(chan BlockRequest, 1000) - pool := NewBlockPool(start, requestsCh, errorsCh) - pool.SetLogger(log.TestingLogger()) + pool := NewBlockPool(log.TestingLogger(), start, requestsCh, errorsCh) err := pool.Start() if err != nil { t.Error(err) @@ -210,8 +208,7 @@ func TestBlockPoolRemovePeer(t *testing.T) { requestsCh := make(chan BlockRequest) errorsCh := make(chan peerError) - pool := NewBlockPool(1, requestsCh, errorsCh) - pool.SetLogger(log.TestingLogger()) + pool := NewBlockPool(log.TestingLogger(), 1, requestsCh, errorsCh) err := pool.Start() require.NoError(t, err) t.Cleanup(func() { diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index 43c3e83cd..f18ed86b7 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -127,7 +127,7 @@ func NewReactor( initialState: state, blockExec: blockExec, store: store, - pool: NewBlockPool(startHeight, requestsCh, errorsCh), + pool: NewBlockPool(logger, startHeight, requestsCh, errorsCh), consReactor: consReactor, blockSync: tmsync.NewBool(blockSync), requestsCh: requestsCh, diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go index 78e61020c..2b567aae7 100644 --- a/internal/blocksync/reactor_test.go +++ b/internal/blocksync/reactor_test.go @@ -97,8 +97,10 @@ func (rts *reactorTestSuite) addNode(t *testing.T, ) { t.Helper() + logger := log.TestingLogger() + rts.nodes = append(rts.nodes, nodeID) - rts.app[nodeID] = proxy.NewAppConns(abciclient.NewLocalCreator(&abci.BaseApplication{}), proxy.NopMetrics()) + rts.app[nodeID] = proxy.NewAppConns(abciclient.NewLocalCreator(&abci.BaseApplication{}), logger, proxy.NopMetrics()) require.NoError(t, rts.app[nodeID].Start()) blockDB := dbm.NewMemDB() diff --git a/internal/consensus/byzantine_test.go b/internal/consensus/byzantine_test.go index 59ab56bbb..df11067b9 100644 --- a/internal/consensus/byzantine_test.go +++ b/internal/consensus/byzantine_test.go @@ -87,20 +87,17 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { // Make State blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore) - cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool) - cs.SetLogger(cs.Logger) + cs := NewState(logger, thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool) // set private validator pv := privVals[i] cs.SetPrivValidator(pv) - eventBus := eventbus.NewDefault() - eventBus.SetLogger(log.TestingLogger().With("module", "events")) + eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events")) err = eventBus.Start() require.NoError(t, err) cs.SetEventBus(eventBus) cs.SetTimeoutTicker(tickerFunc()) - cs.SetLogger(logger) states[i] = cs }() diff --git a/internal/consensus/common_test.go b/internal/consensus/common_test.go index 8b25a2af7..780ec8804 100644 --- a/internal/consensus/common_test.go +++ b/internal/consensus/common_test.go @@ -389,25 +389,28 @@ func subscribeToVoter(t *testing.T, cs *State, addr []byte) <-chan tmpubsub.Mess //------------------------------------------------------------------------------- // consensus states -func newState(state sm.State, pv types.PrivValidator, app abci.Application) (*State, error) { +func newState(logger log.Logger, state sm.State, pv types.PrivValidator, app abci.Application) (*State, error) { cfg, err := config.ResetTestRoot("consensus_state_test") if err != nil { return nil, err } - return newStateWithConfig(cfg, state, pv, app), nil + + return newStateWithConfig(logger, cfg, state, pv, app), nil } func newStateWithConfig( + logger log.Logger, thisConfig *config.Config, state sm.State, pv types.PrivValidator, app abci.Application, ) *State { blockStore := store.NewBlockStore(dbm.NewMemDB()) - return newStateWithConfigAndBlockStore(thisConfig, state, pv, app, blockStore) + return newStateWithConfigAndBlockStore(logger, thisConfig, state, pv, app, blockStore) } func newStateWithConfigAndBlockStore( + logger log.Logger, thisConfig *config.Config, state sm.State, pv types.PrivValidator, @@ -422,7 +425,7 @@ func newStateWithConfigAndBlockStore( // Make Mempool mempool := mempool.NewTxMempool( - log.TestingLogger().With("module", "mempool"), + logger.With("module", "mempool"), thisConfig.Mempool, proxyAppConnMem, 0, @@ -441,13 +444,11 @@ func newStateWithConfigAndBlockStore( panic(err) } - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore) - cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool) - cs.SetLogger(log.TestingLogger().With("module", "consensus")) + blockExec := sm.NewBlockExecutor(stateStore, logger, proxyAppConnCon, mempool, evpool, blockStore) + cs := NewState(logger.With("module", "consensus"), thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool) cs.SetPrivValidator(pv) - eventBus := eventbus.NewDefault() - eventBus.SetLogger(log.TestingLogger().With("module", "events")) + eventBus := eventbus.NewDefault(logger.With("module", "events")) err := eventBus.Start() if err != nil { panic(err) @@ -468,13 +469,13 @@ func loadPrivValidator(cfg *config.Config) *privval.FilePV { return privValidator } -func randState(cfg *config.Config, nValidators int) (*State, []*validatorStub, error) { +func randState(cfg *config.Config, logger log.Logger, nValidators int) (*State, []*validatorStub, error) { // Get State state, privVals := randGenesisState(cfg, nValidators, false, 10) vss := make([]*validatorStub, nValidators) - cs, err := newState(state, privVals[0], kvstore.NewApplication()) + cs, err := newState(logger, state, privVals[0], kvstore.NewApplication()) if err != nil { return nil, nil, err } @@ -759,9 +760,9 @@ func randConsensusState( vals := types.TM2PB.ValidatorUpdates(state.Validators) app.InitChain(abci.RequestInitChain{Validators: vals}) - css[i] = newStateWithConfigAndBlockStore(thisConfig, state, privVals[i], app, blockStore) + l := logger.With("validator", i, "module", "consensus") + css[i] = newStateWithConfigAndBlockStore(l, thisConfig, state, privVals[i], app, blockStore) css[i].SetTimeoutTicker(tickerFunc()) - css[i].SetLogger(logger.With("validator", i, "module", "consensus")) } return css, func() { @@ -829,9 +830,8 @@ func randConsensusNetWithPeers( app.InitChain(abci.RequestInitChain{Validators: vals}) // sm.SaveState(stateDB,state) //height 1's validatorsInfo already saved in LoadStateFromDBOrGenesisDoc above - css[i] = newStateWithConfig(thisConfig, state, privVal, app) + css[i] = newStateWithConfig(logger.With("validator", i, "module", "consensus"), thisConfig, state, privVal, app) css[i].SetTimeoutTicker(tickerFunc()) - css[i].SetLogger(logger.With("validator", i, "module", "consensus")) } return css, genDoc, peer0Config, func() { for _, dir := range configRootDirs { diff --git a/internal/consensus/invalid_test.go b/internal/consensus/invalid_test.go index f06692b77..fae89f227 100644 --- a/internal/consensus/invalid_test.go +++ b/internal/consensus/invalid_test.go @@ -26,8 +26,7 @@ func TestReactorInvalidPrecommit(t *testing.T) { t.Cleanup(cleanup) for i := 0; i < 4; i++ { - ticker := NewTimeoutTicker() - ticker.SetLogger(states[i].Logger) + ticker := NewTimeoutTicker(states[i].Logger) states[i].SetTimeoutTicker(ticker) } diff --git a/internal/consensus/mempool_test.go b/internal/consensus/mempool_test.go index b31e1d901..78f42f993 100644 --- a/internal/consensus/mempool_test.go +++ b/internal/consensus/mempool_test.go @@ -17,6 +17,7 @@ import ( "github.com/tendermint/tendermint/internal/mempool" sm "github.com/tendermint/tendermint/internal/state" "github.com/tendermint/tendermint/internal/store" + "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/types" ) @@ -34,7 +35,7 @@ func TestMempoolNoProgressUntilTxsAvailable(t *testing.T) { config.Consensus.CreateEmptyBlocks = false state, privVals := randGenesisState(baseConfig, 1, false, 10) - cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication()) + cs := newStateWithConfig(log.TestingLogger(), config, state, privVals[0], NewCounterApplication()) assertMempool(cs.txNotifier).EnableTxsAvailable() height, round := cs.Height, cs.Round newBlockCh := subscribe(t, cs.eventBus, types.EventQueryNewBlock) @@ -57,7 +58,7 @@ func TestMempoolProgressAfterCreateEmptyBlocksInterval(t *testing.T) { config.Consensus.CreateEmptyBlocksInterval = ensureTimeout state, privVals := randGenesisState(baseConfig, 1, false, 10) - cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication()) + cs := newStateWithConfig(log.TestingLogger(), config, state, privVals[0], NewCounterApplication()) assertMempool(cs.txNotifier).EnableTxsAvailable() @@ -78,7 +79,7 @@ func TestMempoolProgressInHigherRound(t *testing.T) { config.Consensus.CreateEmptyBlocks = false state, privVals := randGenesisState(baseConfig, 1, false, 10) - cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication()) + cs := newStateWithConfig(log.TestingLogger(), config, state, privVals[0], NewCounterApplication()) assertMempool(cs.txNotifier).EnableTxsAvailable() height, round := cs.Height, cs.Round newBlockCh := subscribe(t, cs.eventBus, types.EventQueryNewBlock) @@ -124,11 +125,14 @@ func deliverTxsRange(cs *State, start, end int) { func TestMempoolTxConcurrentWithCommit(t *testing.T) { config := configSetup(t) - + logger := log.TestingLogger() state, privVals := randGenesisState(config, 1, false, 10) stateStore := sm.NewStore(dbm.NewMemDB()) blockStore := store.NewBlockStore(dbm.NewMemDB()) - cs := newStateWithConfigAndBlockStore(config, state, privVals[0], NewCounterApplication(), blockStore) + + cs := newStateWithConfigAndBlockStore( + logger, config, state, privVals[0], NewCounterApplication(), blockStore) + err := stateStore.Save(state) require.NoError(t, err) newBlockHeaderCh := subscribe(t, cs.eventBus, types.EventQueryNewBlockHeader) @@ -155,7 +159,7 @@ func TestMempoolRmBadTx(t *testing.T) { app := NewCounterApplication() stateStore := sm.NewStore(dbm.NewMemDB()) blockStore := store.NewBlockStore(dbm.NewMemDB()) - cs := newStateWithConfigAndBlockStore(config, state, privVals[0], app, blockStore) + cs := newStateWithConfigAndBlockStore(log.TestingLogger(), config, state, privVals[0], app, blockStore) err := stateStore.Save(state) require.NoError(t, err) diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index 3d5168da7..8fc562b06 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -411,18 +411,15 @@ func TestReactorWithEvidence(t *testing.T) { evpool2 := sm.EmptyEvidencePool{} blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore) - cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2) - cs.SetLogger(log.TestingLogger().With("module", "consensus")) + cs := NewState(logger.With("validator", i, "module", "consensus"), + thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2) cs.SetPrivValidator(pv) - eventBus := eventbus.NewDefault() - eventBus.SetLogger(log.TestingLogger().With("module", "events")) - err = eventBus.Start() - require.NoError(t, err) + eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events")) + require.NoError(t, eventBus.Start()) cs.SetEventBus(eventBus) cs.SetTimeoutTicker(tickerFunc()) - cs.SetLogger(logger.With("validator", i, "module", "consensus")) states[i] = cs } diff --git a/internal/consensus/replay.go b/internal/consensus/replay.go index 75945d3dc..563e5ca64 100644 --- a/internal/consensus/replay.go +++ b/internal/consensus/replay.go @@ -421,7 +421,7 @@ func (h *Handshaker) ReplayBlocks( if err != nil { return nil, err } - mockApp := newMockProxyApp(appHash, abciResponses) + mockApp := newMockProxyApp(h.logger, appHash, abciResponses) h.logger.Info("Replay last block using mock app") state, err = h.replayBlock(state, storeBlockHeight, mockApp) return state.AppHash, err diff --git a/internal/consensus/replay_file.go b/internal/consensus/replay_file.go index 1f5e8e375..6f1e64b2a 100644 --- a/internal/consensus/replay_file.go +++ b/internal/consensus/replay_file.go @@ -147,7 +147,7 @@ func (pb *playback) replayReset(count int, newStepSub eventbus.Subscription) err } pb.cs.Wait() - newCS := NewState(pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec, + newCS := NewState(pb.cs.Logger, pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec, pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool) newCS.SetEventBus(pb.cs.eventBus) newCS.startForReplay() @@ -337,14 +337,14 @@ func newConsensusStateForReplay( } // Create proxyAppConn connection (consensus, mempool, query) - clientCreator, _ := proxy.DefaultClientCreator(cfg.ProxyApp, cfg.ABCI, cfg.DBDir()) - proxyApp := proxy.NewAppConns(clientCreator, proxy.NopMetrics()) + clientCreator, _ := proxy.DefaultClientCreator(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir()) + proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics()) err = proxyApp.Start() if err != nil { return nil, fmt.Errorf("starting proxy app conns: %w", err) } - eventBus := eventbus.NewDefault() + eventBus := eventbus.NewDefault(logger) if err := eventBus.Start(); err != nil { return nil, fmt.Errorf("failed to start event bus: %w", err) } @@ -358,7 +358,7 @@ func newConsensusStateForReplay( mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{} blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp.Consensus(), mempool, evpool, blockStore) - consensusState := NewState(csConfig, state.Copy(), blockExec, + consensusState := NewState(logger, csConfig, state.Copy(), blockExec, blockStore, mempool, evpool) consensusState.SetEventBus(eventBus) diff --git a/internal/consensus/replay_stubs.go b/internal/consensus/replay_stubs.go index 1235baccb..679aba611 100644 --- a/internal/consensus/replay_stubs.go +++ b/internal/consensus/replay_stubs.go @@ -8,6 +8,7 @@ import ( "github.com/tendermint/tendermint/internal/libs/clist" "github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/internal/proxy" + "github.com/tendermint/tendermint/libs/log" tmstate "github.com/tendermint/tendermint/proto/tendermint/state" "github.com/tendermint/tendermint/types" ) @@ -54,12 +55,12 @@ func (emptyMempool) CloseWAL() {} // Useful because we don't want to call Commit() twice for the same block on // the real app. -func newMockProxyApp(appHash []byte, abciResponses *tmstate.ABCIResponses) proxy.AppConnConsensus { +func newMockProxyApp(logger log.Logger, appHash []byte, abciResponses *tmstate.ABCIResponses) proxy.AppConnConsensus { clientCreator := abciclient.NewLocalCreator(&mockProxyApp{ appHash: appHash, abciResponses: abciResponses, }) - cli, _ := clientCreator() + cli, _ := clientCreator(logger) err := cli.Start() if err != nil { panic(err) diff --git a/internal/consensus/replay_test.go b/internal/consensus/replay_test.go index a91d52b8a..f5f5d1633 100644 --- a/internal/consensus/replay_test.go +++ b/internal/consensus/replay_test.go @@ -64,13 +64,13 @@ func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *config.Co privValidator := loadPrivValidator(consensusReplayConfig) blockStore := store.NewBlockStore(dbm.NewMemDB()) cs := newStateWithConfigAndBlockStore( + logger, consensusReplayConfig, state, privValidator, kvstore.NewApplication(), blockStore, ) - cs.SetLogger(logger) bytes, _ := os.ReadFile(cs.config.WalFile()) t.Logf("====== WAL: \n\r%X\n", bytes) @@ -164,13 +164,13 @@ LOOP: require.NoError(t, err) privValidator := loadPrivValidator(consensusReplayConfig) cs := newStateWithConfigAndBlockStore( + logger, consensusReplayConfig, state, privValidator, kvstore.NewApplication(), blockStore, ) - cs.SetLogger(logger) // start sending transactions ctx, cancel := context.WithCancel(context.Background()) @@ -639,7 +639,7 @@ func TestMockProxyApp(t *testing.T) { err = proto.Unmarshal(bytes, loadedAbciRes) require.NoError(t, err) - mock := newMockProxyApp([]byte("mock_hash"), loadedAbciRes) + mock := newMockProxyApp(logger, []byte("mock_hash"), loadedAbciRes) abciRes := new(tmstate.ABCIResponses) abciRes.DeliverTxs = make([]*abci.ResponseDeliverTx, len(loadedAbciRes.DeliverTxs)) @@ -696,6 +696,7 @@ func testHandshakeReplay(t *testing.T, sim *simulatorTestSuite, nBlocks int, mod cfg := sim.Config + logger := log.TestingLogger() if testValidatorsChange { testConfig, err := ResetConfig(fmt.Sprintf("%s_%v_m", t.Name(), mode)) require.NoError(t, err) @@ -719,9 +720,8 @@ func testHandshakeReplay(t *testing.T, sim *simulatorTestSuite, nBlocks int, mod privVal, err := privval.LoadFilePV(cfg.PrivValidator.KeyFile(), cfg.PrivValidator.StateFile()) require.NoError(t, err) - wal, err := NewWAL(walFile) + wal, err := NewWAL(logger, walFile) require.NoError(t, err) - wal.SetLogger(log.TestingLogger()) err = wal.Start() require.NoError(t, err) t.Cleanup(func() { @@ -742,7 +742,7 @@ func testHandshakeReplay(t *testing.T, sim *simulatorTestSuite, nBlocks int, mod state := genesisState.Copy() // run the chain through state.ApplyBlock to build up the tendermint state - state = buildTMStateFromChain(cfg, sim.Mempool, sim.Evpool, stateStore, state, chain, nBlocks, mode, store) + state = buildTMStateFromChain(cfg, logger, sim.Mempool, sim.Evpool, stateStore, state, chain, nBlocks, mode, store) latestAppHash := state.AppHash // make a new client creator @@ -754,7 +754,7 @@ func testHandshakeReplay(t *testing.T, sim *simulatorTestSuite, nBlocks int, mod if nBlocks > 0 { // run nBlocks against a new client to build up the app state. // use a throwaway tendermint state - proxyApp := proxy.NewAppConns(clientCreator2, proxy.NopMetrics()) + proxyApp := proxy.NewAppConns(clientCreator2, logger, proxy.NopMetrics()) stateDB1 := dbm.NewMemDB() stateStore := sm.NewStore(stateDB1) err := stateStore.Save(genesisState) @@ -773,8 +773,8 @@ func testHandshakeReplay(t *testing.T, sim *simulatorTestSuite, nBlocks int, mod // now start the app using the handshake - it should sync genDoc, _ := sm.MakeGenesisDocFromFile(cfg.GenesisFile()) - handshaker := NewHandshaker(log.TestingLogger(), stateStore, state, store, eventbus.NopEventBus{}, genDoc) - proxyApp := proxy.NewAppConns(clientCreator2, proxy.NopMetrics()) + handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc) + proxyApp := proxy.NewAppConns(clientCreator2, logger, proxy.NopMetrics()) if err := proxyApp.Start(); err != nil { t.Fatalf("Error starting proxy app connections: %v", err) } @@ -888,6 +888,7 @@ func buildAppStateFromChain( func buildTMStateFromChain( cfg *config.Config, + logger log.Logger, mempool mempool.Mempool, evpool sm.EvidencePool, stateStore sm.Store, @@ -895,14 +896,15 @@ func buildTMStateFromChain( chain []*types.Block, nBlocks int, mode uint, - blockStore *mockBlockStore) sm.State { + blockStore *mockBlockStore, +) sm.State { // run the whole chain against this client to build up the tendermint state kvstoreApp := kvstore.NewPersistentKVStoreApplication( filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_t", nBlocks, mode))) defer kvstoreApp.Close() clientCreator := abciclient.NewLocalCreator(kvstoreApp) - proxyApp := proxy.NewAppConns(clientCreator, proxy.NopMetrics()) + proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics()) if err := proxyApp.Start(); err != nil { panic(err) } @@ -972,7 +974,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) { { app := &badApp{numBlocks: 3, allHashesAreWrong: true} clientCreator := abciclient.NewLocalCreator(app) - proxyApp := proxy.NewAppConns(clientCreator, proxy.NopMetrics()) + proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics()) err := proxyApp.Start() require.NoError(t, err) t.Cleanup(func() { @@ -996,7 +998,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) { { app := &badApp{numBlocks: 3, onlyLastHashIsWrong: true} clientCreator := abciclient.NewLocalCreator(app) - proxyApp := proxy.NewAppConns(clientCreator, proxy.NopMetrics()) + proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics()) err := proxyApp.Start() require.NoError(t, err) t.Cleanup(func() { @@ -1257,8 +1259,9 @@ func TestHandshakeUpdatesValidators(t *testing.T) { genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile()) require.NoError(t, err) - handshaker := NewHandshaker(log.TestingLogger(), stateStore, state, store, eventbus.NopEventBus{}, genDoc) - proxyApp := proxy.NewAppConns(clientCreator, proxy.NopMetrics()) + logger := log.TestingLogger() + handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc) + proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics()) if err := proxyApp.Start(); err != nil { t.Fatalf("Error starting proxy app connections: %v", err) } diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 6ebc54086..6cef4ac8f 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -153,6 +153,7 @@ type StateOption func(*State) // NewState returns a new State. func NewState( + logger log.Logger, cfg *config.ConsensusConfig, state sm.State, blockExec *sm.BlockExecutor, @@ -168,7 +169,7 @@ func NewState( txNotifier: txNotifier, peerMsgQueue: make(chan msgInfo, msgQueueSize), internalMsgQueue: make(chan msgInfo, msgQueueSize), - timeoutTicker: NewTimeoutTicker(), + timeoutTicker: NewTimeoutTicker(logger), statsMsgQueue: make(chan msgInfo, msgQueueSize), done: make(chan struct{}), doWALCatchup: true, @@ -193,7 +194,7 @@ func NewState( // NOTE: we do not call scheduleRound0 yet, we do that upon Start() - cs.BaseService = *service.NewBaseService(nil, "State", cs) + cs.BaseService = *service.NewBaseService(logger, "State", cs) for _, option := range options { option(cs) } @@ -201,12 +202,6 @@ func NewState( return cs } -// SetLogger implements Service. -func (cs *State) SetLogger(l log.Logger) { - cs.BaseService.Logger = l - cs.timeoutTicker.SetLogger(l) -} - // SetEventBus sets event bus. func (cs *State) SetEventBus(b *eventbus.EventBus) { cs.eventBus = b @@ -481,14 +476,12 @@ func (cs *State) Wait() { // OpenWAL opens a file to log all consensus messages and timeouts for // deterministic accountability. func (cs *State) OpenWAL(walFile string) (WAL, error) { - wal, err := NewWAL(walFile) + wal, err := NewWAL(cs.Logger.With("wal", walFile), walFile) if err != nil { cs.Logger.Error("failed to open WAL", "file", walFile, "err", err) return nil, err } - wal.SetLogger(cs.Logger.With("wal", walFile)) - if err := wal.Start(); err != nil { cs.Logger.Error("failed to start WAL", "err", err) return nil, err diff --git a/internal/consensus/state_test.go b/internal/consensus/state_test.go index c97acbe43..1d0741b1f 100644 --- a/internal/consensus/state_test.go +++ b/internal/consensus/state_test.go @@ -58,7 +58,7 @@ x * TestHalt1 - if we see +2/3 precommits after timing out into new round, we sh func TestStateProposerSelection0(t *testing.T) { config := configSetup(t) - cs1, vss, err := randState(config, 4) + cs1, vss, err := randState(config, log.TestingLogger(), 4) require.NoError(t, err) height, round := cs1.Height, cs1.Round @@ -102,7 +102,7 @@ func TestStateProposerSelection0(t *testing.T) { func TestStateProposerSelection2(t *testing.T) { config := configSetup(t) - cs1, vss, err := randState(config, 4) // test needs more work for more than 3 validators + cs1, vss, err := randState(config, log.TestingLogger(), 4) // test needs more work for more than 3 validators require.NoError(t, err) height := cs1.Height @@ -143,7 +143,7 @@ func TestStateProposerSelection2(t *testing.T) { func TestStateEnterProposeNoPrivValidator(t *testing.T) { config := configSetup(t) - cs, _, err := randState(config, 1) + cs, _, err := randState(config, log.TestingLogger(), 1) require.NoError(t, err) cs.SetPrivValidator(nil) height, round := cs.Height, cs.Round @@ -165,7 +165,7 @@ func TestStateEnterProposeNoPrivValidator(t *testing.T) { func TestStateEnterProposeYesPrivValidator(t *testing.T) { config := configSetup(t) - cs, _, err := randState(config, 1) + cs, _, err := randState(config, log.TestingLogger(), 1) require.NoError(t, err) height, round := cs.Height, cs.Round @@ -198,7 +198,7 @@ func TestStateEnterProposeYesPrivValidator(t *testing.T) { func TestStateBadProposal(t *testing.T) { config := configSetup(t) - cs1, vss, err := randState(config, 2) + cs1, vss, err := randState(config, log.TestingLogger(), 2) require.NoError(t, err) height, round := cs1.Height, cs1.Round vs2 := vss[1] @@ -259,7 +259,7 @@ func TestStateBadProposal(t *testing.T) { func TestStateOversizedBlock(t *testing.T) { config := configSetup(t) - cs1, vss, err := randState(config, 2) + cs1, vss, err := randState(config, log.TestingLogger(), 2) require.NoError(t, err) cs1.state.ConsensusParams.Block.MaxBytes = 2000 height, round := cs1.Height, cs1.Round @@ -323,8 +323,9 @@ func TestStateOversizedBlock(t *testing.T) { // propose, prevote, and precommit a block func TestStateFullRound1(t *testing.T) { config := configSetup(t) + logger := log.TestingLogger() - cs, vss, err := randState(config, 1) + cs, vss, err := randState(config, logger, 1) require.NoError(t, err) height, round := cs.Height, cs.Round @@ -333,8 +334,8 @@ func TestStateFullRound1(t *testing.T) { if err := cs.eventBus.Stop(); err != nil { t.Error(err) } - eventBus := eventbus.NewDefault() - eventBus.SetLogger(log.TestingLogger().With("module", "events")) + eventBus := eventbus.NewDefault(logger.With("module", "events")) + cs.SetEventBus(eventBus) if err := eventBus.Start(); err != nil { t.Error(err) @@ -367,7 +368,7 @@ func TestStateFullRound1(t *testing.T) { func TestStateFullRoundNil(t *testing.T) { config := configSetup(t) - cs, vss, err := randState(config, 1) + cs, vss, err := randState(config, log.TestingLogger(), 1) require.NoError(t, err) height, round := cs.Height, cs.Round @@ -388,7 +389,7 @@ func TestStateFullRoundNil(t *testing.T) { func TestStateFullRound2(t *testing.T) { config := configSetup(t) - cs1, vss, err := randState(config, 2) + cs1, vss, err := randState(config, log.TestingLogger(), 2) require.NoError(t, err) vs2 := vss[1] height, round := cs1.Height, cs1.Round @@ -431,7 +432,7 @@ func TestStateFullRound2(t *testing.T) { func TestStateLockNoPOL(t *testing.T) { config := configSetup(t) - cs1, vss, err := randState(config, 2) + cs1, vss, err := randState(config, log.TestingLogger(), 2) require.NoError(t, err) vs2 := vss[1] height, round := cs1.Height, cs1.Round @@ -570,7 +571,7 @@ func TestStateLockNoPOL(t *testing.T) { ensureNewTimeout(timeoutWaitCh, height, round, cs1.config.Precommit(round).Nanoseconds()) - cs2, _, err := randState(config, 2) // needed so generated block is different than locked block + cs2, _, err := randState(config, log.TestingLogger(), 2) // needed so generated block is different than locked block require.NoError(t, err) // before we time out into new round, set next proposal block prop, propBlock := decideProposal(cs2, vs2, vs2.Height, vs2.Round+1) @@ -622,8 +623,9 @@ func TestStateLockNoPOL(t *testing.T) { // the others prevote a new block hence v1 changes lock and precommits the new block with the others func TestStateLockPOLRelock(t *testing.T) { config := configSetup(t) + logger := log.TestingLogger() - cs1, vss, err := randState(config, 4) + cs1, vss, err := randState(config, logger, 4) require.NoError(t, err) vs2, vs3, vs4 := vss[1], vss[2], vss[3] height, round := cs1.Height, cs1.Round @@ -668,7 +670,7 @@ func TestStateLockPOLRelock(t *testing.T) { signAddVotes(config, cs1, tmproto.PrecommitType, nil, types.PartSetHeader{}, vs2, vs3, vs4) // before we timeout to the new round set the new proposal - cs2, err := newState(cs1.state, vs2, kvstore.NewApplication()) + cs2, err := newState(logger, cs1.state, vs2, kvstore.NewApplication()) require.NoError(t, err) prop, propBlock := decideProposal(cs2, vs2, vs2.Height, vs2.Round+1) @@ -725,7 +727,7 @@ func TestStateLockPOLRelock(t *testing.T) { func TestStateLockPOLUnlock(t *testing.T) { config := configSetup(t) - cs1, vss, err := randState(config, 4) + cs1, vss, err := randState(config, log.TestingLogger(), 4) require.NoError(t, err) vs2, vs3, vs4 := vss[1], vss[2], vss[3] height, round := cs1.Height, cs1.Round @@ -819,8 +821,9 @@ func TestStateLockPOLUnlock(t *testing.T) { // prevote and now v1 can lock onto the third block and precommit that func TestStateLockPOLUnlockOnUnknownBlock(t *testing.T) { config := configSetup(t) + logger := log.TestingLogger() - cs1, vss, err := randState(config, 4) + cs1, vss, err := randState(config, logger, 4) require.NoError(t, err) vs2, vs3, vs4 := vss[1], vss[2], vss[3] height, round := cs1.Height, cs1.Round @@ -861,7 +864,7 @@ func TestStateLockPOLUnlockOnUnknownBlock(t *testing.T) { signAddVotes(config, cs1, tmproto.PrecommitType, nil, types.PartSetHeader{}, vs2, vs3, vs4) // before we timeout to the new round set the new proposal - cs2, err := newState(cs1.state, vs2, kvstore.NewApplication()) + cs2, err := newState(logger, cs1.state, vs2, kvstore.NewApplication()) require.NoError(t, err) prop, propBlock := decideProposal(cs2, vs2, vs2.Height, vs2.Round+1) if prop == nil || propBlock == nil { @@ -906,7 +909,7 @@ func TestStateLockPOLUnlockOnUnknownBlock(t *testing.T) { signAddVotes(config, cs1, tmproto.PrecommitType, nil, types.PartSetHeader{}, vs2, vs3, vs4) // before we timeout to the new round set the new proposal - cs3, err := newState(cs1.state, vs3, kvstore.NewApplication()) + cs3, err := newState(logger, cs1.state, vs3, kvstore.NewApplication()) require.NoError(t, err) prop, propBlock = decideProposal(cs3, vs3, vs3.Height, vs3.Round+1) if prop == nil || propBlock == nil { @@ -951,7 +954,7 @@ func TestStateLockPOLUnlockOnUnknownBlock(t *testing.T) { func TestStateLockPOLSafety1(t *testing.T) { config := configSetup(t) - cs1, vss, err := randState(config, 4) + cs1, vss, err := randState(config, log.TestingLogger(), 4) require.NoError(t, err) vs2, vs3, vs4 := vss[1], vss[2], vss[3] height, round := cs1.Height, cs1.Round @@ -1075,7 +1078,7 @@ func TestStateLockPOLSafety1(t *testing.T) { func TestStateLockPOLSafety2(t *testing.T) { config := configSetup(t) - cs1, vss, err := randState(config, 4) + cs1, vss, err := randState(config, log.TestingLogger(), 4) require.NoError(t, err) vs2, vs3, vs4 := vss[1], vss[2], vss[3] height, round := cs1.Height, cs1.Round @@ -1175,7 +1178,7 @@ func TestStateLockPOLSafety2(t *testing.T) { func TestProposeValidBlock(t *testing.T) { config := configSetup(t) - cs1, vss, err := randState(config, 4) + cs1, vss, err := randState(config, log.TestingLogger(), 4) require.NoError(t, err) vs2, vs3, vs4 := vss[1], vss[2], vss[3] height, round := cs1.Height, cs1.Round @@ -1268,7 +1271,7 @@ func TestProposeValidBlock(t *testing.T) { func TestSetValidBlockOnDelayedPrevote(t *testing.T) { config := configSetup(t) - cs1, vss, err := randState(config, 4) + cs1, vss, err := randState(config, log.TestingLogger(), 4) require.NoError(t, err) vs2, vs3, vs4 := vss[1], vss[2], vss[3] height, round := cs1.Height, cs1.Round @@ -1333,7 +1336,7 @@ func TestSetValidBlockOnDelayedPrevote(t *testing.T) { func TestSetValidBlockOnDelayedProposal(t *testing.T) { config := configSetup(t) - cs1, vss, err := randState(config, 4) + cs1, vss, err := randState(config, log.TestingLogger(), 4) require.NoError(t, err) vs2, vs3, vs4 := vss[1], vss[2], vss[3] height, round := cs1.Height, cs1.Round @@ -1392,7 +1395,7 @@ func TestSetValidBlockOnDelayedProposal(t *testing.T) { func TestWaitingTimeoutOnNilPolka(t *testing.T) { config := configSetup(t) - cs1, vss, err := randState(config, 4) + cs1, vss, err := randState(config, log.TestingLogger(), 4) require.NoError(t, err) vs2, vs3, vs4 := vss[1], vss[2], vss[3] height, round := cs1.Height, cs1.Round @@ -1416,7 +1419,7 @@ func TestWaitingTimeoutOnNilPolka(t *testing.T) { func TestWaitingTimeoutProposeOnNewRound(t *testing.T) { config := configSetup(t) - cs1, vss, err := randState(config, 4) + cs1, vss, err := randState(config, log.TestingLogger(), 4) require.NoError(t, err) vs2, vs3, vs4 := vss[1], vss[2], vss[3] height, round := cs1.Height, cs1.Round @@ -1455,7 +1458,7 @@ func TestWaitingTimeoutProposeOnNewRound(t *testing.T) { func TestRoundSkipOnNilPolkaFromHigherRound(t *testing.T) { config := configSetup(t) - cs1, vss, err := randState(config, 4) + cs1, vss, err := randState(config, log.TestingLogger(), 4) require.NoError(t, err) vs2, vs3, vs4 := vss[1], vss[2], vss[3] height, round := cs1.Height, cs1.Round @@ -1494,7 +1497,7 @@ func TestRoundSkipOnNilPolkaFromHigherRound(t *testing.T) { func TestWaitTimeoutProposeOnNilPolkaForTheCurrentRound(t *testing.T) { config := configSetup(t) - cs1, vss, err := randState(config, 4) + cs1, vss, err := randState(config, log.TestingLogger(), 4) require.NoError(t, err) vs2, vs3, vs4 := vss[1], vss[2], vss[3] height, round := cs1.Height, int32(1) @@ -1524,7 +1527,7 @@ func TestWaitTimeoutProposeOnNilPolkaForTheCurrentRound(t *testing.T) { func TestEmitNewValidBlockEventOnCommitWithoutBlock(t *testing.T) { config := configSetup(t) - cs1, vss, err := randState(config, 4) + cs1, vss, err := randState(config, log.TestingLogger(), 4) require.NoError(t, err) vs2, vs3, vs4 := vss[1], vss[2], vss[3] height, round := cs1.Height, int32(1) @@ -1561,7 +1564,7 @@ func TestEmitNewValidBlockEventOnCommitWithoutBlock(t *testing.T) { func TestCommitFromPreviousRound(t *testing.T) { config := configSetup(t) - cs1, vss, err := randState(config, 4) + cs1, vss, err := randState(config, log.TestingLogger(), 4) require.NoError(t, err) vs2, vs3, vs4 := vss[1], vss[2], vss[3] height, round := cs1.Height, int32(1) @@ -1618,7 +1621,7 @@ func TestStartNextHeightCorrectlyAfterTimeout(t *testing.T) { config := configSetup(t) config.Consensus.SkipTimeoutCommit = false - cs1, vss, err := randState(config, 4) + cs1, vss, err := randState(config, log.TestingLogger(), 4) require.NoError(t, err) cs1.txNotifier = &fakeTxNotifier{ch: make(chan struct{})} @@ -1682,7 +1685,7 @@ func TestResetTimeoutPrecommitUponNewHeight(t *testing.T) { config := configSetup(t) config.Consensus.SkipTimeoutCommit = false - cs1, vss, err := randState(config, 4) + cs1, vss, err := randState(config, log.TestingLogger(), 4) require.NoError(t, err) vs2, vs3, vs4 := vss[1], vss[2], vss[3] @@ -1826,7 +1829,7 @@ func TestStateSlashingPrecommits(t *testing.T) { func TestStateHalt1(t *testing.T) { config := configSetup(t) - cs1, vss, err := randState(config, 4) + cs1, vss, err := randState(config, log.TestingLogger(), 4) require.NoError(t, err) vs2, vs3, vs4 := vss[1], vss[2], vss[3] height, round := cs1.Height, cs1.Round @@ -1897,7 +1900,7 @@ func TestStateOutputsBlockPartsStats(t *testing.T) { config := configSetup(t) // create dummy peer - cs, _, err := randState(config, 1) + cs, _, err := randState(config, log.TestingLogger(), 1) require.NoError(t, err) peerID, err := types.NewNodeID("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA") require.NoError(t, err) @@ -1943,7 +1946,7 @@ func TestStateOutputsBlockPartsStats(t *testing.T) { func TestStateOutputVoteStats(t *testing.T) { config := configSetup(t) - cs, vss, err := randState(config, 2) + cs, vss, err := randState(config, log.TestingLogger(), 2) require.NoError(t, err) // create dummy peer peerID, err := types.NewNodeID("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA") @@ -1980,7 +1983,7 @@ func TestStateOutputVoteStats(t *testing.T) { func TestSignSameVoteTwice(t *testing.T) { config := configSetup(t) - _, vss, err := randState(config, 2) + _, vss, err := randState(config, log.TestingLogger(), 2) require.NoError(t, err) randBytes := tmrand.Bytes(tmhash.Size) diff --git a/internal/consensus/ticker.go b/internal/consensus/ticker.go index fb3571ac8..0226889c9 100644 --- a/internal/consensus/ticker.go +++ b/internal/consensus/ticker.go @@ -19,8 +19,6 @@ type TimeoutTicker interface { Stop() error Chan() <-chan timeoutInfo // on which to receive a timeout ScheduleTimeout(ti timeoutInfo) // reset the timer - - SetLogger(log.Logger) } // timeoutTicker wraps time.Timer, @@ -37,13 +35,13 @@ type timeoutTicker struct { } // NewTimeoutTicker returns a new TimeoutTicker. -func NewTimeoutTicker() TimeoutTicker { +func NewTimeoutTicker(logger log.Logger) TimeoutTicker { tt := &timeoutTicker{ timer: time.NewTimer(0), tickChan: make(chan timeoutInfo, tickTockBufferSize), tockChan: make(chan timeoutInfo, tickTockBufferSize), } - tt.BaseService = *service.NewBaseService(nil, "TimeoutTicker", tt) + tt.BaseService = *service.NewBaseService(logger, "TimeoutTicker", tt) tt.stopTimer() // don't want to fire until the first scheduled timeout return tt } diff --git a/internal/consensus/wal.go b/internal/consensus/wal.go index 0d9efb839..24fef294d 100644 --- a/internal/consensus/wal.go +++ b/internal/consensus/wal.go @@ -88,7 +88,7 @@ var _ WAL = &BaseWAL{} // NewWAL returns a new write-ahead logger based on `baseWAL`, which implements // WAL. It's flushed and synced to disk every 2s and once when stopped. -func NewWAL(walFile string, groupOptions ...func(*auto.Group)) (*BaseWAL, error) { +func NewWAL(logger log.Logger, walFile string, groupOptions ...func(*auto.Group)) (*BaseWAL, error) { err := tmos.EnsureDir(filepath.Dir(walFile), 0700) if err != nil { return nil, fmt.Errorf("failed to ensure WAL directory is in place: %w", err) @@ -103,7 +103,7 @@ func NewWAL(walFile string, groupOptions ...func(*auto.Group)) (*BaseWAL, error) enc: NewWALEncoder(group), flushInterval: walDefaultFlushInterval, } - wal.BaseService = *service.NewBaseService(nil, "baseWAL", wal) + wal.BaseService = *service.NewBaseService(logger, "baseWAL", wal) return wal, nil } @@ -116,11 +116,6 @@ func (wal *BaseWAL) Group() *auto.Group { return wal.group } -func (wal *BaseWAL) SetLogger(l log.Logger) { - wal.BaseService.Logger = l - wal.group.SetLogger(l) -} - func (wal *BaseWAL) OnStart() error { size, err := wal.group.Head.Size() if err != nil { diff --git a/internal/consensus/wal_generator.go b/internal/consensus/wal_generator.go index 77c6ce574..20cf0fae2 100644 --- a/internal/consensus/wal_generator.go +++ b/internal/consensus/wal_generator.go @@ -66,8 +66,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) { blockStore := store.NewBlockStore(blockStoreDB) - proxyApp := proxy.NewAppConns(abciclient.NewLocalCreator(app), proxy.NopMetrics()) - proxyApp.SetLogger(logger.With("module", "proxy")) + proxyApp := proxy.NewAppConns(abciclient.NewLocalCreator(app), logger.With("module", "proxy"), proxy.NopMetrics()) if err := proxyApp.Start(); err != nil { return fmt.Errorf("failed to start proxy app connections: %w", err) } @@ -77,8 +76,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) { } }) - eventBus := eventbus.NewDefault() - eventBus.SetLogger(logger.With("module", "events")) + eventBus := eventbus.NewDefault(logger.With("module", "events")) if err := eventBus.Start(); err != nil { return fmt.Errorf("failed to start event bus: %w", err) } @@ -90,8 +88,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) { mempool := emptyMempool{} evpool := sm.EmptyEvidencePool{} blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore) - consensusState := NewState(cfg.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool) - consensusState.SetLogger(logger) + consensusState := NewState(logger, cfg.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool) consensusState.SetEventBus(eventBus) if privValidator != nil && privValidator != (*privval.FilePV)(nil) { consensusState.SetPrivValidator(privValidator) diff --git a/internal/consensus/wal_test.go b/internal/consensus/wal_test.go index 114bb21b1..6c1feb670 100644 --- a/internal/consensus/wal_test.go +++ b/internal/consensus/wal_test.go @@ -25,17 +25,17 @@ const ( func TestWALTruncate(t *testing.T) { walDir := t.TempDir() walFile := filepath.Join(walDir, "wal") + logger := log.TestingLogger() // this magic number 4K can truncate the content when RotateFile. // defaultHeadSizeLimit(10M) is hard to simulate. // this magic number 1 * time.Millisecond make RotateFile check frequently. // defaultGroupCheckDuration(5s) is hard to simulate. - wal, err := NewWAL(walFile, + wal, err := NewWAL(logger, walFile, autofile.GroupHeadSizeLimit(4096), autofile.GroupCheckDuration(1*time.Millisecond), ) require.NoError(t, err) - wal.SetLogger(log.TestingLogger()) err = wal.Start() require.NoError(t, err) t.Cleanup(func() { @@ -105,7 +105,7 @@ func TestWALWrite(t *testing.T) { walDir := t.TempDir() walFile := filepath.Join(walDir, "wal") - wal, err := NewWAL(walFile) + wal, err := NewWAL(log.TestingLogger(), walFile) require.NoError(t, err) err = wal.Start() require.NoError(t, err) @@ -148,9 +148,8 @@ func TestWALSearchForEndHeight(t *testing.T) { } walFile := tempWALWithData(walBody) - wal, err := NewWAL(walFile) + wal, err := NewWAL(log.TestingLogger(), walFile) require.NoError(t, err) - wal.SetLogger(log.TestingLogger()) h := int64(3) gr, found, err := wal.SearchForEndHeight(h, &WALSearchOptions{}) @@ -170,12 +169,11 @@ func TestWALSearchForEndHeight(t *testing.T) { func TestWALPeriodicSync(t *testing.T) { walDir := t.TempDir() walFile := filepath.Join(walDir, "wal") - wal, err := NewWAL(walFile, autofile.GroupCheckDuration(1*time.Millisecond)) + wal, err := NewWAL(log.TestingLogger(), walFile, autofile.GroupCheckDuration(1*time.Millisecond)) require.NoError(t, err) wal.SetFlushInterval(walTestFlushInterval) - wal.SetLogger(log.TestingLogger()) // Generate some data err = WALGenerateNBlocks(t, wal.Group(), 5) diff --git a/internal/eventbus/event_bus.go b/internal/eventbus/event_bus.go index 263267461..95c8876d4 100644 --- a/internal/eventbus/event_bus.go +++ b/internal/eventbus/event_bus.go @@ -27,18 +27,17 @@ type EventBus struct { } // NewDefault returns a new event bus with default options. -func NewDefault() *EventBus { - pubsub := tmpubsub.NewServer(tmpubsub.BufferCapacity(0)) +func NewDefault(l log.Logger) *EventBus { + logger := l.With("module", "eventbus") + pubsub := tmpubsub.NewServer(tmpubsub.BufferCapacity(0), + func(s *tmpubsub.Server) { + s.Logger = logger + }) b := &EventBus{pubsub: pubsub} - b.BaseService = *service.NewBaseService(nil, "EventBus", b) + b.BaseService = *service.NewBaseService(logger, "EventBus", b) return b } -func (b *EventBus) SetLogger(l log.Logger) { - b.BaseService.SetLogger(l) - b.pubsub.SetLogger(l.With("module", "pubsub")) -} - func (b *EventBus) OnStart() error { return b.pubsub.Start() } diff --git a/internal/eventbus/event_bus_test.go b/internal/eventbus/event_bus_test.go index e280d2bc4..64e6f3344 100644 --- a/internal/eventbus/event_bus_test.go +++ b/internal/eventbus/event_bus_test.go @@ -12,13 +12,14 @@ import ( abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/internal/eventbus" + "github.com/tendermint/tendermint/libs/log" tmpubsub "github.com/tendermint/tendermint/libs/pubsub" tmquery "github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/types" ) func TestEventBusPublishEventTx(t *testing.T) { - eventBus := eventbus.NewDefault() + eventBus := eventbus.NewDefault(log.TestingLogger()) err := eventBus.Start() require.NoError(t, err) t.Cleanup(func() { @@ -75,7 +76,7 @@ func TestEventBusPublishEventTx(t *testing.T) { } func TestEventBusPublishEventNewBlock(t *testing.T) { - eventBus := eventbus.NewDefault() + eventBus := eventbus.NewDefault(log.TestingLogger()) err := eventBus.Start() require.NoError(t, err) t.Cleanup(func() { @@ -135,7 +136,7 @@ func TestEventBusPublishEventNewBlock(t *testing.T) { } func TestEventBusPublishEventTxDuplicateKeys(t *testing.T) { - eventBus := eventbus.NewDefault() + eventBus := eventbus.NewDefault(log.TestingLogger()) err := eventBus.Start() require.NoError(t, err) t.Cleanup(func() { @@ -242,7 +243,7 @@ func TestEventBusPublishEventTxDuplicateKeys(t *testing.T) { } func TestEventBusPublishEventNewBlockHeader(t *testing.T) { - eventBus := eventbus.NewDefault() + eventBus := eventbus.NewDefault(log.TestingLogger()) err := eventBus.Start() require.NoError(t, err) t.Cleanup(func() { @@ -299,7 +300,7 @@ func TestEventBusPublishEventNewBlockHeader(t *testing.T) { } func TestEventBusPublishEventNewEvidence(t *testing.T) { - eventBus := eventbus.NewDefault() + eventBus := eventbus.NewDefault(log.TestingLogger()) err := eventBus.Start() require.NoError(t, err) t.Cleanup(func() { @@ -343,7 +344,7 @@ func TestEventBusPublishEventNewEvidence(t *testing.T) { } func TestEventBusPublish(t *testing.T) { - eventBus := eventbus.NewDefault() + eventBus := eventbus.NewDefault(log.TestingLogger()) err := eventBus.Start() require.NoError(t, err) t.Cleanup(func() { @@ -433,7 +434,7 @@ func benchmarkEventBus(numClients int, randQueries bool, randEvents bool, b *tes // for random* functions mrand.Seed(time.Now().Unix()) - eventBus := eventbus.NewDefault() // set buffer capacity to 0 so we are not testing cache + eventBus := eventbus.NewDefault(log.TestingLogger()) // set buffer capacity to 0 so we are not testing cache err := eventBus.Start() if err != nil { b.Error(err) diff --git a/internal/inspect/inspect.go b/internal/inspect/inspect.go index 66e9c9421..59f0d92d2 100644 --- a/internal/inspect/inspect.go +++ b/internal/inspect/inspect.go @@ -44,20 +44,18 @@ type Inspector struct { /// //nolint:lll func New(cfg *config.RPCConfig, bs state.BlockStore, ss state.Store, es []indexer.EventSink, logger log.Logger) *Inspector { - routes := rpc.Routes(*cfg, ss, bs, es, logger) - eb := eventbus.NewDefault() - eb.SetLogger(logger.With("module", "events")) - is := indexer.NewService(indexer.ServiceArgs{ - Sinks: es, - EventBus: eb, - Logger: logger.With("module", "txindex"), - }) + eb := eventbus.NewDefault(logger.With("module", "events")) + return &Inspector{ - routes: routes, - config: cfg, - logger: logger, - eventBus: eb, - indexerService: is, + routes: rpc.Routes(*cfg, ss, bs, es, logger), + config: cfg, + logger: logger, + eventBus: eb, + indexerService: indexer.NewService(indexer.ServiceArgs{ + Sinks: es, + EventBus: eb, + Logger: logger.With("module", "txindex"), + }), } } diff --git a/internal/mempool/mempool_test.go b/internal/mempool/mempool_test.go index 23c8c378e..428204214 100644 --- a/internal/mempool/mempool_test.go +++ b/internal/mempool/mempool_test.go @@ -77,12 +77,12 @@ func setup(t testing.TB, cacheSize int, options ...TxMempoolOption) *TxMempool { app := &application{kvstore.NewApplication()} cc := abciclient.NewLocalCreator(app) + logger := log.TestingLogger() cfg, err := config.ResetTestRoot(strings.ReplaceAll(t.Name(), "/", "|")) require.NoError(t, err) cfg.Mempool.CacheSize = cacheSize - - appConnMem, err := cc() + appConnMem, err := cc(logger) require.NoError(t, err) require.NoError(t, appConnMem.Start()) @@ -91,7 +91,7 @@ func setup(t testing.TB, cacheSize int, options ...TxMempoolOption) *TxMempool { require.NoError(t, appConnMem.Stop()) }) - return NewTxMempool(log.TestingLogger().With("test", t.Name()), cfg.Mempool, appConnMem, 0, options...) + return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, appConnMem, 0, options...) } func checkTxs(t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx { diff --git a/internal/p2p/conn/connection.go b/internal/p2p/conn/connection.go index 4782d7717..a7efe54e3 100644 --- a/internal/p2p/conn/connection.go +++ b/internal/p2p/conn/connection.go @@ -146,12 +146,14 @@ func DefaultMConnConfig() MConnConfig { // NewMConnection wraps net.Conn and creates multiplex connection func NewMConnection( + logger log.Logger, conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, ) *MConnection { return NewMConnectionWithConfig( + logger, conn, chDescs, onReceive, @@ -161,6 +163,7 @@ func NewMConnection( // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config func NewMConnectionWithConfig( + logger log.Logger, conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, @@ -185,6 +188,8 @@ func NewMConnectionWithConfig( created: time.Now(), } + mconn.BaseService = *service.NewBaseService(logger, "MConnection", mconn) + // Create channels var channelsIdx = map[ChannelID]*channel{} var channels = []*channel{} @@ -197,21 +202,12 @@ func NewMConnectionWithConfig( mconn.channels = channels mconn.channelsIdx = channelsIdx - mconn.BaseService = *service.NewBaseService(nil, "MConnection", mconn) - // maxPacketMsgSize() is a bit heavy, so call just once mconn._maxPacketMsgSize = mconn.maxPacketMsgSize() return mconn } -func (c *MConnection) SetLogger(l log.Logger) { - c.BaseService.SetLogger(l) - for _, ch := range c.channels { - ch.SetLogger(l) - } -} - // OnStart implements BaseService func (c *MConnection) OnStart() error { if err := c.BaseService.OnStart(); err != nil { @@ -670,13 +666,10 @@ func newChannel(conn *MConnection, desc ChannelDescriptor) *channel { sendQueue: make(chan []byte, desc.SendQueueCapacity), recving: make([]byte, 0, desc.RecvBufferCapacity), maxPacketMsgPayloadSize: conn.config.MaxPacketMsgPayloadSize, + Logger: conn.Logger, } } -func (ch *channel) SetLogger(l log.Logger) { - ch.Logger = l -} - // Queues message to send to this channel. // Goroutine-safe // Times out (and returns false) after defaultSendTimeout diff --git a/internal/p2p/conn/connection_test.go b/internal/p2p/conn/connection_test.go index 1ed179ad8..5c4cdb156 100644 --- a/internal/p2p/conn/connection_test.go +++ b/internal/p2p/conn/connection_test.go @@ -19,17 +19,18 @@ import ( const maxPingPongPacketSize = 1024 // bytes -func createTestMConnection(conn net.Conn) *MConnection { - onReceive := func(chID ChannelID, msgBytes []byte) { - } - onError := func(r interface{}) { - } - c := createMConnectionWithCallbacks(conn, onReceive, onError) - c.SetLogger(log.TestingLogger()) - return c +func createTestMConnection(logger log.Logger, conn net.Conn) *MConnection { + return createMConnectionWithCallbacks(logger, conn, + // onRecieve + func(chID ChannelID, msgBytes []byte) { + }, + // onError + func(r interface{}) { + }) } func createMConnectionWithCallbacks( + logger log.Logger, conn net.Conn, onReceive func(chID ChannelID, msgBytes []byte), onError func(r interface{}), @@ -38,8 +39,7 @@ func createMConnectionWithCallbacks( cfg.PingInterval = 90 * time.Millisecond cfg.PongTimeout = 45 * time.Millisecond chDescs := []*ChannelDescriptor{{ID: 0x01, Priority: 1, SendQueueCapacity: 1}} - c := NewMConnectionWithConfig(conn, chDescs, onReceive, onError, cfg) - c.SetLogger(log.TestingLogger()) + c := NewMConnectionWithConfig(logger, conn, chDescs, onReceive, onError, cfg) return c } @@ -47,7 +47,7 @@ func TestMConnectionSendFlushStop(t *testing.T) { server, client := NetPipe() t.Cleanup(closeAll(t, client, server)) - clientConn := createTestMConnection(client) + clientConn := createTestMConnection(log.TestingLogger(), client) err := clientConn.Start() require.Nil(t, err) t.Cleanup(stopAll(t, clientConn)) @@ -81,7 +81,7 @@ func TestMConnectionSend(t *testing.T) { server, client := NetPipe() t.Cleanup(closeAll(t, client, server)) - mconn := createTestMConnection(client) + mconn := createTestMConnection(log.TestingLogger(), client) err := mconn.Start() require.Nil(t, err) t.Cleanup(stopAll(t, mconn)) @@ -117,12 +117,13 @@ func TestMConnectionReceive(t *testing.T) { onError := func(r interface{}) { errorsCh <- r } - mconn1 := createMConnectionWithCallbacks(client, onReceive, onError) + logger := log.TestingLogger() + mconn1 := createMConnectionWithCallbacks(logger, client, onReceive, onError) err := mconn1.Start() require.Nil(t, err) t.Cleanup(stopAll(t, mconn1)) - mconn2 := createTestMConnection(server) + mconn2 := createTestMConnection(logger, server) err = mconn2.Start() require.Nil(t, err) t.Cleanup(stopAll(t, mconn2)) @@ -152,7 +153,7 @@ func TestMConnectionPongTimeoutResultsInError(t *testing.T) { onError := func(r interface{}) { errorsCh <- r } - mconn := createMConnectionWithCallbacks(client, onReceive, onError) + mconn := createMConnectionWithCallbacks(log.TestingLogger(), client, onReceive, onError) err := mconn.Start() require.Nil(t, err) t.Cleanup(stopAll(t, mconn)) @@ -190,7 +191,7 @@ func TestMConnectionMultiplePongsInTheBeginning(t *testing.T) { onError := func(r interface{}) { errorsCh <- r } - mconn := createMConnectionWithCallbacks(client, onReceive, onError) + mconn := createMConnectionWithCallbacks(log.TestingLogger(), client, onReceive, onError) err := mconn.Start() require.Nil(t, err) t.Cleanup(stopAll(t, mconn)) @@ -244,7 +245,7 @@ func TestMConnectionMultiplePings(t *testing.T) { onError := func(r interface{}) { errorsCh <- r } - mconn := createMConnectionWithCallbacks(client, onReceive, onError) + mconn := createMConnectionWithCallbacks(log.TestingLogger(), client, onReceive, onError) err := mconn.Start() require.Nil(t, err) t.Cleanup(stopAll(t, mconn)) @@ -291,7 +292,7 @@ func TestMConnectionPingPongs(t *testing.T) { onError := func(r interface{}) { errorsCh <- r } - mconn := createMConnectionWithCallbacks(client, onReceive, onError) + mconn := createMConnectionWithCallbacks(log.TestingLogger(), client, onReceive, onError) err := mconn.Start() require.Nil(t, err) t.Cleanup(stopAll(t, mconn)) @@ -348,7 +349,7 @@ func TestMConnectionStopsAndReturnsError(t *testing.T) { onError := func(r interface{}) { errorsCh <- r } - mconn := createMConnectionWithCallbacks(client, onReceive, onError) + mconn := createMConnectionWithCallbacks(log.TestingLogger(), client, onReceive, onError) err := mconn.Start() require.Nil(t, err) t.Cleanup(stopAll(t, mconn)) @@ -379,19 +380,18 @@ func newClientAndServerConnsForReadErrors(t *testing.T, chOnErr chan struct{}) ( {ID: 0x01, Priority: 1, SendQueueCapacity: 1}, {ID: 0x02, Priority: 1, SendQueueCapacity: 1}, } - mconnClient := NewMConnection(client, chDescs, onReceive, onError) - mconnClient.SetLogger(log.TestingLogger().With("module", "client")) + logger := log.TestingLogger() + mconnClient := NewMConnection(logger.With("module", "client"), client, chDescs, onReceive, onError) err := mconnClient.Start() require.Nil(t, err) // create server conn with 1 channel // it fires on chOnErr when there's an error - serverLogger := log.TestingLogger().With("module", "server") + serverLogger := logger.With("module", "server") onError = func(r interface{}) { chOnErr <- struct{}{} } - mconnServer := createMConnectionWithCallbacks(server, onReceive, onError) - mconnServer.SetLogger(serverLogger) + mconnServer := createMConnectionWithCallbacks(serverLogger, server, onReceive, onError) err = mconnServer.Start() require.Nil(t, err) return mconnClient, mconnServer @@ -488,7 +488,7 @@ func TestMConnectionTrySend(t *testing.T) { server, client := NetPipe() t.Cleanup(closeAll(t, client, server)) - mconn := createTestMConnection(client) + mconn := createTestMConnection(log.TestingLogger(), client) err := mconn.Start() require.Nil(t, err) t.Cleanup(stopAll(t, mconn)) diff --git a/internal/p2p/transport_mconn.go b/internal/p2p/transport_mconn.go index 3e0281c39..736f5360a 100644 --- a/internal/p2p/transport_mconn.go +++ b/internal/p2p/transport_mconn.go @@ -336,13 +336,13 @@ func (c *mConnConnection) handshake( } mconn := conn.NewMConnectionWithConfig( + c.logger.With("peer", c.RemoteEndpoint().NodeAddress(peerInfo.NodeID)), secretConn, c.channelDescs, c.onReceive, c.onError, c.mConnConfig, ) - mconn.SetLogger(c.logger.With("peer", c.RemoteEndpoint().NodeAddress(peerInfo.NodeID))) return mconn, peerInfo, secretConn.RemotePubKey(), nil } diff --git a/internal/p2p/trust/store.go b/internal/p2p/trust/store.go index 9f200b9dd..158354e14 100644 --- a/internal/p2p/trust/store.go +++ b/internal/p2p/trust/store.go @@ -11,6 +11,7 @@ import ( dbm "github.com/tendermint/tm-db" tmsync "github.com/tendermint/tendermint/internal/libs/sync" + "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" ) @@ -38,14 +39,14 @@ type MetricStore struct { // NewTrustMetricStore returns a store that saves data to the DB // and uses the config when creating new trust metrics. // Use Start to to initialize the trust metric store -func NewTrustMetricStore(db dbm.DB, tmc MetricConfig) *MetricStore { +func NewTrustMetricStore(db dbm.DB, tmc MetricConfig, logger log.Logger) *MetricStore { tms := &MetricStore{ peerMetrics: make(map[string]*Metric), db: db, config: tmc, } - tms.BaseService = *service.NewBaseService(nil, "MetricStore", tms) + tms.BaseService = *service.NewBaseService(logger, "MetricStore", tms) return tms } diff --git a/internal/p2p/trust/store_test.go b/internal/p2p/trust/store_test.go index ecf17dc4a..d1420b1dc 100644 --- a/internal/p2p/trust/store_test.go +++ b/internal/p2p/trust/store_test.go @@ -16,17 +16,16 @@ import ( func TestTrustMetricStoreSaveLoad(t *testing.T) { dir := t.TempDir() + logger := log.TestingLogger() historyDB, err := dbm.NewDB("trusthistory", "goleveldb", dir) require.NoError(t, err) // 0 peers saved - store := NewTrustMetricStore(historyDB, DefaultConfig()) - store.SetLogger(log.TestingLogger()) + store := NewTrustMetricStore(historyDB, DefaultConfig(), logger) store.saveToDB() // Load the data from the file - store = NewTrustMetricStore(historyDB, DefaultConfig()) - store.SetLogger(log.TestingLogger()) + store = NewTrustMetricStore(historyDB, DefaultConfig(), logger) err = store.Start() require.NoError(t, err) // Make sure we still have 0 entries @@ -64,8 +63,8 @@ func TestTrustMetricStoreSaveLoad(t *testing.T) { require.NoError(t, err) // Load the data from the DB - store = NewTrustMetricStore(historyDB, DefaultConfig()) - store.SetLogger(log.TestingLogger()) + store = NewTrustMetricStore(historyDB, DefaultConfig(), logger) + err = store.Start() require.NoError(t, err) @@ -88,9 +87,10 @@ func TestTrustMetricStoreConfig(t *testing.T) { IntegralWeight: 0.5, } + logger := log.TestingLogger() // Create a store with custom config - store := NewTrustMetricStore(historyDB, config) - store.SetLogger(log.TestingLogger()) + store := NewTrustMetricStore(historyDB, config, logger) + err = store.Start() require.NoError(t, err) @@ -108,8 +108,8 @@ func TestTrustMetricStoreLookup(t *testing.T) { historyDB, err := dbm.NewDB("", "memdb", "") require.NoError(t, err) - store := NewTrustMetricStore(historyDB, DefaultConfig()) - store.SetLogger(log.TestingLogger()) + store := NewTrustMetricStore(historyDB, DefaultConfig(), log.TestingLogger()) + err = store.Start() require.NoError(t, err) @@ -131,8 +131,8 @@ func TestTrustMetricStorePeerScore(t *testing.T) { historyDB, err := dbm.NewDB("", "memdb", "") require.NoError(t, err) - store := NewTrustMetricStore(historyDB, DefaultConfig()) - store.SetLogger(log.TestingLogger()) + store := NewTrustMetricStore(historyDB, DefaultConfig(), log.TestingLogger()) + err = store.Start() require.NoError(t, err) diff --git a/internal/proxy/app_conn_test.go b/internal/proxy/app_conn_test.go index f1ae7fe1a..4b4abe607 100644 --- a/internal/proxy/app_conn_test.go +++ b/internal/proxy/app_conn_test.go @@ -48,11 +48,11 @@ var SOCKET = "socket" func TestEcho(t *testing.T) { sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6)) - clientCreator := abciclient.NewRemoteCreator(sockPath, SOCKET, true) + logger := log.TestingLogger() + clientCreator := abciclient.NewRemoteCreator(logger, sockPath, SOCKET, true) // Start server - s := server.NewSocketServer(sockPath, kvstore.NewApplication()) - s.SetLogger(log.TestingLogger().With("module", "abci-server")) + s := server.NewSocketServer(logger.With("module", "abci-server"), sockPath, kvstore.NewApplication()) if err := s.Start(); err != nil { t.Fatalf("Error starting socket server: %v", err.Error()) } @@ -63,11 +63,11 @@ func TestEcho(t *testing.T) { }) // Start client - cli, err := clientCreator() + cli, err := clientCreator(logger.With("module", "abci-client")) if err != nil { t.Fatalf("Error creating ABCI client: %v", err.Error()) } - cli.SetLogger(log.TestingLogger().With("module", "abci-client")) + if err := cli.Start(); err != nil { t.Fatalf("Error starting ABCI client: %v", err.Error()) } @@ -96,11 +96,11 @@ func TestEcho(t *testing.T) { func BenchmarkEcho(b *testing.B) { b.StopTimer() // Initialize sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6)) - clientCreator := abciclient.NewRemoteCreator(sockPath, SOCKET, true) + logger := log.TestingLogger() + clientCreator := abciclient.NewRemoteCreator(logger, sockPath, SOCKET, true) // Start server - s := server.NewSocketServer(sockPath, kvstore.NewApplication()) - s.SetLogger(log.TestingLogger().With("module", "abci-server")) + s := server.NewSocketServer(logger.With("module", "abci-server"), sockPath, kvstore.NewApplication()) if err := s.Start(); err != nil { b.Fatalf("Error starting socket server: %v", err.Error()) } @@ -111,11 +111,11 @@ func BenchmarkEcho(b *testing.B) { }) // Start client - cli, err := clientCreator() + cli, err := clientCreator(logger.With("module", "abci-client")) if err != nil { b.Fatalf("Error creating ABCI client: %v", err.Error()) } - cli.SetLogger(log.TestingLogger().With("module", "abci-client")) + if err := cli.Start(); err != nil { b.Fatalf("Error starting ABCI client: %v", err.Error()) } @@ -149,11 +149,11 @@ func BenchmarkEcho(b *testing.B) { func TestInfo(t *testing.T) { sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6)) - clientCreator := abciclient.NewRemoteCreator(sockPath, SOCKET, true) + logger := log.TestingLogger() + clientCreator := abciclient.NewRemoteCreator(logger, sockPath, SOCKET, true) // Start server - s := server.NewSocketServer(sockPath, kvstore.NewApplication()) - s.SetLogger(log.TestingLogger().With("module", "abci-server")) + s := server.NewSocketServer(logger.With("module", "abci-server"), sockPath, kvstore.NewApplication()) if err := s.Start(); err != nil { t.Fatalf("Error starting socket server: %v", err.Error()) } @@ -164,11 +164,11 @@ func TestInfo(t *testing.T) { }) // Start client - cli, err := clientCreator() + cli, err := clientCreator(logger.With("module", "abci-client")) if err != nil { t.Fatalf("Error creating ABCI client: %v", err.Error()) } - cli.SetLogger(log.TestingLogger().With("module", "abci-client")) + if err := cli.Start(); err != nil { t.Fatalf("Error starting ABCI client: %v", err.Error()) } diff --git a/internal/proxy/client.go b/internal/proxy/client.go index ddb9a928d..4e034802e 100644 --- a/internal/proxy/client.go +++ b/internal/proxy/client.go @@ -6,6 +6,7 @@ import ( abciclient "github.com/tendermint/tendermint/abci/client" "github.com/tendermint/tendermint/abci/example/kvstore" "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/libs/log" e2e "github.com/tendermint/tendermint/test/e2e/app" ) @@ -15,7 +16,7 @@ import ( // // The Closer is a noop except for persistent_kvstore applications, // which will clean up the store. -func DefaultClientCreator(addr, transport, dbDir string) (abciclient.Creator, io.Closer) { +func DefaultClientCreator(logger log.Logger, addr, transport, dbDir string) (abciclient.Creator, io.Closer) { switch addr { case "kvstore": return abciclient.NewLocalCreator(kvstore.NewApplication()), noopCloser{} @@ -32,7 +33,7 @@ func DefaultClientCreator(addr, transport, dbDir string) (abciclient.Creator, io return abciclient.NewLocalCreator(types.NewBaseApplication()), noopCloser{} default: mustConnect := false // loop retrying - return abciclient.NewRemoteCreator(addr, transport, mustConnect), noopCloser{} + return abciclient.NewRemoteCreator(logger, addr, transport, mustConnect), noopCloser{} } } diff --git a/internal/proxy/multi_app_conn.go b/internal/proxy/multi_app_conn.go index 0bcc64af6..62862d66e 100644 --- a/internal/proxy/multi_app_conn.go +++ b/internal/proxy/multi_app_conn.go @@ -6,7 +6,7 @@ import ( "syscall" abciclient "github.com/tendermint/tendermint/abci/client" - tmlog "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" ) @@ -33,8 +33,8 @@ type AppConns interface { } // NewAppConns calls NewMultiAppConn. -func NewAppConns(clientCreator abciclient.Creator, metrics *Metrics) AppConns { - return NewMultiAppConn(clientCreator, metrics) +func NewAppConns(clientCreator abciclient.Creator, logger log.Logger, metrics *Metrics) AppConns { + return NewMultiAppConn(clientCreator, logger, metrics) } // multiAppConn implements AppConns. @@ -60,12 +60,12 @@ type multiAppConn struct { } // NewMultiAppConn makes all necessary abci connections to the application. -func NewMultiAppConn(clientCreator abciclient.Creator, metrics *Metrics) AppConns { +func NewMultiAppConn(clientCreator abciclient.Creator, logger log.Logger, metrics *Metrics) AppConns { multiAppConn := &multiAppConn{ metrics: metrics, clientCreator: clientCreator, } - multiAppConn.BaseService = *service.NewBaseService(nil, "multiAppConn", multiAppConn) + multiAppConn.BaseService = *service.NewBaseService(logger, "multiAppConn", multiAppConn) return multiAppConn } @@ -128,7 +128,7 @@ func (app *multiAppConn) OnStop() { } func (app *multiAppConn) killTMOnClientError() { - killFn := func(conn string, err error, logger tmlog.Logger) { + killFn := func(conn string, err error, logger log.Logger) { logger.Error( fmt.Sprintf("%s connection terminated. Did the application crash? Please restart tendermint", conn), "err", err) @@ -181,11 +181,12 @@ func (app *multiAppConn) stopAllClients() { } func (app *multiAppConn) abciClientFor(conn string) (abciclient.Client, error) { - c, err := app.clientCreator() + c, err := app.clientCreator(app.Logger.With( + "module", "abci-client", + "connection", conn)) if err != nil { return nil, fmt.Errorf("error creating ABCI client (%s connection): %w", conn, err) } - c.SetLogger(app.Logger.With("module", "abci-client", "connection", conn)) if err := c.Start(); err != nil { return nil, fmt.Errorf("error starting ABCI client (%s connection): %w", conn, err) } diff --git a/internal/proxy/multi_app_conn_test.go b/internal/proxy/multi_app_conn_test.go index 25ed692ab..af9c30091 100644 --- a/internal/proxy/multi_app_conn_test.go +++ b/internal/proxy/multi_app_conn_test.go @@ -14,24 +14,24 @@ import ( abciclient "github.com/tendermint/tendermint/abci/client" abcimocks "github.com/tendermint/tendermint/abci/client/mocks" + "github.com/tendermint/tendermint/libs/log" ) func TestAppConns_Start_Stop(t *testing.T) { quitCh := make(<-chan struct{}) clientMock := &abcimocks.Client{} - clientMock.On("SetLogger", mock.Anything).Return().Times(4) clientMock.On("Start").Return(nil).Times(4) clientMock.On("Stop").Return(nil).Times(4) clientMock.On("Quit").Return(quitCh).Times(4) creatorCallCount := 0 - creator := func() (abciclient.Client, error) { + creator := func(logger log.Logger) (abciclient.Client, error) { creatorCallCount++ return clientMock, nil } - appConns := NewAppConns(creator, NopMetrics()) + appConns := NewAppConns(creator, log.TestingLogger(), NopMetrics()) err := appConns.Start() require.NoError(t, err) @@ -68,11 +68,11 @@ func TestAppConns_Failure(t *testing.T) { clientMock.On("Quit").Return(recvQuitCh) clientMock.On("Error").Return(errors.New("EOF")).Once() - creator := func() (abciclient.Client, error) { + creator := func(log.Logger) (abciclient.Client, error) { return clientMock, nil } - appConns := NewAppConns(creator, NopMetrics()) + appConns := NewAppConns(creator, log.TestingLogger(), NopMetrics()) err := appConns.Start() require.NoError(t, err) diff --git a/internal/state/execution_test.go b/internal/state/execution_test.go index fb70668fd..80b1b6e58 100644 --- a/internal/state/execution_test.go +++ b/internal/state/execution_test.go @@ -38,7 +38,8 @@ var ( func TestApplyBlock(t *testing.T) { app := &testApp{} cc := abciclient.NewLocalCreator(app) - proxyApp := proxy.NewAppConns(cc, proxy.NopMetrics()) + logger := log.TestingLogger() + proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics()) err := proxyApp.Start() require.Nil(t, err) defer proxyApp.Stop() //nolint:errcheck // ignore for tests @@ -46,7 +47,7 @@ func TestApplyBlock(t *testing.T) { state, stateDB, _ := makeState(1, 1) stateStore := sm.NewStore(stateDB) blockStore := store.NewBlockStore(dbm.NewMemDB()) - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), + blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp.Consensus(), mmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore) block := sf.MakeBlock(state, 1, new(types.Commit)) @@ -63,7 +64,7 @@ func TestApplyBlock(t *testing.T) { func TestBeginBlockValidators(t *testing.T) { app := &testApp{} cc := abciclient.NewLocalCreator(app) - proxyApp := proxy.NewAppConns(cc, proxy.NopMetrics()) + proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics()) err := proxyApp.Start() require.Nil(t, err) defer proxyApp.Stop() //nolint:errcheck // no need to check error again @@ -126,7 +127,7 @@ func TestBeginBlockValidators(t *testing.T) { func TestBeginBlockByzantineValidators(t *testing.T) { app := &testApp{} cc := abciclient.NewLocalCreator(app) - proxyApp := proxy.NewAppConns(cc, proxy.NopMetrics()) + proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics()) err := proxyApp.Start() require.Nil(t, err) defer proxyApp.Stop() //nolint:errcheck // ignore for tests @@ -351,7 +352,8 @@ func TestUpdateValidators(t *testing.T) { func TestEndBlockValidatorUpdates(t *testing.T) { app := &testApp{} cc := abciclient.NewLocalCreator(app) - proxyApp := proxy.NewAppConns(cc, proxy.NopMetrics()) + logger := log.TestingLogger() + proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics()) err := proxyApp.Start() require.Nil(t, err) defer proxyApp.Stop() //nolint:errcheck // ignore for tests @@ -362,14 +364,14 @@ func TestEndBlockValidatorUpdates(t *testing.T) { blockExec := sm.NewBlockExecutor( stateStore, - log.TestingLogger(), + logger, proxyApp.Consensus(), mmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore, ) - eventBus := eventbus.NewDefault() + eventBus := eventbus.NewDefault(logger) err = eventBus.Start() require.NoError(t, err) defer eventBus.Stop() //nolint:errcheck // ignore for tests @@ -420,7 +422,8 @@ func TestEndBlockValidatorUpdates(t *testing.T) { func TestEndBlockValidatorUpdatesResultingInEmptySet(t *testing.T) { app := &testApp{} cc := abciclient.NewLocalCreator(app) - proxyApp := proxy.NewAppConns(cc, proxy.NopMetrics()) + logger := log.TestingLogger() + proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics()) err := proxyApp.Start() require.Nil(t, err) defer proxyApp.Stop() //nolint:errcheck // ignore for tests diff --git a/internal/state/helpers_test.go b/internal/state/helpers_test.go index 0cedebb00..821c0757e 100644 --- a/internal/state/helpers_test.go +++ b/internal/state/helpers_test.go @@ -16,6 +16,7 @@ import ( sm "github.com/tendermint/tendermint/internal/state" sf "github.com/tendermint/tendermint/internal/state/test/factory" "github.com/tendermint/tendermint/internal/test/factory" + "github.com/tendermint/tendermint/libs/log" tmrand "github.com/tendermint/tendermint/libs/rand" tmtime "github.com/tendermint/tendermint/libs/time" tmstate "github.com/tendermint/tendermint/proto/tendermint/state" @@ -31,7 +32,7 @@ type paramsChangeTestCase struct { func newTestApp() proxy.AppConns { app := &testApp{} cc := abciclient.NewLocalCreator(app) - return proxy.NewAppConns(cc, proxy.NopMetrics()) + return proxy.NewAppConns(cc, log.NewNopLogger(), proxy.NopMetrics()) } func makeAndCommitGoodBlock( diff --git a/internal/state/indexer/indexer_service.go b/internal/state/indexer/indexer_service.go index 8f69f488b..d5c230e81 100644 --- a/internal/state/indexer/indexer_service.go +++ b/internal/state/indexer/indexer_service.go @@ -41,15 +41,6 @@ func NewService(args ServiceArgs) *Service { return is } -// NewIndexerService returns a new service instance. -// Deprecated: Use NewService instead. -func NewIndexerService(es []EventSink, eventBus *eventbus.EventBus) *Service { - return NewService(ServiceArgs{ - Sinks: es, - EventBus: eventBus, - }) -} - // publish publishes a pubsub message to the service. The service blocks until // the message has been fully processed. func (is *Service) publish(msg pubsub.Message) error { diff --git a/internal/state/indexer/indexer_service_test.go b/internal/state/indexer/indexer_service_test.go index d62ebffac..a986530f0 100644 --- a/internal/state/indexer/indexer_service_test.go +++ b/internal/state/indexer/indexer_service_test.go @@ -38,10 +38,18 @@ var ( dbName = "postgres" ) +// NewIndexerService returns a new service instance. +func NewIndexerService(es []indexer.EventSink, eventBus *eventbus.EventBus) *indexer.Service { + return indexer.NewService(indexer.ServiceArgs{ + Sinks: es, + EventBus: eventBus, + }) +} + func TestIndexerServiceIndexesBlocks(t *testing.T) { + logger := tmlog.TestingLogger() // event bus - eventBus := eventbus.NewDefault() - eventBus.SetLogger(tmlog.TestingLogger()) + eventBus := eventbus.NewDefault(logger) err := eventBus.Start() require.NoError(t, err) t.Cleanup(func() { @@ -62,8 +70,7 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) { assert.True(t, indexer.KVSinkEnabled(eventSinks)) assert.True(t, indexer.IndexingEnabled(eventSinks)) - service := indexer.NewIndexerService(eventSinks, eventBus) - service.SetLogger(tmlog.TestingLogger()) + service := NewIndexerService(eventSinks, eventBus) err = service.Start() require.NoError(t, err) t.Cleanup(func() { diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go index edf59bac4..c1224c642 100644 --- a/libs/pubsub/pubsub.go +++ b/libs/pubsub/pubsub.go @@ -136,10 +136,10 @@ type Option func(*Server) // provided, the resulting server's queue is unbuffered. func NewServer(options ...Option) *Server { s := new(Server) + s.BaseService = *service.NewBaseService(nil, "PubSub", s) for _, opt := range options { opt(s) } - s.BaseService = *service.NewBaseService(nil, "PubSub", s) // The queue receives items to be published. s.queue = make(chan item, s.queueCap) diff --git a/libs/pubsub/pubsub_test.go b/libs/pubsub/pubsub_test.go index 357264e8a..8dcf8b3d9 100644 --- a/libs/pubsub/pubsub_test.go +++ b/libs/pubsub/pubsub_test.go @@ -357,8 +357,10 @@ func TestUnsubscribeAll(t *testing.T) { } func TestBufferCapacity(t *testing.T) { - s := pubsub.NewServer(pubsub.BufferCapacity(2)) - s.SetLogger(log.TestingLogger()) + s := pubsub.NewServer(pubsub.BufferCapacity(2), + func(s *pubsub.Server) { + s.Logger = log.TestingLogger() + }) require.Equal(t, 2, s.BufferCapacity()) @@ -376,8 +378,10 @@ func TestBufferCapacity(t *testing.T) { func newTestServer(t testing.TB) *pubsub.Server { t.Helper() - s := pubsub.NewServer() - s.SetLogger(log.TestingLogger()) + s := pubsub.NewServer(func(s *pubsub.Server) { + s.Logger = log.TestingLogger() + }) + require.NoError(t, s.Start()) t.Cleanup(func() { assert.NoError(t, s.Stop()) diff --git a/libs/service/service.go b/libs/service/service.go index 0af243995..88c25d804 100644 --- a/libs/service/service.go +++ b/libs/service/service.go @@ -48,9 +48,6 @@ type Service interface { // String representation of the service String() string - // SetLogger sets a logger. - SetLogger(log.Logger) - // Wait blocks until the service is stopped. Wait() } @@ -122,11 +119,6 @@ func NewBaseService(logger log.Logger, name string, impl Service) *BaseService { } } -// SetLogger implements Service by setting a logger. -func (bs *BaseService) SetLogger(l log.Logger) { - bs.Logger = l -} - // Start implements Service by calling OnStart (if defined). An error will be // returned if the service is already running or stopped. Not to start the // stopped service, you need to call Reset. diff --git a/node/node.go b/node/node.go index 3fe6ea08e..a0b77823b 100644 --- a/node/node.go +++ b/node/node.go @@ -106,7 +106,7 @@ func newDefaultNode(cfg *config.Config, logger log.Logger) (service.Service, err pval = nil } - appClient, _ := proxy.DefaultClientCreator(cfg.ProxyApp, cfg.ABCI, cfg.DBDir()) + appClient, _ := proxy.DefaultClientCreator(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir()) return makeNode(cfg, pval, diff --git a/node/node_test.go b/node/node_test.go index 90a585a63..e9c2159ed 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -249,7 +249,7 @@ func TestCreateProposalBlock(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(cfg.RootDir) cc := abciclient.NewLocalCreator(kvstore.NewApplication()) - proxyApp := proxy.NewAppConns(cc, proxy.NopMetrics()) + proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics()) err = proxyApp.Start() require.Nil(t, err) defer proxyApp.Stop() //nolint:errcheck // ignore for tests @@ -343,7 +343,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) { defer os.RemoveAll(cfg.RootDir) cc := abciclient.NewLocalCreator(kvstore.NewApplication()) - proxyApp := proxy.NewAppConns(cc, proxy.NopMetrics()) + proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics()) err = proxyApp.Start() require.Nil(t, err) defer proxyApp.Stop() //nolint:errcheck // ignore for tests @@ -407,7 +407,7 @@ func TestMaxProposalBlockSize(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(cfg.RootDir) cc := abciclient.NewLocalCreator(kvstore.NewApplication()) - proxyApp := proxy.NewAppConns(cc, proxy.NopMetrics()) + proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics()) err = proxyApp.Start() require.Nil(t, err) defer proxyApp.Stop() //nolint:errcheck // ignore for tests diff --git a/node/setup.go b/node/setup.go index e254571e3..297ed0265 100644 --- a/node/setup.go +++ b/node/setup.go @@ -90,17 +90,17 @@ func initDBs( // nolint:lll func createAndStartProxyAppConns(clientCreator abciclient.Creator, logger log.Logger, metrics *proxy.Metrics) (proxy.AppConns, error) { - proxyApp := proxy.NewAppConns(clientCreator, metrics) - proxyApp.SetLogger(logger.With("module", "proxy")) + proxyApp := proxy.NewAppConns(clientCreator, logger.With("module", "proxy"), metrics) + if err := proxyApp.Start(); err != nil { return nil, fmt.Errorf("error starting proxy app connections: %v", err) } + return proxyApp, nil } func createAndStartEventBus(logger log.Logger) (*eventbus.EventBus, error) { - eventBus := eventbus.NewDefault() - eventBus.SetLogger(logger.With("module", "events")) + eventBus := eventbus.NewDefault(logger.With("module", "events")) if err := eventBus.Start(); err != nil { return nil, err } @@ -309,6 +309,7 @@ func createConsensusReactor( logger = logger.With("module", "consensus") consensusState := consensus.NewState( + logger, cfg.Consensus, state.Copy(), blockExec, @@ -317,7 +318,7 @@ func createConsensusReactor( evidencePool, consensus.StateMetrics(csMetrics), ) - consensusState.SetLogger(logger) + if privValidator != nil && cfg.Mode == config.ModeValidator { consensusState.SetPrivValidator(privValidator) } diff --git a/test/e2e/node/main.go b/test/e2e/node/main.go index b5d9debe9..7a9e67915 100644 --- a/test/e2e/node/main.go +++ b/test/e2e/node/main.go @@ -102,7 +102,7 @@ func startApp(cfg *Config) error { if err != nil { return err } - server, err := server.NewServer(cfg.Listen, cfg.Protocol, app) + server, err := server.NewServer(logger, cfg.Listen, cfg.Protocol, app) if err != nil { return err } diff --git a/test/fuzz/mempool/checktx.go b/test/fuzz/mempool/checktx.go index c0f207546..406e062fd 100644 --- a/test/fuzz/mempool/checktx.go +++ b/test/fuzz/mempool/checktx.go @@ -16,7 +16,7 @@ var getMp func() mempool.Mempool func init() { app := kvstore.NewApplication() cc := abciclient.NewLocalCreator(app) - appConnMem, _ := cc() + appConnMem, _ := cc(log.NewNopLogger()) err := appConnMem.Start() if err != nil { panic(err)