libs/service: pass logger explicitly (#7288)

This is a very small change, but it removes a method from the
`service.Service` interface (a win!) and forces callers to pass loggers
to objects explicitly during construction rather than injecting them
later. There's no real need for this kind of lazy logger construction,
and I think mutable loggers carry a decent potential for confusion.

My main concern is that this changes the constructor API for ABCI
clients. I think this is fine, and I suspect that as we plumb contexts
through and make changes to the RPC services, there will be a number of
similar changes to various (quasi-)public interfaces, which we should
welcome.
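
To make the shape of the change concrete, here is a minimal sketch of a
typical ABCI client call site before and after, assembled from the diffs
below (buildClient is a hypothetical helper; the module tag mirrors the
one used in abci-cli):

import (
	abciclient "github.com/tendermint/tendermint/abci/client"
	"github.com/tendermint/tendermint/libs/log"
)

// buildClient is a hypothetical helper illustrating the new pattern.
func buildClient(logger log.Logger, addr string) (abciclient.Client, error) {
	// Before: client := abciclient.NewSocketClient(addr, true)
	//         client.SetLogger(logger.With("module", "abci-client"))
	// After: the logger is an explicit constructor argument, and
	// service.Service no longer has a SetLogger method.
	client := abciclient.NewSocketClient(logger.With("module", "abci-client"), addr, true)
	if err := client.Start(); err != nil {
		return nil, err
	}
	return client, nil
}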
Sam Kleinman, 3 years ago (committed by GitHub) · commit d7606777cf
57 changed files with 347 additions and 356 deletions
  1. CHANGELOG_PENDING.md (+2, -0)
  2. abci/client/client.go (+4, -3)
  3. abci/client/creators.go (+6, -5)
  4. abci/client/grpc_client.go (+3, -2)
  5. abci/client/socket_client.go (+3, -2)
  6. abci/client/socket_client_test.go (+14, -6)
  7. abci/cmd/abci-cli/abci-cli.go (+3, -4)
  8. abci/example/example_test.go (+10, -8)
  9. abci/example/kvstore/kvstore_test.go (+6, -8)
  10. abci/server/grpc_server.go (+3, -2)
  11. abci/server/server.go (+4, -3)
  12. abci/server/socket_server.go (+2, -12)
  13. abci/tests/client_server_test.go (+5, -2)
  14. internal/blocksync/pool.go (+8, -2)
  15. internal/blocksync/pool_test.go (+3, -6)
  16. internal/blocksync/reactor.go (+1, -1)
  17. internal/blocksync/reactor_test.go (+3, -1)
  18. internal/consensus/byzantine_test.go (+2, -5)
  19. internal/consensus/common_test.go (+15, -15)
  20. internal/consensus/invalid_test.go (+1, -2)
  21. internal/consensus/mempool_test.go (+10, -6)
  22. internal/consensus/reactor_test.go (+4, -7)
  23. internal/consensus/replay.go (+1, -1)
  24. internal/consensus/replay_file.go (+5, -5)
  25. internal/consensus/replay_stubs.go (+3, -2)
  26. internal/consensus/replay_test.go (+18, -15)
  27. internal/consensus/state.go (+4, -11)
  28. internal/consensus/state_test.go (+39, -36)
  29. internal/consensus/ticker.go (+2, -4)
  30. internal/consensus/wal.go (+2, -7)
  31. internal/consensus/wal_generator.go (+3, -6)
  32. internal/consensus/wal_test.go (+5, -7)
  33. internal/eventbus/event_bus.go (+7, -8)
  34. internal/eventbus/event_bus_test.go (+8, -7)
  35. internal/inspect/inspect.go (+11, -13)
  36. internal/mempool/mempool_test.go (+3, -3)
  37. internal/p2p/conn/connection.go (+6, -13)
  38. internal/p2p/conn/connection_test.go (+25, -25)
  39. internal/p2p/transport_mconn.go (+1, -1)
  40. internal/p2p/trust/store.go (+3, -2)
  41. internal/p2p/trust/store_test.go (+12, -12)
  42. internal/proxy/app_conn_test.go (+15, -15)
  43. internal/proxy/client.go (+3, -2)
  44. internal/proxy/multi_app_conn.go (+9, -8)
  45. internal/proxy/multi_app_conn_test.go (+5, -5)
  46. internal/state/execution_test.go (+11, -8)
  47. internal/state/helpers_test.go (+2, -1)
  48. internal/state/indexer/indexer_service.go (+0, -9)
  49. internal/state/indexer/indexer_service_test.go (+11, -4)
  50. libs/pubsub/pubsub.go (+1, -1)
  51. libs/pubsub/pubsub_test.go (+8, -4)
  52. libs/service/service.go (+0, -8)
  53. node/node.go (+1, -1)
  54. node/node_test.go (+3, -3)
  55. node/setup.go (+6, -5)
  56. test/e2e/node/main.go (+1, -1)
  57. test/fuzz/mempool/checktx.go (+1, -1)

CHANGELOG_PENDING.md (+2, -0)

@ -32,6 +32,8 @@ Special thanks to external contributors on this release:
- [blocksync] \#7046 Remove v2 implementation of the blocksync service and reactor, which was disabled in the previous release. (@tychoish)
- [p2p] \#7064 Remove WDRR queue implementation. (@tychoish)
- [config] \#7169 `WriteConfigFile` now returns an error. (@tychoish)
- [libs/service] \#7288 Remove SetLogger method on `service.Service` interface. (@tychoish)
- Blockchain Protocol


abci/client/client.go (+4, -3)

@ -7,6 +7,7 @@ import (
"github.com/tendermint/tendermint/abci/types"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
)
@ -68,12 +69,12 @@ type Client interface {
// NewClient returns a new ABCI client of the specified transport type.
// It returns an error if the transport is not "socket" or "grpc"
func NewClient(addr, transport string, mustConnect bool) (client Client, err error) {
func NewClient(logger log.Logger, addr, transport string, mustConnect bool) (client Client, err error) {
switch transport {
case "socket":
client = NewSocketClient(addr, mustConnect)
client = NewSocketClient(logger, addr, mustConnect)
case "grpc":
client = NewGRPCClient(addr, mustConnect)
client = NewGRPCClient(logger, addr, mustConnect)
default:
err = fmt.Errorf("unknown abci transport %s", transport)
}


abci/client/creators.go (+6, -5)

@ -5,17 +5,18 @@ import (
"github.com/tendermint/tendermint/abci/types"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/libs/log"
)
// Creator creates new ABCI clients.
type Creator func() (Client, error)
type Creator func(log.Logger) (Client, error)
// NewLocalCreator returns a Creator for the given app,
// which will be running locally.
func NewLocalCreator(app types.Application) Creator {
mtx := new(tmsync.Mutex)
return func() (Client, error) {
return func(_ log.Logger) (Client, error) {
return NewLocalClient(mtx, app), nil
}
}
@ -23,9 +24,9 @@ func NewLocalCreator(app types.Application) Creator {
// NewRemoteCreator returns a Creator for the given address (e.g.
// "192.168.0.1") and transport (e.g. "tcp"). Set mustConnect to true if you
// want the client to connect before reporting success.
func NewRemoteCreator(addr, transport string, mustConnect bool) Creator {
return func() (Client, error) {
remoteApp, err := NewClient(addr, transport, mustConnect)
func NewRemoteCreator(logger log.Logger, addr, transport string, mustConnect bool) Creator {
return func(log.Logger) (Client, error) {
remoteApp, err := NewClient(logger, addr, transport, mustConnect)
if err != nil {
return nil, fmt.Errorf("failed to connect to proxy: %w", err)
}
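
Because Creator now takes a log.Logger, call sites supply the logger when
invoking the creator rather than mutating the resulting client afterwards.
A minimal sketch, assuming app is an abci types.Application and logger a
log.Logger already in scope:

	creator := abciclient.NewLocalCreator(app) // the local creator ignores its logger argument
	cli, err := creator(logger)                // remote creators forward it to NewClient
	if err == nil {
		err = cli.Start()
	}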


abci/client/grpc_client.go (+3, -2)

@ -11,6 +11,7 @@ import (
"github.com/tendermint/tendermint/abci/types"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/libs/log"
tmnet "github.com/tendermint/tendermint/libs/net"
"github.com/tendermint/tendermint/libs/service"
)
@ -42,7 +43,7 @@ var _ Client = (*grpcClient)(nil)
// which is expensive, but easy - if you want something better, use the socket
// protocol! maybe one day, if people really want it, we use grpc streams, but
// hopefully not :D
func NewGRPCClient(addr string, mustConnect bool) Client {
func NewGRPCClient(logger log.Logger, addr string, mustConnect bool) Client {
cli := &grpcClient{
addr: addr,
mustConnect: mustConnect,
@ -54,7 +55,7 @@ func NewGRPCClient(addr string, mustConnect bool) Client {
// gRPC calls while processing a slow callback at the channel head.
chReqRes: make(chan *ReqRes, 64),
}
cli.BaseService = *service.NewBaseService(nil, "grpcClient", cli)
cli.BaseService = *service.NewBaseService(logger, "grpcClient", cli)
return cli
}


abci/client/socket_client.go (+3, -2)

@ -13,6 +13,7 @@ import (
"github.com/tendermint/tendermint/abci/types"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/libs/log"
tmnet "github.com/tendermint/tendermint/libs/net"
"github.com/tendermint/tendermint/libs/service"
)
@ -50,7 +51,7 @@ var _ Client = (*socketClient)(nil)
// NewSocketClient creates a new socket client, which connects to a given
// address. If mustConnect is true, the client will return an error upon start
// if it fails to connect.
func NewSocketClient(addr string, mustConnect bool) Client {
func NewSocketClient(logger log.Logger, addr string, mustConnect bool) Client {
cli := &socketClient{
reqQueue: make(chan *reqResWithContext, reqQueueSize),
mustConnect: mustConnect,
@ -59,7 +60,7 @@ func NewSocketClient(addr string, mustConnect bool) Client {
reqSent: list.New(),
resCb: nil,
}
cli.BaseService = *service.NewBaseService(nil, "socketClient", cli)
cli.BaseService = *service.NewBaseService(logger, "socketClient", cli)
return cli
}


abci/client/socket_client_test.go (+14, -6)

@ -14,6 +14,7 @@ import (
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/abci/server"
"github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
)
@ -21,8 +22,9 @@ var ctx = context.Background()
func TestProperSyncCalls(t *testing.T) {
app := slowApp{}
logger := log.TestingLogger()
s, c := setupClientServer(t, app)
s, c := setupClientServer(t, logger, app)
t.Cleanup(func() {
if err := s.Stop(); err != nil {
t.Error(err)
@ -57,8 +59,9 @@ func TestProperSyncCalls(t *testing.T) {
func TestHangingSyncCalls(t *testing.T) {
app := slowApp{}
logger := log.TestingLogger()
s, c := setupClientServer(t, app)
s, c := setupClientServer(t, logger, app)
t.Cleanup(func() {
if err := s.Stop(); err != nil {
t.Log(err)
@ -99,18 +102,23 @@ func TestHangingSyncCalls(t *testing.T) {
}
}
func setupClientServer(t *testing.T, app types.Application) (
service.Service, abciclient.Client) {
func setupClientServer(
t *testing.T,
logger log.Logger,
app types.Application,
) (service.Service, abciclient.Client) {
t.Helper()
// some port between 20k and 30k
port := 20000 + rand.Int31()%10000
addr := fmt.Sprintf("localhost:%d", port)
s, err := server.NewServer(addr, "socket", app)
s, err := server.NewServer(logger, addr, "socket", app)
require.NoError(t, err)
err = s.Start()
require.NoError(t, err)
c := abciclient.NewSocketClient(addr, true)
c := abciclient.NewSocketClient(logger, addr, true)
err = c.Start()
require.NoError(t, err)


abci/cmd/abci-cli/abci-cli.go (+3, -4)

@ -67,11 +67,10 @@ var RootCmd = &cobra.Command{
if client == nil {
var err error
client, err = abciclient.NewClient(flagAddress, flagAbci, false)
client, err = abciclient.NewClient(logger.With("module", "abci-client"), flagAddress, flagAbci, false)
if err != nil {
return err
}
client.SetLogger(logger.With("module", "abci-client"))
if err := client.Start(); err != nil {
return err
}
@ -586,11 +585,11 @@ func cmdKVStore(cmd *cobra.Command, args []string) error {
}
// Start the listener
srv, err := server.NewServer(flagAddress, flagAbci, app)
srv, err := server.NewServer(logger.With("module", "abci-server"), flagAddress, flagAbci, app)
if err != nil {
return err
}
srv.SetLogger(logger.With("module", "abci-server"))
if err := srv.Start(); err != nil {
return err
}


abci/example/example_test.go (+10, -8)

@ -44,14 +44,16 @@ func TestGRPC(t *testing.T) {
}
func testStream(t *testing.T, app types.Application) {
t.Helper()
const numDeliverTxs = 20000
socketFile := fmt.Sprintf("test-%08x.sock", rand.Int31n(1<<30))
defer os.Remove(socketFile)
socket := fmt.Sprintf("unix://%v", socketFile)
logger := log.TestingLogger()
// Start the listener
server := abciserver.NewSocketServer(socket, app)
server.SetLogger(log.TestingLogger().With("module", "abci-server"))
server := abciserver.NewSocketServer(logger.With("module", "abci-server"), socket, app)
err := server.Start()
require.NoError(t, err)
t.Cleanup(func() {
@ -61,8 +63,8 @@ func testStream(t *testing.T, app types.Application) {
})
// Connect to the socket
client := abciclient.NewSocketClient(socket, false)
client.SetLogger(log.TestingLogger().With("module", "abci-client"))
client := abciclient.NewSocketClient(log.TestingLogger().With("module", "abci-client"), socket, false)
err = client.Start()
require.NoError(t, err)
t.Cleanup(func() {
@ -132,10 +134,10 @@ func testGRPCSync(t *testing.T, app types.ABCIApplicationServer) {
socketFile := fmt.Sprintf("/tmp/test-%08x.sock", rand.Int31n(1<<30))
defer os.Remove(socketFile)
socket := fmt.Sprintf("unix://%v", socketFile)
logger := log.TestingLogger()
// Start the listener
server := abciserver.NewGRPCServer(socket, app)
server.SetLogger(log.TestingLogger().With("module", "abci-server"))
server := abciserver.NewGRPCServer(logger.With("module", "abci-server"), socket, app)
if err := server.Start(); err != nil {
t.Fatalf("Error starting GRPC server: %v", err.Error())
}


abci/example/kvstore/kvstore_test.go (+6, -8)

@ -234,15 +234,13 @@ func makeSocketClientServer(app types.Application, name string) (abciclient.Clie
socket := fmt.Sprintf("unix://%s.sock", name)
logger := log.TestingLogger()
server := abciserver.NewSocketServer(socket, app)
server.SetLogger(logger.With("module", "abci-server"))
server := abciserver.NewSocketServer(logger.With("module", "abci-server"), socket, app)
if err := server.Start(); err != nil {
return nil, nil, err
}
// Connect to the socket
client := abciclient.NewSocketClient(socket, false)
client.SetLogger(logger.With("module", "abci-client"))
client := abciclient.NewSocketClient(logger.With("module", "abci-client"), socket, false)
if err := client.Start(); err != nil {
if err = server.Stop(); err != nil {
return nil, nil, err
@ -259,14 +257,14 @@ func makeGRPCClientServer(app types.Application, name string) (abciclient.Client
logger := log.TestingLogger()
gapp := types.NewGRPCApplication(app)
server := abciserver.NewGRPCServer(socket, gapp)
server.SetLogger(logger.With("module", "abci-server"))
server := abciserver.NewGRPCServer(logger.With("module", "abci-server"), socket, gapp)
if err := server.Start(); err != nil {
return nil, nil, err
}
client := abciclient.NewGRPCClient(socket, true)
client.SetLogger(logger.With("module", "abci-client"))
client := abciclient.NewGRPCClient(logger.With("module", "abci-client"), socket, true)
if err := client.Start(); err != nil {
if err := server.Stop(); err != nil {
return nil, nil, err


abci/server/grpc_server.go (+3, -2)

@ -6,6 +6,7 @@ import (
"google.golang.org/grpc"
"github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
tmnet "github.com/tendermint/tendermint/libs/net"
"github.com/tendermint/tendermint/libs/service"
)
@ -22,7 +23,7 @@ type GRPCServer struct {
}
// NewGRPCServer returns a new gRPC ABCI server
func NewGRPCServer(protoAddr string, app types.ABCIApplicationServer) service.Service {
func NewGRPCServer(logger log.Logger, protoAddr string, app types.ABCIApplicationServer) service.Service {
proto, addr := tmnet.ProtocolAndAddress(protoAddr)
s := &GRPCServer{
proto: proto,
@ -30,7 +31,7 @@ func NewGRPCServer(protoAddr string, app types.ABCIApplicationServer) service.Se
listener: nil,
app: app,
}
s.BaseService = *service.NewBaseService(nil, "ABCIServer", s)
s.BaseService = *service.NewBaseService(logger, "ABCIServer", s)
return s
}


abci/server/server.go (+4, -3)

@ -12,17 +12,18 @@ import (
"fmt"
"github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
)
func NewServer(protoAddr, transport string, app types.Application) (service.Service, error) {
func NewServer(logger log.Logger, protoAddr, transport string, app types.Application) (service.Service, error) {
var s service.Service
var err error
switch transport {
case "socket":
s = NewSocketServer(protoAddr, app)
s = NewSocketServer(logger, protoAddr, app)
case "grpc":
s = NewGRPCServer(protoAddr, types.NewGRPCApplication(app))
s = NewGRPCServer(logger, protoAddr, types.NewGRPCApplication(app))
default:
err = fmt.Errorf("unknown server type %s", transport)
}


abci/server/socket_server.go (+2, -12)

@ -5,7 +5,6 @@ import (
"fmt"
"io"
"net"
"os"
"runtime"
"github.com/tendermint/tendermint/abci/types"
@ -19,7 +18,6 @@ import (
type SocketServer struct {
service.BaseService
isLoggerSet bool
proto string
addr string
@ -33,7 +31,7 @@ type SocketServer struct {
app types.Application
}
func NewSocketServer(protoAddr string, app types.Application) service.Service {
func NewSocketServer(logger tmlog.Logger, protoAddr string, app types.Application) service.Service {
proto, addr := tmnet.ProtocolAndAddress(protoAddr)
s := &SocketServer{
proto: proto,
@ -42,15 +40,10 @@ func NewSocketServer(protoAddr string, app types.Application) service.Service {
app: app,
conns: make(map[int]net.Conn),
}
s.BaseService = *service.NewBaseService(nil, "ABCIServer", s)
s.BaseService = *service.NewBaseService(logger, "ABCIServer", s)
return s
}
func (s *SocketServer) SetLogger(l tmlog.Logger) {
s.BaseService.SetLogger(l)
s.isLoggerSet = true
}
func (s *SocketServer) OnStart() error {
ln, err := net.Listen(s.proto, s.addr)
if err != nil {
@ -164,9 +157,6 @@ func (s *SocketServer) handleRequests(closeConn chan error, conn io.Reader, resp
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
err := fmt.Errorf("recovered from panic: %v\n%s", r, buf)
if !s.isLoggerSet {
fmt.Fprintln(os.Stderr, err)
}
closeConn <- err
s.appMtx.Unlock()
}


abci/tests/client_server_test.go (+5, -2)

@ -8,6 +8,7 @@ import (
abciclientent "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/abci/example/kvstore"
abciserver "github.com/tendermint/tendermint/abci/server"
"github.com/tendermint/tendermint/libs/log"
)
func TestClientServerNoAddrPrefix(t *testing.T) {
@ -15,12 +16,14 @@ func TestClientServerNoAddrPrefix(t *testing.T) {
transport := "socket"
app := kvstore.NewApplication()
server, err := abciserver.NewServer(addr, transport, app)
logger := log.TestingLogger()
server, err := abciserver.NewServer(logger, addr, transport, app)
assert.NoError(t, err, "expected no error on NewServer")
err = server.Start()
assert.NoError(t, err, "expected no error on server.Start")
client, err := abciclientent.NewClient(addr, transport, true)
client, err := abciclientent.NewClient(logger, addr, transport, true)
assert.NoError(t, err, "expected no error on NewClient")
err = client.Start()
assert.NoError(t, err, "expected no error on client.Start")


internal/blocksync/pool.go (+8, -2)

@ -91,7 +91,13 @@ type BlockPool struct {
// NewBlockPool returns a new BlockPool with the height equal to start. Block
// requests and errors will be sent to requestsCh and errorsCh accordingly.
func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- peerError) *BlockPool {
func NewBlockPool(
logger log.Logger,
start int64,
requestsCh chan<- BlockRequest,
errorsCh chan<- peerError,
) *BlockPool {
bp := &BlockPool{
peers: make(map[types.NodeID]*bpPeer),
@ -104,7 +110,7 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p
errorsCh: errorsCh,
lastSyncRate: 0,
}
bp.BaseService = *service.NewBaseService(nil, "BlockPool", bp)
bp.BaseService = *service.NewBaseService(logger, "BlockPool", bp)
return bp
}


internal/blocksync/pool_test.go (+3, -6)

@ -82,8 +82,7 @@ func TestBlockPoolBasic(t *testing.T) {
peers := makePeers(10, start+1, 1000)
errorsCh := make(chan peerError, 1000)
requestsCh := make(chan BlockRequest, 1000)
pool := NewBlockPool(start, requestsCh, errorsCh)
pool.SetLogger(log.TestingLogger())
pool := NewBlockPool(log.TestingLogger(), start, requestsCh, errorsCh)
err := pool.Start()
if err != nil {
@ -142,8 +141,7 @@ func TestBlockPoolTimeout(t *testing.T) {
peers := makePeers(10, start+1, 1000)
errorsCh := make(chan peerError, 1000)
requestsCh := make(chan BlockRequest, 1000)
pool := NewBlockPool(start, requestsCh, errorsCh)
pool.SetLogger(log.TestingLogger())
pool := NewBlockPool(log.TestingLogger(), start, requestsCh, errorsCh)
err := pool.Start()
if err != nil {
t.Error(err)
@ -210,8 +208,7 @@ func TestBlockPoolRemovePeer(t *testing.T) {
requestsCh := make(chan BlockRequest)
errorsCh := make(chan peerError)
pool := NewBlockPool(1, requestsCh, errorsCh)
pool.SetLogger(log.TestingLogger())
pool := NewBlockPool(log.TestingLogger(), 1, requestsCh, errorsCh)
err := pool.Start()
require.NoError(t, err)
t.Cleanup(func() {


internal/blocksync/reactor.go (+1, -1)

@ -127,7 +127,7 @@ func NewReactor(
initialState: state,
blockExec: blockExec,
store: store,
pool: NewBlockPool(startHeight, requestsCh, errorsCh),
pool: NewBlockPool(logger, startHeight, requestsCh, errorsCh),
consReactor: consReactor,
blockSync: tmsync.NewBool(blockSync),
requestsCh: requestsCh,


internal/blocksync/reactor_test.go (+3, -1)

@ -97,8 +97,10 @@ func (rts *reactorTestSuite) addNode(t *testing.T,
) {
t.Helper()
logger := log.TestingLogger()
rts.nodes = append(rts.nodes, nodeID)
rts.app[nodeID] = proxy.NewAppConns(abciclient.NewLocalCreator(&abci.BaseApplication{}), proxy.NopMetrics())
rts.app[nodeID] = proxy.NewAppConns(abciclient.NewLocalCreator(&abci.BaseApplication{}), logger, proxy.NopMetrics())
require.NoError(t, rts.app[nodeID].Start())
blockDB := dbm.NewMemDB()


internal/consensus/byzantine_test.go (+2, -5)

@ -87,20 +87,17 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs.SetLogger(cs.Logger)
cs := NewState(logger, thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
// set private validator
pv := privVals[i]
cs.SetPrivValidator(pv)
eventBus := eventbus.NewDefault()
eventBus.SetLogger(log.TestingLogger().With("module", "events"))
eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events"))
err = eventBus.Start()
require.NoError(t, err)
cs.SetEventBus(eventBus)
cs.SetTimeoutTicker(tickerFunc())
cs.SetLogger(logger)
states[i] = cs
}()


internal/consensus/common_test.go (+15, -15)

@ -389,25 +389,28 @@ func subscribeToVoter(t *testing.T, cs *State, addr []byte) <-chan tmpubsub.Mess
//-------------------------------------------------------------------------------
// consensus states
func newState(state sm.State, pv types.PrivValidator, app abci.Application) (*State, error) {
func newState(logger log.Logger, state sm.State, pv types.PrivValidator, app abci.Application) (*State, error) {
cfg, err := config.ResetTestRoot("consensus_state_test")
if err != nil {
return nil, err
}
return newStateWithConfig(cfg, state, pv, app), nil
return newStateWithConfig(logger, cfg, state, pv, app), nil
}
func newStateWithConfig(
logger log.Logger,
thisConfig *config.Config,
state sm.State,
pv types.PrivValidator,
app abci.Application,
) *State {
blockStore := store.NewBlockStore(dbm.NewMemDB())
return newStateWithConfigAndBlockStore(thisConfig, state, pv, app, blockStore)
return newStateWithConfigAndBlockStore(logger, thisConfig, state, pv, app, blockStore)
}
func newStateWithConfigAndBlockStore(
logger log.Logger,
thisConfig *config.Config,
state sm.State,
pv types.PrivValidator,
@ -422,7 +425,7 @@ func newStateWithConfigAndBlockStore(
// Make Mempool
mempool := mempool.NewTxMempool(
log.TestingLogger().With("module", "mempool"),
logger.With("module", "mempool"),
thisConfig.Mempool,
proxyAppConnMem,
0,
@ -441,13 +444,11 @@ func newStateWithConfigAndBlockStore(
panic(err)
}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyAppConnCon, mempool, evpool, blockStore)
cs := NewState(logger.With("module", "consensus"), thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs.SetPrivValidator(pv)
eventBus := eventbus.NewDefault()
eventBus.SetLogger(log.TestingLogger().With("module", "events"))
eventBus := eventbus.NewDefault(logger.With("module", "events"))
err := eventBus.Start()
if err != nil {
panic(err)
@ -468,13 +469,13 @@ func loadPrivValidator(cfg *config.Config) *privval.FilePV {
return privValidator
}
func randState(cfg *config.Config, nValidators int) (*State, []*validatorStub, error) {
func randState(cfg *config.Config, logger log.Logger, nValidators int) (*State, []*validatorStub, error) {
// Get State
state, privVals := randGenesisState(cfg, nValidators, false, 10)
vss := make([]*validatorStub, nValidators)
cs, err := newState(state, privVals[0], kvstore.NewApplication())
cs, err := newState(logger, state, privVals[0], kvstore.NewApplication())
if err != nil {
return nil, nil, err
}
@ -759,9 +760,9 @@ func randConsensusState(
vals := types.TM2PB.ValidatorUpdates(state.Validators)
app.InitChain(abci.RequestInitChain{Validators: vals})
css[i] = newStateWithConfigAndBlockStore(thisConfig, state, privVals[i], app, blockStore)
l := logger.With("validator", i, "module", "consensus")
css[i] = newStateWithConfigAndBlockStore(l, thisConfig, state, privVals[i], app, blockStore)
css[i].SetTimeoutTicker(tickerFunc())
css[i].SetLogger(logger.With("validator", i, "module", "consensus"))
}
return css, func() {
@ -829,9 +830,8 @@ func randConsensusNetWithPeers(
app.InitChain(abci.RequestInitChain{Validators: vals})
// sm.SaveState(stateDB,state) //height 1's validatorsInfo already saved in LoadStateFromDBOrGenesisDoc above
css[i] = newStateWithConfig(thisConfig, state, privVal, app)
css[i] = newStateWithConfig(logger.With("validator", i, "module", "consensus"), thisConfig, state, privVal, app)
css[i].SetTimeoutTicker(tickerFunc())
css[i].SetLogger(logger.With("validator", i, "module", "consensus"))
}
return css, genDoc, peer0Config, func() {
for _, dir := range configRootDirs {


internal/consensus/invalid_test.go (+1, -2)

@ -26,8 +26,7 @@ func TestReactorInvalidPrecommit(t *testing.T) {
t.Cleanup(cleanup)
for i := 0; i < 4; i++ {
ticker := NewTimeoutTicker()
ticker.SetLogger(states[i].Logger)
ticker := NewTimeoutTicker(states[i].Logger)
states[i].SetTimeoutTicker(ticker)
}


internal/consensus/mempool_test.go (+10, -6)

@ -17,6 +17,7 @@ import (
"github.com/tendermint/tendermint/internal/mempool"
sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/internal/store"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/types"
)
@ -34,7 +35,7 @@ func TestMempoolNoProgressUntilTxsAvailable(t *testing.T) {
config.Consensus.CreateEmptyBlocks = false
state, privVals := randGenesisState(baseConfig, 1, false, 10)
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication())
cs := newStateWithConfig(log.TestingLogger(), config, state, privVals[0], NewCounterApplication())
assertMempool(cs.txNotifier).EnableTxsAvailable()
height, round := cs.Height, cs.Round
newBlockCh := subscribe(t, cs.eventBus, types.EventQueryNewBlock)
@ -57,7 +58,7 @@ func TestMempoolProgressAfterCreateEmptyBlocksInterval(t *testing.T) {
config.Consensus.CreateEmptyBlocksInterval = ensureTimeout
state, privVals := randGenesisState(baseConfig, 1, false, 10)
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication())
cs := newStateWithConfig(log.TestingLogger(), config, state, privVals[0], NewCounterApplication())
assertMempool(cs.txNotifier).EnableTxsAvailable()
@ -78,7 +79,7 @@ func TestMempoolProgressInHigherRound(t *testing.T) {
config.Consensus.CreateEmptyBlocks = false
state, privVals := randGenesisState(baseConfig, 1, false, 10)
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication())
cs := newStateWithConfig(log.TestingLogger(), config, state, privVals[0], NewCounterApplication())
assertMempool(cs.txNotifier).EnableTxsAvailable()
height, round := cs.Height, cs.Round
newBlockCh := subscribe(t, cs.eventBus, types.EventQueryNewBlock)
@ -124,11 +125,14 @@ func deliverTxsRange(cs *State, start, end int) {
func TestMempoolTxConcurrentWithCommit(t *testing.T) {
config := configSetup(t)
logger := log.TestingLogger()
state, privVals := randGenesisState(config, 1, false, 10)
stateStore := sm.NewStore(dbm.NewMemDB())
blockStore := store.NewBlockStore(dbm.NewMemDB())
cs := newStateWithConfigAndBlockStore(config, state, privVals[0], NewCounterApplication(), blockStore)
cs := newStateWithConfigAndBlockStore(
logger, config, state, privVals[0], NewCounterApplication(), blockStore)
err := stateStore.Save(state)
require.NoError(t, err)
newBlockHeaderCh := subscribe(t, cs.eventBus, types.EventQueryNewBlockHeader)
@ -155,7 +159,7 @@ func TestMempoolRmBadTx(t *testing.T) {
app := NewCounterApplication()
stateStore := sm.NewStore(dbm.NewMemDB())
blockStore := store.NewBlockStore(dbm.NewMemDB())
cs := newStateWithConfigAndBlockStore(config, state, privVals[0], app, blockStore)
cs := newStateWithConfigAndBlockStore(log.TestingLogger(), config, state, privVals[0], app, blockStore)
err := stateStore.Save(state)
require.NoError(t, err)


internal/consensus/reactor_test.go (+4, -7)

@ -411,18 +411,15 @@ func TestReactorWithEvidence(t *testing.T) {
evpool2 := sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2)
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs := NewState(logger.With("validator", i, "module", "consensus"),
thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2)
cs.SetPrivValidator(pv)
eventBus := eventbus.NewDefault()
eventBus.SetLogger(log.TestingLogger().With("module", "events"))
err = eventBus.Start()
require.NoError(t, err)
eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events"))
require.NoError(t, eventBus.Start())
cs.SetEventBus(eventBus)
cs.SetTimeoutTicker(tickerFunc())
cs.SetLogger(logger.With("validator", i, "module", "consensus"))
states[i] = cs
}


internal/consensus/replay.go (+1, -1)

@ -421,7 +421,7 @@ func (h *Handshaker) ReplayBlocks(
if err != nil {
return nil, err
}
mockApp := newMockProxyApp(appHash, abciResponses)
mockApp := newMockProxyApp(h.logger, appHash, abciResponses)
h.logger.Info("Replay last block using mock app")
state, err = h.replayBlock(state, storeBlockHeight, mockApp)
return state.AppHash, err


internal/consensus/replay_file.go (+5, -5)

@ -147,7 +147,7 @@ func (pb *playback) replayReset(count int, newStepSub eventbus.Subscription) err
}
pb.cs.Wait()
newCS := NewState(pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec,
newCS := NewState(pb.cs.Logger, pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec,
pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool)
newCS.SetEventBus(pb.cs.eventBus)
newCS.startForReplay()
@ -337,14 +337,14 @@ func newConsensusStateForReplay(
}
// Create proxyAppConn connection (consensus, mempool, query)
clientCreator, _ := proxy.DefaultClientCreator(cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
proxyApp := proxy.NewAppConns(clientCreator, proxy.NopMetrics())
clientCreator, _ := proxy.DefaultClientCreator(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics())
err = proxyApp.Start()
if err != nil {
return nil, fmt.Errorf("starting proxy app conns: %w", err)
}
eventBus := eventbus.NewDefault()
eventBus := eventbus.NewDefault(logger)
if err := eventBus.Start(); err != nil {
return nil, fmt.Errorf("failed to start event bus: %w", err)
}
@ -358,7 +358,7 @@ func newConsensusStateForReplay(
mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp.Consensus(), mempool, evpool, blockStore)
consensusState := NewState(csConfig, state.Copy(), blockExec,
consensusState := NewState(logger, csConfig, state.Copy(), blockExec,
blockStore, mempool, evpool)
consensusState.SetEventBus(eventBus)


internal/consensus/replay_stubs.go (+3, -2)

@ -8,6 +8,7 @@ import (
"github.com/tendermint/tendermint/internal/libs/clist"
"github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/internal/proxy"
"github.com/tendermint/tendermint/libs/log"
tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
"github.com/tendermint/tendermint/types"
)
@ -54,12 +55,12 @@ func (emptyMempool) CloseWAL() {}
// Useful because we don't want to call Commit() twice for the same block on
// the real app.
func newMockProxyApp(appHash []byte, abciResponses *tmstate.ABCIResponses) proxy.AppConnConsensus {
func newMockProxyApp(logger log.Logger, appHash []byte, abciResponses *tmstate.ABCIResponses) proxy.AppConnConsensus {
clientCreator := abciclient.NewLocalCreator(&mockProxyApp{
appHash: appHash,
abciResponses: abciResponses,
})
cli, _ := clientCreator()
cli, _ := clientCreator(logger)
err := cli.Start()
if err != nil {
panic(err)


internal/consensus/replay_test.go (+18, -15)

@ -64,13 +64,13 @@ func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *config.Co
privValidator := loadPrivValidator(consensusReplayConfig)
blockStore := store.NewBlockStore(dbm.NewMemDB())
cs := newStateWithConfigAndBlockStore(
logger,
consensusReplayConfig,
state,
privValidator,
kvstore.NewApplication(),
blockStore,
)
cs.SetLogger(logger)
bytes, _ := os.ReadFile(cs.config.WalFile())
t.Logf("====== WAL: \n\r%X\n", bytes)
@ -164,13 +164,13 @@ LOOP:
require.NoError(t, err)
privValidator := loadPrivValidator(consensusReplayConfig)
cs := newStateWithConfigAndBlockStore(
logger,
consensusReplayConfig,
state,
privValidator,
kvstore.NewApplication(),
blockStore,
)
cs.SetLogger(logger)
// start sending transactions
ctx, cancel := context.WithCancel(context.Background())
@ -639,7 +639,7 @@ func TestMockProxyApp(t *testing.T) {
err = proto.Unmarshal(bytes, loadedAbciRes)
require.NoError(t, err)
mock := newMockProxyApp([]byte("mock_hash"), loadedAbciRes)
mock := newMockProxyApp(logger, []byte("mock_hash"), loadedAbciRes)
abciRes := new(tmstate.ABCIResponses)
abciRes.DeliverTxs = make([]*abci.ResponseDeliverTx, len(loadedAbciRes.DeliverTxs))
@ -696,6 +696,7 @@ func testHandshakeReplay(t *testing.T, sim *simulatorTestSuite, nBlocks int, mod
cfg := sim.Config
logger := log.TestingLogger()
if testValidatorsChange {
testConfig, err := ResetConfig(fmt.Sprintf("%s_%v_m", t.Name(), mode))
require.NoError(t, err)
@ -719,9 +720,8 @@ func testHandshakeReplay(t *testing.T, sim *simulatorTestSuite, nBlocks int, mod
privVal, err := privval.LoadFilePV(cfg.PrivValidator.KeyFile(), cfg.PrivValidator.StateFile())
require.NoError(t, err)
wal, err := NewWAL(walFile)
wal, err := NewWAL(logger, walFile)
require.NoError(t, err)
wal.SetLogger(log.TestingLogger())
err = wal.Start()
require.NoError(t, err)
t.Cleanup(func() {
@ -742,7 +742,7 @@ func testHandshakeReplay(t *testing.T, sim *simulatorTestSuite, nBlocks int, mod
state := genesisState.Copy()
// run the chain through state.ApplyBlock to build up the tendermint state
state = buildTMStateFromChain(cfg, sim.Mempool, sim.Evpool, stateStore, state, chain, nBlocks, mode, store)
state = buildTMStateFromChain(cfg, logger, sim.Mempool, sim.Evpool, stateStore, state, chain, nBlocks, mode, store)
latestAppHash := state.AppHash
// make a new client creator
@ -754,7 +754,7 @@ func testHandshakeReplay(t *testing.T, sim *simulatorTestSuite, nBlocks int, mod
if nBlocks > 0 {
// run nBlocks against a new client to build up the app state.
// use a throwaway tendermint state
proxyApp := proxy.NewAppConns(clientCreator2, proxy.NopMetrics())
proxyApp := proxy.NewAppConns(clientCreator2, logger, proxy.NopMetrics())
stateDB1 := dbm.NewMemDB()
stateStore := sm.NewStore(stateDB1)
err := stateStore.Save(genesisState)
@ -773,8 +773,8 @@ func testHandshakeReplay(t *testing.T, sim *simulatorTestSuite, nBlocks int, mod
// now start the app using the handshake - it should sync
genDoc, _ := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
handshaker := NewHandshaker(log.TestingLogger(), stateStore, state, store, eventbus.NopEventBus{}, genDoc)
proxyApp := proxy.NewAppConns(clientCreator2, proxy.NopMetrics())
handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc)
proxyApp := proxy.NewAppConns(clientCreator2, logger, proxy.NopMetrics())
if err := proxyApp.Start(); err != nil {
t.Fatalf("Error starting proxy app connections: %v", err)
}
@ -888,6 +888,7 @@ func buildAppStateFromChain(
func buildTMStateFromChain(
cfg *config.Config,
logger log.Logger,
mempool mempool.Mempool,
evpool sm.EvidencePool,
stateStore sm.Store,
@ -895,14 +896,15 @@ func buildTMStateFromChain(
chain []*types.Block,
nBlocks int,
mode uint,
blockStore *mockBlockStore) sm.State {
blockStore *mockBlockStore,
) sm.State {
// run the whole chain against this client to build up the tendermint state
kvstoreApp := kvstore.NewPersistentKVStoreApplication(
filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_t", nBlocks, mode)))
defer kvstoreApp.Close()
clientCreator := abciclient.NewLocalCreator(kvstoreApp)
proxyApp := proxy.NewAppConns(clientCreator, proxy.NopMetrics())
proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics())
if err := proxyApp.Start(); err != nil {
panic(err)
}
@ -972,7 +974,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
{
app := &badApp{numBlocks: 3, allHashesAreWrong: true}
clientCreator := abciclient.NewLocalCreator(app)
proxyApp := proxy.NewAppConns(clientCreator, proxy.NopMetrics())
proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics())
err := proxyApp.Start()
require.NoError(t, err)
t.Cleanup(func() {
@ -996,7 +998,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
{
app := &badApp{numBlocks: 3, onlyLastHashIsWrong: true}
clientCreator := abciclient.NewLocalCreator(app)
proxyApp := proxy.NewAppConns(clientCreator, proxy.NopMetrics())
proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics())
err := proxyApp.Start()
require.NoError(t, err)
t.Cleanup(func() {
@ -1257,8 +1259,9 @@ func TestHandshakeUpdatesValidators(t *testing.T) {
genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
require.NoError(t, err)
handshaker := NewHandshaker(log.TestingLogger(), stateStore, state, store, eventbus.NopEventBus{}, genDoc)
proxyApp := proxy.NewAppConns(clientCreator, proxy.NopMetrics())
logger := log.TestingLogger()
handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc)
proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics())
if err := proxyApp.Start(); err != nil {
t.Fatalf("Error starting proxy app connections: %v", err)
}


internal/consensus/state.go (+4, -11)

@ -153,6 +153,7 @@ type StateOption func(*State)
// NewState returns a new State.
func NewState(
logger log.Logger,
cfg *config.ConsensusConfig,
state sm.State,
blockExec *sm.BlockExecutor,
@ -168,7 +169,7 @@ func NewState(
txNotifier: txNotifier,
peerMsgQueue: make(chan msgInfo, msgQueueSize),
internalMsgQueue: make(chan msgInfo, msgQueueSize),
timeoutTicker: NewTimeoutTicker(),
timeoutTicker: NewTimeoutTicker(logger),
statsMsgQueue: make(chan msgInfo, msgQueueSize),
done: make(chan struct{}),
doWALCatchup: true,
@ -193,7 +194,7 @@ func NewState(
// NOTE: we do not call scheduleRound0 yet, we do that upon Start()
cs.BaseService = *service.NewBaseService(nil, "State", cs)
cs.BaseService = *service.NewBaseService(logger, "State", cs)
for _, option := range options {
option(cs)
}
@ -201,12 +202,6 @@ func NewState(
return cs
}
// SetLogger implements Service.
func (cs *State) SetLogger(l log.Logger) {
cs.BaseService.Logger = l
cs.timeoutTicker.SetLogger(l)
}
// SetEventBus sets event bus.
func (cs *State) SetEventBus(b *eventbus.EventBus) {
cs.eventBus = b
@ -481,14 +476,12 @@ func (cs *State) Wait() {
// OpenWAL opens a file to log all consensus messages and timeouts for
// deterministic accountability.
func (cs *State) OpenWAL(walFile string) (WAL, error) {
wal, err := NewWAL(walFile)
wal, err := NewWAL(cs.Logger.With("wal", walFile), walFile)
if err != nil {
cs.Logger.Error("failed to open WAL", "file", walFile, "err", err)
return nil, err
}
wal.SetLogger(cs.Logger.With("wal", walFile))
if err := wal.Start(); err != nil {
cs.Logger.Error("failed to start WAL", "err", err)
return nil, err


internal/consensus/state_test.go (+39, -36)

@ -58,7 +58,7 @@ x * TestHalt1 - if we see +2/3 precommits after timing out into new round, we sh
func TestStateProposerSelection0(t *testing.T) {
config := configSetup(t)
cs1, vss, err := randState(config, 4)
cs1, vss, err := randState(config, log.TestingLogger(), 4)
require.NoError(t, err)
height, round := cs1.Height, cs1.Round
@ -102,7 +102,7 @@ func TestStateProposerSelection0(t *testing.T) {
func TestStateProposerSelection2(t *testing.T) {
config := configSetup(t)
cs1, vss, err := randState(config, 4) // test needs more work for more than 3 validators
cs1, vss, err := randState(config, log.TestingLogger(), 4) // test needs more work for more than 3 validators
require.NoError(t, err)
height := cs1.Height
@ -143,7 +143,7 @@ func TestStateProposerSelection2(t *testing.T) {
func TestStateEnterProposeNoPrivValidator(t *testing.T) {
config := configSetup(t)
cs, _, err := randState(config, 1)
cs, _, err := randState(config, log.TestingLogger(), 1)
require.NoError(t, err)
cs.SetPrivValidator(nil)
height, round := cs.Height, cs.Round
@ -165,7 +165,7 @@ func TestStateEnterProposeNoPrivValidator(t *testing.T) {
func TestStateEnterProposeYesPrivValidator(t *testing.T) {
config := configSetup(t)
cs, _, err := randState(config, 1)
cs, _, err := randState(config, log.TestingLogger(), 1)
require.NoError(t, err)
height, round := cs.Height, cs.Round
@ -198,7 +198,7 @@ func TestStateEnterProposeYesPrivValidator(t *testing.T) {
func TestStateBadProposal(t *testing.T) {
config := configSetup(t)
cs1, vss, err := randState(config, 2)
cs1, vss, err := randState(config, log.TestingLogger(), 2)
require.NoError(t, err)
height, round := cs1.Height, cs1.Round
vs2 := vss[1]
@ -259,7 +259,7 @@ func TestStateBadProposal(t *testing.T) {
func TestStateOversizedBlock(t *testing.T) {
config := configSetup(t)
cs1, vss, err := randState(config, 2)
cs1, vss, err := randState(config, log.TestingLogger(), 2)
require.NoError(t, err)
cs1.state.ConsensusParams.Block.MaxBytes = 2000
height, round := cs1.Height, cs1.Round
@ -323,8 +323,9 @@ func TestStateOversizedBlock(t *testing.T) {
// propose, prevote, and precommit a block
func TestStateFullRound1(t *testing.T) {
config := configSetup(t)
logger := log.TestingLogger()
cs, vss, err := randState(config, 1)
cs, vss, err := randState(config, logger, 1)
require.NoError(t, err)
height, round := cs.Height, cs.Round
@ -333,8 +334,8 @@ func TestStateFullRound1(t *testing.T) {
if err := cs.eventBus.Stop(); err != nil {
t.Error(err)
}
eventBus := eventbus.NewDefault()
eventBus.SetLogger(log.TestingLogger().With("module", "events"))
eventBus := eventbus.NewDefault(logger.With("module", "events"))
cs.SetEventBus(eventBus)
if err := eventBus.Start(); err != nil {
t.Error(err)
@ -367,7 +368,7 @@ func TestStateFullRound1(t *testing.T) {
func TestStateFullRoundNil(t *testing.T) {
config := configSetup(t)
cs, vss, err := randState(config, 1)
cs, vss, err := randState(config, log.TestingLogger(), 1)
require.NoError(t, err)
height, round := cs.Height, cs.Round
@ -388,7 +389,7 @@ func TestStateFullRoundNil(t *testing.T) {
func TestStateFullRound2(t *testing.T) {
config := configSetup(t)
cs1, vss, err := randState(config, 2)
cs1, vss, err := randState(config, log.TestingLogger(), 2)
require.NoError(t, err)
vs2 := vss[1]
height, round := cs1.Height, cs1.Round
@ -431,7 +432,7 @@ func TestStateFullRound2(t *testing.T) {
func TestStateLockNoPOL(t *testing.T) {
config := configSetup(t)
cs1, vss, err := randState(config, 2)
cs1, vss, err := randState(config, log.TestingLogger(), 2)
require.NoError(t, err)
vs2 := vss[1]
height, round := cs1.Height, cs1.Round
@ -570,7 +571,7 @@ func TestStateLockNoPOL(t *testing.T) {
ensureNewTimeout(timeoutWaitCh, height, round, cs1.config.Precommit(round).Nanoseconds())
cs2, _, err := randState(config, 2) // needed so generated block is different than locked block
cs2, _, err := randState(config, log.TestingLogger(), 2) // needed so generated block is different than locked block
require.NoError(t, err)
// before we time out into new round, set next proposal block
prop, propBlock := decideProposal(cs2, vs2, vs2.Height, vs2.Round+1)
@ -622,8 +623,9 @@ func TestStateLockNoPOL(t *testing.T) {
// the others prevote a new block hence v1 changes lock and precommits the new block with the others
func TestStateLockPOLRelock(t *testing.T) {
config := configSetup(t)
logger := log.TestingLogger()
cs1, vss, err := randState(config, 4)
cs1, vss, err := randState(config, logger, 4)
require.NoError(t, err)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@ -668,7 +670,7 @@ func TestStateLockPOLRelock(t *testing.T) {
signAddVotes(config, cs1, tmproto.PrecommitType, nil, types.PartSetHeader{}, vs2, vs3, vs4)
// before we timeout to the new round set the new proposal
cs2, err := newState(cs1.state, vs2, kvstore.NewApplication())
cs2, err := newState(logger, cs1.state, vs2, kvstore.NewApplication())
require.NoError(t, err)
prop, propBlock := decideProposal(cs2, vs2, vs2.Height, vs2.Round+1)
@ -725,7 +727,7 @@ func TestStateLockPOLRelock(t *testing.T) {
func TestStateLockPOLUnlock(t *testing.T) {
config := configSetup(t)
cs1, vss, err := randState(config, 4)
cs1, vss, err := randState(config, log.TestingLogger(), 4)
require.NoError(t, err)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@ -819,8 +821,9 @@ func TestStateLockPOLUnlock(t *testing.T) {
// prevote and now v1 can lock onto the third block and precommit that
func TestStateLockPOLUnlockOnUnknownBlock(t *testing.T) {
config := configSetup(t)
logger := log.TestingLogger()
cs1, vss, err := randState(config, 4)
cs1, vss, err := randState(config, logger, 4)
require.NoError(t, err)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@ -861,7 +864,7 @@ func TestStateLockPOLUnlockOnUnknownBlock(t *testing.T) {
signAddVotes(config, cs1, tmproto.PrecommitType, nil, types.PartSetHeader{}, vs2, vs3, vs4)
// before we timeout to the new round set the new proposal
cs2, err := newState(cs1.state, vs2, kvstore.NewApplication())
cs2, err := newState(logger, cs1.state, vs2, kvstore.NewApplication())
require.NoError(t, err)
prop, propBlock := decideProposal(cs2, vs2, vs2.Height, vs2.Round+1)
if prop == nil || propBlock == nil {
@ -906,7 +909,7 @@ func TestStateLockPOLUnlockOnUnknownBlock(t *testing.T) {
signAddVotes(config, cs1, tmproto.PrecommitType, nil, types.PartSetHeader{}, vs2, vs3, vs4)
// before we timeout to the new round set the new proposal
cs3, err := newState(cs1.state, vs3, kvstore.NewApplication())
cs3, err := newState(logger, cs1.state, vs3, kvstore.NewApplication())
require.NoError(t, err)
prop, propBlock = decideProposal(cs3, vs3, vs3.Height, vs3.Round+1)
if prop == nil || propBlock == nil {
@ -951,7 +954,7 @@ func TestStateLockPOLUnlockOnUnknownBlock(t *testing.T) {
func TestStateLockPOLSafety1(t *testing.T) {
config := configSetup(t)
cs1, vss, err := randState(config, 4)
cs1, vss, err := randState(config, log.TestingLogger(), 4)
require.NoError(t, err)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@ -1075,7 +1078,7 @@ func TestStateLockPOLSafety1(t *testing.T) {
func TestStateLockPOLSafety2(t *testing.T) {
config := configSetup(t)
cs1, vss, err := randState(config, 4)
cs1, vss, err := randState(config, log.TestingLogger(), 4)
require.NoError(t, err)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@ -1175,7 +1178,7 @@ func TestStateLockPOLSafety2(t *testing.T) {
func TestProposeValidBlock(t *testing.T) {
config := configSetup(t)
cs1, vss, err := randState(config, 4)
cs1, vss, err := randState(config, log.TestingLogger(), 4)
require.NoError(t, err)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@ -1268,7 +1271,7 @@ func TestProposeValidBlock(t *testing.T) {
func TestSetValidBlockOnDelayedPrevote(t *testing.T) {
config := configSetup(t)
cs1, vss, err := randState(config, 4)
cs1, vss, err := randState(config, log.TestingLogger(), 4)
require.NoError(t, err)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@ -1333,7 +1336,7 @@ func TestSetValidBlockOnDelayedPrevote(t *testing.T) {
func TestSetValidBlockOnDelayedProposal(t *testing.T) {
config := configSetup(t)
cs1, vss, err := randState(config, 4)
cs1, vss, err := randState(config, log.TestingLogger(), 4)
require.NoError(t, err)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@ -1392,7 +1395,7 @@ func TestSetValidBlockOnDelayedProposal(t *testing.T) {
func TestWaitingTimeoutOnNilPolka(t *testing.T) {
config := configSetup(t)
cs1, vss, err := randState(config, 4)
cs1, vss, err := randState(config, log.TestingLogger(), 4)
require.NoError(t, err)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@ -1416,7 +1419,7 @@ func TestWaitingTimeoutOnNilPolka(t *testing.T) {
func TestWaitingTimeoutProposeOnNewRound(t *testing.T) {
config := configSetup(t)
cs1, vss, err := randState(config, 4)
cs1, vss, err := randState(config, log.TestingLogger(), 4)
require.NoError(t, err)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@ -1455,7 +1458,7 @@ func TestWaitingTimeoutProposeOnNewRound(t *testing.T) {
func TestRoundSkipOnNilPolkaFromHigherRound(t *testing.T) {
config := configSetup(t)
cs1, vss, err := randState(config, 4)
cs1, vss, err := randState(config, log.TestingLogger(), 4)
require.NoError(t, err)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@ -1494,7 +1497,7 @@ func TestRoundSkipOnNilPolkaFromHigherRound(t *testing.T) {
func TestWaitTimeoutProposeOnNilPolkaForTheCurrentRound(t *testing.T) {
config := configSetup(t)
cs1, vss, err := randState(config, 4)
cs1, vss, err := randState(config, log.TestingLogger(), 4)
require.NoError(t, err)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, int32(1)
@ -1524,7 +1527,7 @@ func TestWaitTimeoutProposeOnNilPolkaForTheCurrentRound(t *testing.T) {
func TestEmitNewValidBlockEventOnCommitWithoutBlock(t *testing.T) {
config := configSetup(t)
cs1, vss, err := randState(config, 4)
cs1, vss, err := randState(config, log.TestingLogger(), 4)
require.NoError(t, err)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, int32(1)
@ -1561,7 +1564,7 @@ func TestEmitNewValidBlockEventOnCommitWithoutBlock(t *testing.T) {
func TestCommitFromPreviousRound(t *testing.T) {
config := configSetup(t)
cs1, vss, err := randState(config, 4)
cs1, vss, err := randState(config, log.TestingLogger(), 4)
require.NoError(t, err)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, int32(1)
@ -1618,7 +1621,7 @@ func TestStartNextHeightCorrectlyAfterTimeout(t *testing.T) {
config := configSetup(t)
config.Consensus.SkipTimeoutCommit = false
cs1, vss, err := randState(config, 4)
cs1, vss, err := randState(config, log.TestingLogger(), 4)
require.NoError(t, err)
cs1.txNotifier = &fakeTxNotifier{ch: make(chan struct{})}
@ -1682,7 +1685,7 @@ func TestResetTimeoutPrecommitUponNewHeight(t *testing.T) {
config := configSetup(t)
config.Consensus.SkipTimeoutCommit = false
cs1, vss, err := randState(config, 4)
cs1, vss, err := randState(config, log.TestingLogger(), 4)
require.NoError(t, err)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
@ -1826,7 +1829,7 @@ func TestStateSlashingPrecommits(t *testing.T) {
func TestStateHalt1(t *testing.T) {
config := configSetup(t)
cs1, vss, err := randState(config, 4)
cs1, vss, err := randState(config, log.TestingLogger(), 4)
require.NoError(t, err)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@ -1897,7 +1900,7 @@ func TestStateOutputsBlockPartsStats(t *testing.T) {
config := configSetup(t)
// create dummy peer
cs, _, err := randState(config, 1)
cs, _, err := randState(config, log.TestingLogger(), 1)
require.NoError(t, err)
peerID, err := types.NewNodeID("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")
require.NoError(t, err)
@ -1943,7 +1946,7 @@ func TestStateOutputsBlockPartsStats(t *testing.T) {
func TestStateOutputVoteStats(t *testing.T) {
config := configSetup(t)
cs, vss, err := randState(config, 2)
cs, vss, err := randState(config, log.TestingLogger(), 2)
require.NoError(t, err)
// create dummy peer
peerID, err := types.NewNodeID("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")
@ -1980,7 +1983,7 @@ func TestStateOutputVoteStats(t *testing.T) {
func TestSignSameVoteTwice(t *testing.T) {
config := configSetup(t)
_, vss, err := randState(config, 2)
_, vss, err := randState(config, log.TestingLogger(), 2)
require.NoError(t, err)
randBytes := tmrand.Bytes(tmhash.Size)


internal/consensus/ticker.go (+2, -4)

@ -19,8 +19,6 @@ type TimeoutTicker interface {
Stop() error
Chan() <-chan timeoutInfo // on which to receive a timeout
ScheduleTimeout(ti timeoutInfo) // reset the timer
SetLogger(log.Logger)
}
// timeoutTicker wraps time.Timer,
@ -37,13 +35,13 @@ type timeoutTicker struct {
}
// NewTimeoutTicker returns a new TimeoutTicker.
func NewTimeoutTicker() TimeoutTicker {
func NewTimeoutTicker(logger log.Logger) TimeoutTicker {
tt := &timeoutTicker{
timer: time.NewTimer(0),
tickChan: make(chan timeoutInfo, tickTockBufferSize),
tockChan: make(chan timeoutInfo, tickTockBufferSize),
}
tt.BaseService = *service.NewBaseService(nil, "TimeoutTicker", tt)
tt.BaseService = *service.NewBaseService(logger, "TimeoutTicker", tt)
tt.stopTimer() // don't want to fire until the first scheduled timeout
return tt
}


internal/consensus/wal.go (+2, -7)

@ -88,7 +88,7 @@ var _ WAL = &BaseWAL{}
// NewWAL returns a new write-ahead logger based on `baseWAL`, which implements
// WAL. It's flushed and synced to disk every 2s and once when stopped.
func NewWAL(walFile string, groupOptions ...func(*auto.Group)) (*BaseWAL, error) {
func NewWAL(logger log.Logger, walFile string, groupOptions ...func(*auto.Group)) (*BaseWAL, error) {
err := tmos.EnsureDir(filepath.Dir(walFile), 0700)
if err != nil {
return nil, fmt.Errorf("failed to ensure WAL directory is in place: %w", err)
@ -103,7 +103,7 @@ func NewWAL(walFile string, groupOptions ...func(*auto.Group)) (*BaseWAL, error)
enc: NewWALEncoder(group),
flushInterval: walDefaultFlushInterval,
}
wal.BaseService = *service.NewBaseService(nil, "baseWAL", wal)
wal.BaseService = *service.NewBaseService(logger, "baseWAL", wal)
return wal, nil
}
@ -116,11 +116,6 @@ func (wal *BaseWAL) Group() *auto.Group {
return wal.group
}
func (wal *BaseWAL) SetLogger(l log.Logger) {
wal.BaseService.Logger = l
wal.group.SetLogger(l)
}
func (wal *BaseWAL) OnStart() error {
size, err := wal.group.Head.Size()
if err != nil {


internal/consensus/wal_generator.go (+3, -6)

@ -66,8 +66,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
blockStore := store.NewBlockStore(blockStoreDB)
proxyApp := proxy.NewAppConns(abciclient.NewLocalCreator(app), proxy.NopMetrics())
proxyApp.SetLogger(logger.With("module", "proxy"))
proxyApp := proxy.NewAppConns(abciclient.NewLocalCreator(app), logger.With("module", "proxy"), proxy.NopMetrics())
if err := proxyApp.Start(); err != nil {
return fmt.Errorf("failed to start proxy app connections: %w", err)
}
@ -77,8 +76,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
}
})
eventBus := eventbus.NewDefault()
eventBus.SetLogger(logger.With("module", "events"))
eventBus := eventbus.NewDefault(logger.With("module", "events"))
if err := eventBus.Start(); err != nil {
return fmt.Errorf("failed to start event bus: %w", err)
}
@ -90,8 +88,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
mempool := emptyMempool{}
evpool := sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore)
consensusState := NewState(cfg.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
consensusState.SetLogger(logger)
consensusState := NewState(logger, cfg.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
consensusState.SetEventBus(eventBus)
if privValidator != nil && privValidator != (*privval.FilePV)(nil) {
consensusState.SetPrivValidator(privValidator)


internal/consensus/wal_test.go (+5, -7)

@ -25,17 +25,17 @@ const (
func TestWALTruncate(t *testing.T) {
walDir := t.TempDir()
walFile := filepath.Join(walDir, "wal")
logger := log.TestingLogger()
// this magic number 4K can truncate the content when RotateFile is called;
// defaultHeadSizeLimit(10M) is hard to simulate.
// this magic number 1 * time.Millisecond makes RotateFile check frequently;
// defaultGroupCheckDuration(5s) is hard to simulate.
wal, err := NewWAL(walFile,
wal, err := NewWAL(logger, walFile,
autofile.GroupHeadSizeLimit(4096),
autofile.GroupCheckDuration(1*time.Millisecond),
)
require.NoError(t, err)
wal.SetLogger(log.TestingLogger())
err = wal.Start()
require.NoError(t, err)
t.Cleanup(func() {
@ -105,7 +105,7 @@ func TestWALWrite(t *testing.T) {
walDir := t.TempDir()
walFile := filepath.Join(walDir, "wal")
wal, err := NewWAL(walFile)
wal, err := NewWAL(log.TestingLogger(), walFile)
require.NoError(t, err)
err = wal.Start()
require.NoError(t, err)
@ -148,9 +148,8 @@ func TestWALSearchForEndHeight(t *testing.T) {
}
walFile := tempWALWithData(walBody)
wal, err := NewWAL(walFile)
wal, err := NewWAL(log.TestingLogger(), walFile)
require.NoError(t, err)
wal.SetLogger(log.TestingLogger())
h := int64(3)
gr, found, err := wal.SearchForEndHeight(h, &WALSearchOptions{})
@ -170,12 +169,11 @@ func TestWALSearchForEndHeight(t *testing.T) {
func TestWALPeriodicSync(t *testing.T) {
walDir := t.TempDir()
walFile := filepath.Join(walDir, "wal")
wal, err := NewWAL(walFile, autofile.GroupCheckDuration(1*time.Millisecond))
wal, err := NewWAL(log.TestingLogger(), walFile, autofile.GroupCheckDuration(1*time.Millisecond))
require.NoError(t, err)
wal.SetFlushInterval(walTestFlushInterval)
wal.SetLogger(log.TestingLogger())
// Generate some data
err = WALGenerateNBlocks(t, wal.Group(), 5)


+7 -8 internal/eventbus/event_bus.go

@ -27,18 +27,17 @@ type EventBus struct {
}
// NewDefault returns a new event bus with default options.
func NewDefault() *EventBus {
pubsub := tmpubsub.NewServer(tmpubsub.BufferCapacity(0))
func NewDefault(l log.Logger) *EventBus {
logger := l.With("module", "eventbus")
pubsub := tmpubsub.NewServer(tmpubsub.BufferCapacity(0),
func(s *tmpubsub.Server) {
s.Logger = logger
})
b := &EventBus{pubsub: pubsub}
b.BaseService = *service.NewBaseService(nil, "EventBus", b)
b.BaseService = *service.NewBaseService(logger, "EventBus", b)
return b
}
func (b *EventBus) SetLogger(l log.Logger) {
b.BaseService.SetLogger(l)
b.pubsub.SetLogger(l.With("module", "pubsub"))
}
func (b *EventBus) OnStart() error {
return b.pubsub.Start()
}
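
Callers now build and start the bus in one step; a minimal sketch of the new pattern (it mirrors the call sites updated elsewhere in this diff):

eventBus := eventbus.NewDefault(logger.With("module", "events"))
if err := eventBus.Start(); err != nil {
	return fmt.Errorf("failed to start event bus: %w", err)
}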


+8 -7 internal/eventbus/event_bus_test.go

@ -12,13 +12,14 @@ import (
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/types"
)
func TestEventBusPublishEventTx(t *testing.T) {
eventBus := eventbus.NewDefault()
eventBus := eventbus.NewDefault(log.TestingLogger())
err := eventBus.Start()
require.NoError(t, err)
t.Cleanup(func() {
@ -75,7 +76,7 @@ func TestEventBusPublishEventTx(t *testing.T) {
}
func TestEventBusPublishEventNewBlock(t *testing.T) {
eventBus := eventbus.NewDefault()
eventBus := eventbus.NewDefault(log.TestingLogger())
err := eventBus.Start()
require.NoError(t, err)
t.Cleanup(func() {
@ -135,7 +136,7 @@ func TestEventBusPublishEventNewBlock(t *testing.T) {
}
func TestEventBusPublishEventTxDuplicateKeys(t *testing.T) {
eventBus := eventbus.NewDefault()
eventBus := eventbus.NewDefault(log.TestingLogger())
err := eventBus.Start()
require.NoError(t, err)
t.Cleanup(func() {
@ -242,7 +243,7 @@ func TestEventBusPublishEventTxDuplicateKeys(t *testing.T) {
}
func TestEventBusPublishEventNewBlockHeader(t *testing.T) {
eventBus := eventbus.NewDefault()
eventBus := eventbus.NewDefault(log.TestingLogger())
err := eventBus.Start()
require.NoError(t, err)
t.Cleanup(func() {
@ -299,7 +300,7 @@ func TestEventBusPublishEventNewBlockHeader(t *testing.T) {
}
func TestEventBusPublishEventNewEvidence(t *testing.T) {
eventBus := eventbus.NewDefault()
eventBus := eventbus.NewDefault(log.TestingLogger())
err := eventBus.Start()
require.NoError(t, err)
t.Cleanup(func() {
@ -343,7 +344,7 @@ func TestEventBusPublishEventNewEvidence(t *testing.T) {
}
func TestEventBusPublish(t *testing.T) {
eventBus := eventbus.NewDefault()
eventBus := eventbus.NewDefault(log.TestingLogger())
err := eventBus.Start()
require.NoError(t, err)
t.Cleanup(func() {
@ -433,7 +434,7 @@ func benchmarkEventBus(numClients int, randQueries bool, randEvents bool, b *tes
// for random* functions
mrand.Seed(time.Now().Unix())
eventBus := eventbus.NewDefault() // set buffer capacity to 0 so we are not testing cache
eventBus := eventbus.NewDefault(log.TestingLogger()) // set buffer capacity to 0 so we are not testing cache
err := eventBus.Start()
if err != nil {
b.Error(err)


+11 -13 internal/inspect/inspect.go

@ -44,20 +44,18 @@ type Inspector struct {
///
//nolint:lll
func New(cfg *config.RPCConfig, bs state.BlockStore, ss state.Store, es []indexer.EventSink, logger log.Logger) *Inspector {
routes := rpc.Routes(*cfg, ss, bs, es, logger)
eb := eventbus.NewDefault()
eb.SetLogger(logger.With("module", "events"))
is := indexer.NewService(indexer.ServiceArgs{
Sinks: es,
EventBus: eb,
Logger: logger.With("module", "txindex"),
})
eb := eventbus.NewDefault(logger.With("module", "events"))
return &Inspector{
routes: routes,
config: cfg,
logger: logger,
eventBus: eb,
indexerService: is,
routes: rpc.Routes(*cfg, ss, bs, es, logger),
config: cfg,
logger: logger,
eventBus: eb,
indexerService: indexer.NewService(indexer.ServiceArgs{
Sinks: es,
EventBus: eb,
Logger: logger.With("module", "txindex"),
}),
}
}
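
The same wiring written out as a standalone sketch (internal packages, so it compiles only inside this module; newIndexer is a hypothetical helper):

func newIndexer(logger log.Logger, sinks []indexer.EventSink) *indexer.Service {
	eb := eventbus.NewDefault(logger.With("module", "events"))
	return indexer.NewService(indexer.ServiceArgs{
		Sinks:    sinks,
		EventBus: eb,
		Logger:   logger.With("module", "txindex"),
	})
}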


+3 -3 internal/mempool/mempool_test.go

@ -77,12 +77,12 @@ func setup(t testing.TB, cacheSize int, options ...TxMempoolOption) *TxMempool {
app := &application{kvstore.NewApplication()}
cc := abciclient.NewLocalCreator(app)
logger := log.TestingLogger()
cfg, err := config.ResetTestRoot(strings.ReplaceAll(t.Name(), "/", "|"))
require.NoError(t, err)
cfg.Mempool.CacheSize = cacheSize
appConnMem, err := cc()
appConnMem, err := cc(logger)
require.NoError(t, err)
require.NoError(t, appConnMem.Start())
@ -91,7 +91,7 @@ func setup(t testing.TB, cacheSize int, options ...TxMempoolOption) *TxMempool {
require.NoError(t, appConnMem.Stop())
})
return NewTxMempool(log.TestingLogger().With("test", t.Name()), cfg.Mempool, appConnMem, 0, options...)
return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, appConnMem, 0, options...)
}
func checkTxs(t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx {


+6 -13 internal/p2p/conn/connection.go

@ -146,12 +146,14 @@ func DefaultMConnConfig() MConnConfig {
// NewMConnection wraps net.Conn and creates multiplex connection
func NewMConnection(
logger log.Logger,
conn net.Conn,
chDescs []*ChannelDescriptor,
onReceive receiveCbFunc,
onError errorCbFunc,
) *MConnection {
return NewMConnectionWithConfig(
logger,
conn,
chDescs,
onReceive,
@ -161,6 +163,7 @@ func NewMConnection(
// NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
func NewMConnectionWithConfig(
logger log.Logger,
conn net.Conn,
chDescs []*ChannelDescriptor,
onReceive receiveCbFunc,
@ -185,6 +188,8 @@ func NewMConnectionWithConfig(
created: time.Now(),
}
mconn.BaseService = *service.NewBaseService(logger, "MConnection", mconn)
// Create channels
var channelsIdx = map[ChannelID]*channel{}
var channels = []*channel{}
@ -197,21 +202,12 @@ func NewMConnectionWithConfig(
mconn.channels = channels
mconn.channelsIdx = channelsIdx
mconn.BaseService = *service.NewBaseService(nil, "MConnection", mconn)
// maxPacketMsgSize() is a bit heavy, so call just once
mconn._maxPacketMsgSize = mconn.maxPacketMsgSize()
return mconn
}
func (c *MConnection) SetLogger(l log.Logger) {
c.BaseService.SetLogger(l)
for _, ch := range c.channels {
ch.SetLogger(l)
}
}
// OnStart implements BaseService
func (c *MConnection) OnStart() error {
if err := c.BaseService.OnStart(); err != nil {
@ -670,13 +666,10 @@ func newChannel(conn *MConnection, desc ChannelDescriptor) *channel {
sendQueue: make(chan []byte, desc.SendQueueCapacity),
recving: make([]byte, 0, desc.RecvBufferCapacity),
maxPacketMsgPayloadSize: conn.config.MaxPacketMsgPayloadSize,
Logger: conn.Logger,
}
}
func (ch *channel) SetLogger(l log.Logger) {
ch.Logger = l
}
// Queues message to send to this channel.
// Goroutine-safe
// Times out (and returns false) after defaultSendTimeout
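
For illustration, a hypothetical dial helper under the new signature: the logger leads the argument list, and each channel inherits conn.Logger at creation, replacing the removed SetLogger fan-out:

func dialSketch(logger log.Logger, c net.Conn, descs []*ChannelDescriptor) *MConnection {
	onReceive := func(chID ChannelID, msgBytes []byte) {}
	onError := func(r interface{}) {}
	return NewMConnection(logger.With("module", "mconn"), c, descs, onReceive, onError)
}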


+25 -25 internal/p2p/conn/connection_test.go

@ -19,17 +19,18 @@ import (
const maxPingPongPacketSize = 1024 // bytes
func createTestMConnection(conn net.Conn) *MConnection {
onReceive := func(chID ChannelID, msgBytes []byte) {
}
onError := func(r interface{}) {
}
c := createMConnectionWithCallbacks(conn, onReceive, onError)
c.SetLogger(log.TestingLogger())
return c
func createTestMConnection(logger log.Logger, conn net.Conn) *MConnection {
return createMConnectionWithCallbacks(logger, conn,
// onReceive
func(chID ChannelID, msgBytes []byte) {
},
// onError
func(r interface{}) {
})
}
func createMConnectionWithCallbacks(
logger log.Logger,
conn net.Conn,
onReceive func(chID ChannelID, msgBytes []byte),
onError func(r interface{}),
@ -38,8 +39,7 @@ func createMConnectionWithCallbacks(
cfg.PingInterval = 90 * time.Millisecond
cfg.PongTimeout = 45 * time.Millisecond
chDescs := []*ChannelDescriptor{{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}
c := NewMConnectionWithConfig(conn, chDescs, onReceive, onError, cfg)
c.SetLogger(log.TestingLogger())
c := NewMConnectionWithConfig(logger, conn, chDescs, onReceive, onError, cfg)
return c
}
@ -47,7 +47,7 @@ func TestMConnectionSendFlushStop(t *testing.T) {
server, client := NetPipe()
t.Cleanup(closeAll(t, client, server))
clientConn := createTestMConnection(client)
clientConn := createTestMConnection(log.TestingLogger(), client)
err := clientConn.Start()
require.Nil(t, err)
t.Cleanup(stopAll(t, clientConn))
@ -81,7 +81,7 @@ func TestMConnectionSend(t *testing.T) {
server, client := NetPipe()
t.Cleanup(closeAll(t, client, server))
mconn := createTestMConnection(client)
mconn := createTestMConnection(log.TestingLogger(), client)
err := mconn.Start()
require.Nil(t, err)
t.Cleanup(stopAll(t, mconn))
@ -117,12 +117,13 @@ func TestMConnectionReceive(t *testing.T) {
onError := func(r interface{}) {
errorsCh <- r
}
mconn1 := createMConnectionWithCallbacks(client, onReceive, onError)
logger := log.TestingLogger()
mconn1 := createMConnectionWithCallbacks(logger, client, onReceive, onError)
err := mconn1.Start()
require.Nil(t, err)
t.Cleanup(stopAll(t, mconn1))
mconn2 := createTestMConnection(server)
mconn2 := createTestMConnection(logger, server)
err = mconn2.Start()
require.Nil(t, err)
t.Cleanup(stopAll(t, mconn2))
@ -152,7 +153,7 @@ func TestMConnectionPongTimeoutResultsInError(t *testing.T) {
onError := func(r interface{}) {
errorsCh <- r
}
mconn := createMConnectionWithCallbacks(client, onReceive, onError)
mconn := createMConnectionWithCallbacks(log.TestingLogger(), client, onReceive, onError)
err := mconn.Start()
require.Nil(t, err)
t.Cleanup(stopAll(t, mconn))
@ -190,7 +191,7 @@ func TestMConnectionMultiplePongsInTheBeginning(t *testing.T) {
onError := func(r interface{}) {
errorsCh <- r
}
mconn := createMConnectionWithCallbacks(client, onReceive, onError)
mconn := createMConnectionWithCallbacks(log.TestingLogger(), client, onReceive, onError)
err := mconn.Start()
require.Nil(t, err)
t.Cleanup(stopAll(t, mconn))
@ -244,7 +245,7 @@ func TestMConnectionMultiplePings(t *testing.T) {
onError := func(r interface{}) {
errorsCh <- r
}
mconn := createMConnectionWithCallbacks(client, onReceive, onError)
mconn := createMConnectionWithCallbacks(log.TestingLogger(), client, onReceive, onError)
err := mconn.Start()
require.Nil(t, err)
t.Cleanup(stopAll(t, mconn))
@ -291,7 +292,7 @@ func TestMConnectionPingPongs(t *testing.T) {
onError := func(r interface{}) {
errorsCh <- r
}
mconn := createMConnectionWithCallbacks(client, onReceive, onError)
mconn := createMConnectionWithCallbacks(log.TestingLogger(), client, onReceive, onError)
err := mconn.Start()
require.Nil(t, err)
t.Cleanup(stopAll(t, mconn))
@ -348,7 +349,7 @@ func TestMConnectionStopsAndReturnsError(t *testing.T) {
onError := func(r interface{}) {
errorsCh <- r
}
mconn := createMConnectionWithCallbacks(client, onReceive, onError)
mconn := createMConnectionWithCallbacks(log.TestingLogger(), client, onReceive, onError)
err := mconn.Start()
require.Nil(t, err)
t.Cleanup(stopAll(t, mconn))
@ -379,19 +380,18 @@ func newClientAndServerConnsForReadErrors(t *testing.T, chOnErr chan struct{}) (
{ID: 0x01, Priority: 1, SendQueueCapacity: 1},
{ID: 0x02, Priority: 1, SendQueueCapacity: 1},
}
mconnClient := NewMConnection(client, chDescs, onReceive, onError)
mconnClient.SetLogger(log.TestingLogger().With("module", "client"))
logger := log.TestingLogger()
mconnClient := NewMConnection(logger.With("module", "client"), client, chDescs, onReceive, onError)
err := mconnClient.Start()
require.Nil(t, err)
// create server conn with 1 channel
// it fires on chOnErr when there's an error
serverLogger := log.TestingLogger().With("module", "server")
serverLogger := logger.With("module", "server")
onError = func(r interface{}) {
chOnErr <- struct{}{}
}
mconnServer := createMConnectionWithCallbacks(server, onReceive, onError)
mconnServer.SetLogger(serverLogger)
mconnServer := createMConnectionWithCallbacks(serverLogger, server, onReceive, onError)
err = mconnServer.Start()
require.Nil(t, err)
return mconnClient, mconnServer
@ -488,7 +488,7 @@ func TestMConnectionTrySend(t *testing.T) {
server, client := NetPipe()
t.Cleanup(closeAll(t, client, server))
mconn := createTestMConnection(client)
mconn := createTestMConnection(log.TestingLogger(), client)
err := mconn.Start()
require.Nil(t, err)
t.Cleanup(stopAll(t, mconn))


+1 -1 internal/p2p/transport_mconn.go

@ -336,13 +336,13 @@ func (c *mConnConnection) handshake(
}
mconn := conn.NewMConnectionWithConfig(
c.logger.With("peer", c.RemoteEndpoint().NodeAddress(peerInfo.NodeID)),
secretConn,
c.channelDescs,
c.onReceive,
c.onError,
c.mConnConfig,
)
mconn.SetLogger(c.logger.With("peer", c.RemoteEndpoint().NodeAddress(peerInfo.NodeID)))
return mconn, peerInfo, secretConn.RemotePubKey(), nil
}


+3 -2 internal/p2p/trust/store.go

@ -11,6 +11,7 @@ import (
dbm "github.com/tendermint/tm-db"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
)
@ -38,14 +39,14 @@ type MetricStore struct {
// NewTrustMetricStore returns a store that saves data to the DB
// and uses the config when creating new trust metrics.
// Use Start to initialize the trust metric store.
func NewTrustMetricStore(db dbm.DB, tmc MetricConfig) *MetricStore {
func NewTrustMetricStore(db dbm.DB, tmc MetricConfig, logger log.Logger) *MetricStore {
tms := &MetricStore{
peerMetrics: make(map[string]*Metric),
db: db,
config: tmc,
}
tms.BaseService = *service.NewBaseService(nil, "MetricStore", tms)
tms.BaseService = *service.NewBaseService(logger, "MetricStore", tms)
return tms
}
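
Sketch of the new constructor in use (newStoreSketch is a hypothetical helper; the "memdb" arguments mirror the tests below):

func newStoreSketch(logger log.Logger) (*MetricStore, error) {
	db, err := dbm.NewDB("trusthistory", "memdb", "")
	if err != nil {
		return nil, err
	}
	// the logger is part of construction rather than injected afterwards
	return NewTrustMetricStore(db, DefaultConfig(), logger), nil
}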


+12 -12 internal/p2p/trust/store_test.go

@ -16,17 +16,16 @@ import (
func TestTrustMetricStoreSaveLoad(t *testing.T) {
dir := t.TempDir()
logger := log.TestingLogger()
historyDB, err := dbm.NewDB("trusthistory", "goleveldb", dir)
require.NoError(t, err)
// 0 peers saved
store := NewTrustMetricStore(historyDB, DefaultConfig())
store.SetLogger(log.TestingLogger())
store := NewTrustMetricStore(historyDB, DefaultConfig(), logger)
store.saveToDB()
// Load the data from the file
store = NewTrustMetricStore(historyDB, DefaultConfig())
store.SetLogger(log.TestingLogger())
store = NewTrustMetricStore(historyDB, DefaultConfig(), logger)
err = store.Start()
require.NoError(t, err)
// Make sure we still have 0 entries
@ -64,8 +63,8 @@ func TestTrustMetricStoreSaveLoad(t *testing.T) {
require.NoError(t, err)
// Load the data from the DB
store = NewTrustMetricStore(historyDB, DefaultConfig())
store.SetLogger(log.TestingLogger())
store = NewTrustMetricStore(historyDB, DefaultConfig(), logger)
err = store.Start()
require.NoError(t, err)
@ -88,9 +87,10 @@ func TestTrustMetricStoreConfig(t *testing.T) {
IntegralWeight: 0.5,
}
logger := log.TestingLogger()
// Create a store with custom config
store := NewTrustMetricStore(historyDB, config)
store.SetLogger(log.TestingLogger())
store := NewTrustMetricStore(historyDB, config, logger)
err = store.Start()
require.NoError(t, err)
@ -108,8 +108,8 @@ func TestTrustMetricStoreLookup(t *testing.T) {
historyDB, err := dbm.NewDB("", "memdb", "")
require.NoError(t, err)
store := NewTrustMetricStore(historyDB, DefaultConfig())
store.SetLogger(log.TestingLogger())
store := NewTrustMetricStore(historyDB, DefaultConfig(), log.TestingLogger())
err = store.Start()
require.NoError(t, err)
@ -131,8 +131,8 @@ func TestTrustMetricStorePeerScore(t *testing.T) {
historyDB, err := dbm.NewDB("", "memdb", "")
require.NoError(t, err)
store := NewTrustMetricStore(historyDB, DefaultConfig())
store.SetLogger(log.TestingLogger())
store := NewTrustMetricStore(historyDB, DefaultConfig(), log.TestingLogger())
err = store.Start()
require.NoError(t, err)


+15 -15 internal/proxy/app_conn_test.go

@ -48,11 +48,11 @@ var SOCKET = "socket"
func TestEcho(t *testing.T) {
sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6))
clientCreator := abciclient.NewRemoteCreator(sockPath, SOCKET, true)
logger := log.TestingLogger()
clientCreator := abciclient.NewRemoteCreator(logger, sockPath, SOCKET, true)
// Start server
s := server.NewSocketServer(sockPath, kvstore.NewApplication())
s.SetLogger(log.TestingLogger().With("module", "abci-server"))
s := server.NewSocketServer(logger.With("module", "abci-server"), sockPath, kvstore.NewApplication())
if err := s.Start(); err != nil {
t.Fatalf("Error starting socket server: %v", err.Error())
}
@ -63,11 +63,11 @@ func TestEcho(t *testing.T) {
})
// Start client
cli, err := clientCreator()
cli, err := clientCreator(logger.With("module", "abci-client"))
if err != nil {
t.Fatalf("Error creating ABCI client: %v", err.Error())
}
cli.SetLogger(log.TestingLogger().With("module", "abci-client"))
if err := cli.Start(); err != nil {
t.Fatalf("Error starting ABCI client: %v", err.Error())
}
@ -96,11 +96,11 @@ func TestEcho(t *testing.T) {
func BenchmarkEcho(b *testing.B) {
b.StopTimer() // Initialize
sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6))
clientCreator := abciclient.NewRemoteCreator(sockPath, SOCKET, true)
logger := log.TestingLogger()
clientCreator := abciclient.NewRemoteCreator(logger, sockPath, SOCKET, true)
// Start server
s := server.NewSocketServer(sockPath, kvstore.NewApplication())
s.SetLogger(log.TestingLogger().With("module", "abci-server"))
s := server.NewSocketServer(logger.With("module", "abci-server"), sockPath, kvstore.NewApplication())
if err := s.Start(); err != nil {
b.Fatalf("Error starting socket server: %v", err.Error())
}
@ -111,11 +111,11 @@ func BenchmarkEcho(b *testing.B) {
})
// Start client
cli, err := clientCreator()
cli, err := clientCreator(logger.With("module", "abci-client"))
if err != nil {
b.Fatalf("Error creating ABCI client: %v", err.Error())
}
cli.SetLogger(log.TestingLogger().With("module", "abci-client"))
if err := cli.Start(); err != nil {
b.Fatalf("Error starting ABCI client: %v", err.Error())
}
@ -149,11 +149,11 @@ func BenchmarkEcho(b *testing.B) {
func TestInfo(t *testing.T) {
sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6))
clientCreator := abciclient.NewRemoteCreator(sockPath, SOCKET, true)
logger := log.TestingLogger()
clientCreator := abciclient.NewRemoteCreator(logger, sockPath, SOCKET, true)
// Start server
s := server.NewSocketServer(sockPath, kvstore.NewApplication())
s.SetLogger(log.TestingLogger().With("module", "abci-server"))
s := server.NewSocketServer(logger.With("module", "abci-server"), sockPath, kvstore.NewApplication())
if err := s.Start(); err != nil {
t.Fatalf("Error starting socket server: %v", err.Error())
}
@ -164,11 +164,11 @@ func TestInfo(t *testing.T) {
})
// Start client
cli, err := clientCreator()
cli, err := clientCreator(logger.With("module", "abci-client"))
if err != nil {
t.Fatalf("Error creating ABCI client: %v", err.Error())
}
cli.SetLogger(log.TestingLogger().With("module", "abci-client"))
if err := cli.Start(); err != nil {
t.Fatalf("Error starting ABCI client: %v", err.Error())
}


+3 -2 internal/proxy/client.go

@ -6,6 +6,7 @@ import (
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/abci/example/kvstore"
"github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
e2e "github.com/tendermint/tendermint/test/e2e/app"
)
@ -15,7 +16,7 @@ import (
//
// The Closer is a noop except for persistent_kvstore applications,
// which will clean up the store.
func DefaultClientCreator(addr, transport, dbDir string) (abciclient.Creator, io.Closer) {
func DefaultClientCreator(logger log.Logger, addr, transport, dbDir string) (abciclient.Creator, io.Closer) {
switch addr {
case "kvstore":
return abciclient.NewLocalCreator(kvstore.NewApplication()), noopCloser{}
@ -32,7 +33,7 @@ func DefaultClientCreator(addr, transport, dbDir string) (abciclient.Creator, io
return abciclient.NewLocalCreator(types.NewBaseApplication()), noopCloser{}
default:
mustConnect := false // loop retrying
return abciclient.NewRemoteCreator(addr, transport, mustConnect), noopCloser{}
return abciclient.NewRemoteCreator(logger, addr, transport, mustConnect), noopCloser{}
}
}
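
A hedged usage sketch (the address, transport, and dbDir values are illustrative; the creator's log.Logger parameter is the new part):

creator, closer := DefaultClientCreator(logger, "tcp://127.0.0.1:26658", "socket", dbDir)
defer closer.Close() //nolint:errcheck
cli, err := creator(logger.With("module", "abci-client"))
if err != nil {
	return err
}
if err := cli.Start(); err != nil {
	return err
}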


+9 -8 internal/proxy/multi_app_conn.go

@ -6,7 +6,7 @@ import (
"syscall"
abciclient "github.com/tendermint/tendermint/abci/client"
tmlog "github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
)
@ -33,8 +33,8 @@ type AppConns interface {
}
// NewAppConns calls NewMultiAppConn.
func NewAppConns(clientCreator abciclient.Creator, metrics *Metrics) AppConns {
return NewMultiAppConn(clientCreator, metrics)
func NewAppConns(clientCreator abciclient.Creator, logger log.Logger, metrics *Metrics) AppConns {
return NewMultiAppConn(clientCreator, logger, metrics)
}
// multiAppConn implements AppConns.
@ -60,12 +60,12 @@ type multiAppConn struct {
}
// NewMultiAppConn makes all necessary abci connections to the application.
func NewMultiAppConn(clientCreator abciclient.Creator, metrics *Metrics) AppConns {
func NewMultiAppConn(clientCreator abciclient.Creator, logger log.Logger, metrics *Metrics) AppConns {
multiAppConn := &multiAppConn{
metrics: metrics,
clientCreator: clientCreator,
}
multiAppConn.BaseService = *service.NewBaseService(nil, "multiAppConn", multiAppConn)
multiAppConn.BaseService = *service.NewBaseService(logger, "multiAppConn", multiAppConn)
return multiAppConn
}
@ -128,7 +128,7 @@ func (app *multiAppConn) OnStop() {
}
func (app *multiAppConn) killTMOnClientError() {
killFn := func(conn string, err error, logger tmlog.Logger) {
killFn := func(conn string, err error, logger log.Logger) {
logger.Error(
fmt.Sprintf("%s connection terminated. Did the application crash? Please restart tendermint", conn),
"err", err)
@ -181,11 +181,12 @@ func (app *multiAppConn) stopAllClients() {
}
func (app *multiAppConn) abciClientFor(conn string) (abciclient.Client, error) {
c, err := app.clientCreator()
c, err := app.clientCreator(app.Logger.With(
"module", "abci-client",
"connection", conn))
if err != nil {
return nil, fmt.Errorf("error creating ABCI client (%s connection): %w", conn, err)
}
c.SetLogger(app.Logger.With("module", "abci-client", "connection", conn))
if err := c.Start(); err != nil {
return nil, fmt.Errorf("error starting ABCI client (%s connection): %w", conn, err)
}
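
Putting it together, a minimal construction sketch (it mirrors the updated call sites elsewhere in this diff; the kvstore app is illustrative):

creator := abciclient.NewLocalCreator(kvstore.NewApplication())
appConns := NewAppConns(creator, logger.With("module", "proxy"), NopMetrics())
if err := appConns.Start(); err != nil {
	return err
}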


+5 -5 internal/proxy/multi_app_conn_test.go

@ -14,24 +14,24 @@ import (
abciclient "github.com/tendermint/tendermint/abci/client"
abcimocks "github.com/tendermint/tendermint/abci/client/mocks"
"github.com/tendermint/tendermint/libs/log"
)
func TestAppConns_Start_Stop(t *testing.T) {
quitCh := make(<-chan struct{})
clientMock := &abcimocks.Client{}
clientMock.On("SetLogger", mock.Anything).Return().Times(4)
clientMock.On("Start").Return(nil).Times(4)
clientMock.On("Stop").Return(nil).Times(4)
clientMock.On("Quit").Return(quitCh).Times(4)
creatorCallCount := 0
creator := func() (abciclient.Client, error) {
creator := func(logger log.Logger) (abciclient.Client, error) {
creatorCallCount++
return clientMock, nil
}
appConns := NewAppConns(creator, NopMetrics())
appConns := NewAppConns(creator, log.TestingLogger(), NopMetrics())
err := appConns.Start()
require.NoError(t, err)
@ -68,11 +68,11 @@ func TestAppConns_Failure(t *testing.T) {
clientMock.On("Quit").Return(recvQuitCh)
clientMock.On("Error").Return(errors.New("EOF")).Once()
creator := func() (abciclient.Client, error) {
creator := func(log.Logger) (abciclient.Client, error) {
return clientMock, nil
}
appConns := NewAppConns(creator, NopMetrics())
appConns := NewAppConns(creator, log.TestingLogger(), NopMetrics())
err := appConns.Start()
require.NoError(t, err)


+11 -8 internal/state/execution_test.go

@ -38,7 +38,8 @@ var (
func TestApplyBlock(t *testing.T) {
app := &testApp{}
cc := abciclient.NewLocalCreator(app)
proxyApp := proxy.NewAppConns(cc, proxy.NopMetrics())
logger := log.TestingLogger()
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
err := proxyApp.Start()
require.Nil(t, err)
defer proxyApp.Stop() //nolint:errcheck // ignore for tests
@ -46,7 +47,7 @@ func TestApplyBlock(t *testing.T) {
state, stateDB, _ := makeState(1, 1)
stateStore := sm.NewStore(stateDB)
blockStore := store.NewBlockStore(dbm.NewMemDB())
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(),
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp.Consensus(),
mmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore)
block := sf.MakeBlock(state, 1, new(types.Commit))
@ -63,7 +64,7 @@ func TestApplyBlock(t *testing.T) {
func TestBeginBlockValidators(t *testing.T) {
app := &testApp{}
cc := abciclient.NewLocalCreator(app)
proxyApp := proxy.NewAppConns(cc, proxy.NopMetrics())
proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics())
err := proxyApp.Start()
require.Nil(t, err)
defer proxyApp.Stop() //nolint:errcheck // no need to check error again
@ -126,7 +127,7 @@ func TestBeginBlockValidators(t *testing.T) {
func TestBeginBlockByzantineValidators(t *testing.T) {
app := &testApp{}
cc := abciclient.NewLocalCreator(app)
proxyApp := proxy.NewAppConns(cc, proxy.NopMetrics())
proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics())
err := proxyApp.Start()
require.Nil(t, err)
defer proxyApp.Stop() //nolint:errcheck // ignore for tests
@ -351,7 +352,8 @@ func TestUpdateValidators(t *testing.T) {
func TestEndBlockValidatorUpdates(t *testing.T) {
app := &testApp{}
cc := abciclient.NewLocalCreator(app)
proxyApp := proxy.NewAppConns(cc, proxy.NopMetrics())
logger := log.TestingLogger()
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
err := proxyApp.Start()
require.Nil(t, err)
defer proxyApp.Stop() //nolint:errcheck // ignore for tests
@ -362,14 +364,14 @@ func TestEndBlockValidatorUpdates(t *testing.T) {
blockExec := sm.NewBlockExecutor(
stateStore,
log.TestingLogger(),
logger,
proxyApp.Consensus(),
mmock.Mempool{},
sm.EmptyEvidencePool{},
blockStore,
)
eventBus := eventbus.NewDefault()
eventBus := eventbus.NewDefault(logger)
err = eventBus.Start()
require.NoError(t, err)
defer eventBus.Stop() //nolint:errcheck // ignore for tests
@ -420,7 +422,8 @@ func TestEndBlockValidatorUpdates(t *testing.T) {
func TestEndBlockValidatorUpdatesResultingInEmptySet(t *testing.T) {
app := &testApp{}
cc := abciclient.NewLocalCreator(app)
proxyApp := proxy.NewAppConns(cc, proxy.NopMetrics())
logger := log.TestingLogger()
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
err := proxyApp.Start()
require.Nil(t, err)
defer proxyApp.Stop() //nolint:errcheck // ignore for tests


+2 -1 internal/state/helpers_test.go

@ -16,6 +16,7 @@ import (
sm "github.com/tendermint/tendermint/internal/state"
sf "github.com/tendermint/tendermint/internal/state/test/factory"
"github.com/tendermint/tendermint/internal/test/factory"
"github.com/tendermint/tendermint/libs/log"
tmrand "github.com/tendermint/tendermint/libs/rand"
tmtime "github.com/tendermint/tendermint/libs/time"
tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
@ -31,7 +32,7 @@ type paramsChangeTestCase struct {
func newTestApp() proxy.AppConns {
app := &testApp{}
cc := abciclient.NewLocalCreator(app)
return proxy.NewAppConns(cc, proxy.NopMetrics())
return proxy.NewAppConns(cc, log.NewNopLogger(), proxy.NopMetrics())
}
func makeAndCommitGoodBlock(


+0 -9 internal/state/indexer/indexer_service.go

@ -41,15 +41,6 @@ func NewService(args ServiceArgs) *Service {
return is
}
// NewIndexerService returns a new service instance.
// Deprecated: Use NewService instead.
func NewIndexerService(es []EventSink, eventBus *eventbus.EventBus) *Service {
return NewService(ServiceArgs{
Sinks: es,
EventBus: eventBus,
})
}
// publish publishes a pubsub message to the service. The service blocks until
// the message has been fully processed.
func (is *Service) publish(msg pubsub.Message) error {


+11 -4 internal/state/indexer/indexer_service_test.go

@ -38,10 +38,18 @@ var (
dbName = "postgres"
)
// NewIndexerService returns a new service instance.
func NewIndexerService(es []indexer.EventSink, eventBus *eventbus.EventBus) *indexer.Service {
return indexer.NewService(indexer.ServiceArgs{
Sinks: es,
EventBus: eventBus,
})
}
func TestIndexerServiceIndexesBlocks(t *testing.T) {
logger := tmlog.TestingLogger()
// event bus
eventBus := eventbus.NewDefault()
eventBus.SetLogger(tmlog.TestingLogger())
eventBus := eventbus.NewDefault(logger)
err := eventBus.Start()
require.NoError(t, err)
t.Cleanup(func() {
@ -62,8 +70,7 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
assert.True(t, indexer.KVSinkEnabled(eventSinks))
assert.True(t, indexer.IndexingEnabled(eventSinks))
service := indexer.NewIndexerService(eventSinks, eventBus)
service.SetLogger(tmlog.TestingLogger())
service := NewIndexerService(eventSinks, eventBus)
err = service.Start()
require.NoError(t, err)
t.Cleanup(func() {


+1 -1 libs/pubsub/pubsub.go

@ -136,10 +136,10 @@ type Option func(*Server)
// provided, the resulting server's queue is unbuffered.
func NewServer(options ...Option) *Server {
s := new(Server)
s.BaseService = *service.NewBaseService(nil, "PubSub", s)
for _, opt := range options {
opt(s)
}
s.BaseService = *service.NewBaseService(nil, "PubSub", s)
// The queue receives items to be published.
s.queue = make(chan item, s.queueCap)
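
Because BaseService is now built before the options run, an option can override the logger; a minimal sketch of the pattern the tests below rely on:

s := NewServer(func(s *Server) {
	s.Logger = logger // replaces the default logger installed by NewBaseService
})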


+8 -4 libs/pubsub/pubsub_test.go

@ -357,8 +357,10 @@ func TestUnsubscribeAll(t *testing.T) {
}
func TestBufferCapacity(t *testing.T) {
s := pubsub.NewServer(pubsub.BufferCapacity(2))
s.SetLogger(log.TestingLogger())
s := pubsub.NewServer(pubsub.BufferCapacity(2),
func(s *pubsub.Server) {
s.Logger = log.TestingLogger()
})
require.Equal(t, 2, s.BufferCapacity())
@ -376,8 +378,10 @@ func TestBufferCapacity(t *testing.T) {
func newTestServer(t testing.TB) *pubsub.Server {
t.Helper()
s := pubsub.NewServer()
s.SetLogger(log.TestingLogger())
s := pubsub.NewServer(func(s *pubsub.Server) {
s.Logger = log.TestingLogger()
})
require.NoError(t, s.Start())
t.Cleanup(func() {
assert.NoError(t, s.Stop())


+0 -8 libs/service/service.go

@ -48,9 +48,6 @@ type Service interface {
// String representation of the service
String() string
// SetLogger sets a logger.
SetLogger(log.Logger)
// Wait blocks until the service is stopped.
Wait()
}
@ -122,11 +119,6 @@ func NewBaseService(logger log.Logger, name string, impl Service) *BaseService {
}
}
// SetLogger implements Service by setting a logger.
func (bs *BaseService) SetLogger(l log.Logger) {
bs.Logger = l
}
// Start implements Service by calling OnStart (if defined). An error is
// returned if the service is already running or stopped. To restart a
// stopped service, call Reset.
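
With SetLogger gone from the interface, a service's logger can only be set when it is built; a minimal sketch (fooService is hypothetical):

type fooService struct {
	service.BaseService
}

func newFooService(logger log.Logger) *fooService {
	s := new(fooService)
	s.BaseService = *service.NewBaseService(logger, "foo", s)
	return s
}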


+1 -1 node/node.go

@ -106,7 +106,7 @@ func newDefaultNode(cfg *config.Config, logger log.Logger) (service.Service, err
pval = nil
}
appClient, _ := proxy.DefaultClientCreator(cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
appClient, _ := proxy.DefaultClientCreator(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
return makeNode(cfg,
pval,


+3 -3 node/node_test.go

@ -249,7 +249,7 @@ func TestCreateProposalBlock(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(cfg.RootDir)
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
proxyApp := proxy.NewAppConns(cc, proxy.NopMetrics())
proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics())
err = proxyApp.Start()
require.Nil(t, err)
defer proxyApp.Stop() //nolint:errcheck // ignore for tests
@ -343,7 +343,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
defer os.RemoveAll(cfg.RootDir)
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
proxyApp := proxy.NewAppConns(cc, proxy.NopMetrics())
proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics())
err = proxyApp.Start()
require.Nil(t, err)
defer proxyApp.Stop() //nolint:errcheck // ignore for tests
@ -407,7 +407,7 @@ func TestMaxProposalBlockSize(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(cfg.RootDir)
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
proxyApp := proxy.NewAppConns(cc, proxy.NopMetrics())
proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics())
err = proxyApp.Start()
require.Nil(t, err)
defer proxyApp.Stop() //nolint:errcheck // ignore for tests


+6 -5 node/setup.go

@ -90,17 +90,17 @@ func initDBs(
// nolint:lll
func createAndStartProxyAppConns(clientCreator abciclient.Creator, logger log.Logger, metrics *proxy.Metrics) (proxy.AppConns, error) {
proxyApp := proxy.NewAppConns(clientCreator, metrics)
proxyApp.SetLogger(logger.With("module", "proxy"))
proxyApp := proxy.NewAppConns(clientCreator, logger.With("module", "proxy"), metrics)
if err := proxyApp.Start(); err != nil {
return nil, fmt.Errorf("error starting proxy app connections: %v", err)
}
return proxyApp, nil
}
func createAndStartEventBus(logger log.Logger) (*eventbus.EventBus, error) {
eventBus := eventbus.NewDefault()
eventBus.SetLogger(logger.With("module", "events"))
eventBus := eventbus.NewDefault(logger.With("module", "events"))
if err := eventBus.Start(); err != nil {
return nil, err
}
@ -309,6 +309,7 @@ func createConsensusReactor(
logger = logger.With("module", "consensus")
consensusState := consensus.NewState(
logger,
cfg.Consensus,
state.Copy(),
blockExec,
@ -317,7 +318,7 @@ func createConsensusReactor(
evidencePool,
consensus.StateMetrics(csMetrics),
)
consensusState.SetLogger(logger)
if privValidator != nil && cfg.Mode == config.ModeValidator {
consensusState.SetPrivValidator(privValidator)
}


+1 -1 test/e2e/node/main.go

@ -102,7 +102,7 @@ func startApp(cfg *Config) error {
if err != nil {
return err
}
server, err := server.NewServer(cfg.Listen, cfg.Protocol, app)
server, err := server.NewServer(logger, cfg.Listen, cfg.Protocol, app)
if err != nil {
return err
}


+1 -1 test/fuzz/mempool/checktx.go

@ -16,7 +16,7 @@ var getMp func() mempool.Mempool
func init() {
app := kvstore.NewApplication()
cc := abciclient.NewLocalCreator(app)
appConnMem, _ := cc()
appConnMem, _ := cc(log.NewNopLogger())
err := appConnMem.Start()
if err != nil {
panic(err)

