Browse Source

node: refactor privValidator ext client code & tests (#2895)

* update ConsensusState#OnStop comment

* consensus: set logger for WAL in tests

* refactor privValidator client code and tests

follow-up on https://github.com/tendermint/tendermint/pull/2866
pull/2903/head
Anton Kaliaev 6 years ago
committed by GitHub
parent
commit
b487feba42
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 105 additions and 142 deletions
  1. +2
    -0
      CHANGELOG_PENDING.md
  2. +5
    -12
      consensus/replay_test.go
  3. +2
    -1
      consensus/state.go
  4. +13
    -16
      consensus/wal_test.go
  5. +38
    -48
      node/node.go
  6. +44
    -65
      node/node_test.go
  7. +1
    -0
      privval/ipc_server.go

+ 2
- 0
CHANGELOG_PENDING.md View File

@ -2,6 +2,8 @@
## v0.26.4
*TBD*
Special thanks to external contributors on this release:
Friendly reminder, we have a [bug bounty


+ 5
- 12
consensus/replay_test.go View File

@ -315,28 +315,21 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
config := ResetConfig("proxy_test_")
walBody, err := WALWithNBlocks(NUM_BLOCKS)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
walFile := tempWALWithData(walBody)
config.Consensus.SetWalFile(walFile)
privVal := privval.LoadFilePV(config.PrivValidatorFile())
wal, err := NewWAL(walFile)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
wal.SetLogger(log.TestingLogger())
if err := wal.Start(); err != nil {
t.Fatal(err)
}
err = wal.Start()
require.NoError(t, err)
defer wal.Stop()
chain, commits, err := makeBlockchainFromWAL(wal)
if err != nil {
t.Fatalf(err.Error())
}
require.NoError(t, err)
stateDB, state, store := stateAndStore(config, privVal.GetPubKey(), kvstore.ProtocolVersion)
store.chain = chain


+ 2
- 1
consensus/state.go View File

@ -324,10 +324,11 @@ func (cs *ConsensusState) startRoutines(maxSteps int) {
go cs.receiveRoutine(maxSteps)
}
// OnStop implements cmn.Service. It stops all routines and waits for the WAL to finish.
// OnStop implements cmn.Service.
func (cs *ConsensusState) OnStop() {
cs.evsw.Stop()
cs.timeoutTicker.Stop()
// WAL is stopped in receiveRoutine.
}
// Wait waits for the the main routine to return.


+ 13
- 16
consensus/wal_test.go View File

@ -7,13 +7,13 @@ import (
"io/ioutil"
"os"
"path/filepath"
// "sync"
"testing"
"time"
"github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/libs/autofile"
"github.com/tendermint/tendermint/libs/log"
tmtypes "github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"
@ -23,29 +23,27 @@ import (
func TestWALTruncate(t *testing.T) {
walDir, err := ioutil.TempDir("", "wal")
if err != nil {
panic(fmt.Errorf("failed to create temp WAL file: %v", err))
}
require.NoError(t, err)
defer os.RemoveAll(walDir)
walFile := filepath.Join(walDir, "wal")
//this magic number 4K can truncate the content when RotateFile. defaultHeadSizeLimit(10M) is hard to simulate.
//this magic number 1 * time.Millisecond make RotateFile check frequently. defaultGroupCheckDuration(5s) is hard to simulate.
wal, err := NewWAL(walFile, autofile.GroupHeadSizeLimit(4096), autofile.GroupCheckDuration(1*time.Millisecond))
if err != nil {
t.Fatal(err)
}
wal.Start()
wal, err := NewWAL(walFile,
autofile.GroupHeadSizeLimit(4096),
autofile.GroupCheckDuration(1*time.Millisecond),
)
require.NoError(t, err)
wal.SetLogger(log.TestingLogger())
err = wal.Start()
require.NoError(t, err)
defer wal.Stop()
//60 block's size nearly 70K, greater than group's headBuf size(4096 * 10), when headBuf is full, truncate content will Flush to the file.
//at this time, RotateFile is called, truncate content exist in each file.
err = WALGenerateNBlocks(wal.Group(), 60)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
time.Sleep(1 * time.Millisecond) //wait groupCheckDuration, make sure RotateFile run
@ -99,9 +97,8 @@ func TestWALSearchForEndHeight(t *testing.T) {
walFile := tempWALWithData(walBody)
wal, err := NewWAL(walFile)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
wal.SetLogger(log.TestingLogger())
h := int64(3)
gr, found, err := wal.SearchForEndHeight(h, &WALSearchOptions{})


+ 38
- 48
node/node.go View File

@ -3,7 +3,6 @@ package node
import (
"bytes"
"context"
"errors"
"fmt"
"net"
"net/http"
@ -11,11 +10,12 @@ import (
"strings"
"time"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/cors"
"github.com/tendermint/go-amino"
amino "github.com/tendermint/go-amino"
abci "github.com/tendermint/tendermint/abci/types"
bc "github.com/tendermint/tendermint/blockchain"
cfg "github.com/tendermint/tendermint/config"
@ -148,44 +148,6 @@ type Node struct {
prometheusSrv *http.Server
}
func createExternalPrivValidator(listenAddr string, logger log.Logger) (types.PrivValidator, error) {
protocol, address := cmn.ProtocolAndAddress(listenAddr)
var pvsc types.PrivValidator
switch (protocol) {
case "unix":
pvsc = privval.NewIPCVal(
logger.With("module", "privval"),
address,
)
case "tcp":
// TODO: persist this key so external signer
// can actually authenticate us
pvsc = privval.NewTCPVal(
logger.With("module", "privval"),
listenAddr,
ed25519.GenPrivKey(),
)
default:
return nil, fmt.Errorf(
"Error creating private validator: expected either tcp or unix "+
"protocols, got %s",
protocol,
)
}
pvServ, _ := pvsc.(cmn.Service)
if err := pvServ.Start(); err != nil {
return nil, fmt.Errorf("Error starting private validator client: %v", err)
}
return pvsc, nil
}
// NewNode returns a new, ready to go, Tendermint Node.
func NewNode(config *cfg.Config,
privValidator types.PrivValidator,
@ -259,11 +221,12 @@ func NewNode(config *cfg.Config,
}
if config.PrivValidatorListenAddr != "" {
// If an address is provided, listen on the socket for a
// connection from an external signing process.
privValidator, err = createExternalPrivValidator(config.PrivValidatorListenAddr, logger)
// If an address is provided, listen on the socket for a connection from an
// external signing process.
// FIXME: we should start services inside OnStart
privValidator, err = createAndStartPrivValidatorSocketClient(config.PrivValidatorListenAddr, logger)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "Error with private validator socket client")
}
}
@ -626,11 +589,8 @@ func (n *Node) OnStop() {
}
}
if pvsc, ok := n.privValidator.(cmn.Service); ok {
if err := pvsc.Stop(); err != nil {
n.Logger.Error("Error stopping priv validator client", "err", err)
}
pvsc.Stop()
}
if n.prometheusSrv != nil {
@ -884,6 +844,36 @@ func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) {
db.SetSync(genesisDocKey, bytes)
}
func createAndStartPrivValidatorSocketClient(
listenAddr string,
logger log.Logger,
) (types.PrivValidator, error) {
var pvsc types.PrivValidator
protocol, address := cmn.ProtocolAndAddress(listenAddr)
switch protocol {
case "unix":
pvsc = privval.NewIPCVal(logger.With("module", "privval"), address)
case "tcp":
// TODO: persist this key so external signer
// can actually authenticate us
pvsc = privval.NewTCPVal(logger.With("module", "privval"), listenAddr, ed25519.GenPrivKey())
default:
return nil, fmt.Errorf(
"Wrong listen address: expected either 'tcp' or 'unix' protocols, got %s",
protocol,
)
}
if pvsc, ok := pvsc.(cmn.Service); ok {
if err := pvsc.Start(); err != nil {
return nil, errors.Wrap(err, "failed to start")
}
}
return pvsc, nil
}
// splitAndTrimEmpty slices s into all subslices separated by sep and returns a
// slice of the string s with all leading and trailing Unicode code points
// contained in cutset removed. If sep is empty, SplitAndTrim splits after each


+ 44
- 65
node/node_test.go View File

@ -3,28 +3,28 @@ package node
import (
"context"
"fmt"
"net"
"os"
"syscall"
"testing"
"time"
"net"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/abci/example/kvstore"
"github.com/tendermint/tendermint/crypto/ed25519"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/version"
"github.com/tendermint/tendermint/crypto/ed25519"
cfg "github.com/tendermint/tendermint/config"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"
"github.com/tendermint/tendermint/privval"
tmtime "github.com/tendermint/tendermint/types/time"
)
func TestNodeStartStop(t *testing.T) {
@ -32,17 +32,16 @@ func TestNodeStartStop(t *testing.T) {
// create & start node
n, err := DefaultNewNode(config, log.TestingLogger())
assert.NoError(t, err, "expected no err on DefaultNewNode")
err1 := n.Start()
if err1 != nil {
t.Error(err1)
}
require.NoError(t, err)
err = n.Start()
require.NoError(t, err)
t.Logf("Started node %v", n.sw.NodeInfo())
// wait for the node to produce a block
blockCh := make(chan interface{})
err = n.EventBus().Subscribe(context.Background(), "node_test", types.EventQueryNewBlock, blockCh)
assert.NoError(t, err)
require.NoError(t, err)
select {
case <-blockCh:
case <-time.After(10 * time.Second):
@ -94,7 +93,7 @@ func TestNodeDelayedStop(t *testing.T) {
// create & start node
n, err := DefaultNewNode(config, log.TestingLogger())
n.GenesisDoc().GenesisTime = now.Add(5 * time.Second)
assert.NoError(t, err)
require.NoError(t, err)
n.Start()
startTime := tmtime.Now()
@ -106,7 +105,7 @@ func TestNodeSetAppVersion(t *testing.T) {
// create & start node
n, err := DefaultNewNode(config, log.TestingLogger())
assert.NoError(t, err, "expected no err on DefaultNewNode")
require.NoError(t, err)
// default config uses the kvstore app
var appVersion version.Protocol = kvstore.ProtocolVersion
@ -122,91 +121,71 @@ func TestNodeSetAppVersion(t *testing.T) {
func TestNodeSetPrivValTCP(t *testing.T) {
addr := "tcp://" + testFreeAddr(t)
config := cfg.ResetTestRoot("node_priv_val_tcp_test")
config.BaseConfig.PrivValidatorListenAddr = addr
rs := privval.NewRemoteSigner(
log.TestingLogger(),
cmn.RandStr(12),
config.ChainID(),
addr,
types.NewMockPV(),
ed25519.GenPrivKey(),
)
privval.RemoteSignerConnDeadline(5 * time.Millisecond)(rs)
privval.RemoteSignerConnRetries(1e6)(rs)
config := cfg.ResetTestRoot("node_priv_val_tcp_test")
config.BaseConfig.PrivValidatorListenAddr = addr
// kick off remote signer routine, and then start TM.
go func(rs *privval.RemoteSigner) {
rs.Start()
defer rs.Stop()
time.Sleep(100 * time.Millisecond)
}(rs)
go func() {
err := rs.Start()
if err != nil {
panic(err)
}
}()
defer rs.Stop()
n, err := DefaultNewNode(config, log.TestingLogger())
assert.NoError(t, err, "expected no err on DefaultNewNode")
require.NoError(t, err)
assert.IsType(t, &privval.TCPVal{}, n.PrivValidator())
}
func TestNodeSetPrivValTCPNoPrefix(t *testing.T) {
addr := "tcp://" + testFreeAddr(t)
// address without a protocol must result in error
func TestPrivValidatorListenAddrNoProtocol(t *testing.T) {
addrNoPrefix := testFreeAddr(t)
rs := privval.NewRemoteSigner(
log.TestingLogger(),
cmn.RandStr(12),
addr,
types.NewMockPV(),
ed25519.GenPrivKey(),
)
privval.RemoteSignerConnDeadline(5 * time.Millisecond)(rs)
privval.RemoteSignerConnRetries(1e6)(rs)
config := cfg.ResetTestRoot("node_priv_val_tcp_test")
config.BaseConfig.PrivValidatorListenAddr = addr
// kick off remote signer routine, and then start TM.
go func(rs *privval.RemoteSigner) {
rs.Start()
defer rs.Stop()
time.Sleep(100 * time.Millisecond)
}(rs)
config.BaseConfig.PrivValidatorListenAddr = addrNoPrefix
n, err := DefaultNewNode(config, log.TestingLogger())
assert.NoError(t, err, "expected no err on DefaultNewNode")
assert.IsType(t, &privval.TCPVal{}, n.PrivValidator())
_, err := DefaultNewNode(config, log.TestingLogger())
assert.Error(t, err)
}
func TestNodeSetPrivValIPC(t *testing.T) {
tmpfile := "/tmp/kms." + cmn.RandStr(6) + ".sock"
defer os.Remove(tmpfile) // clean up
addr := "unix://" + tmpfile
config := cfg.ResetTestRoot("node_priv_val_tcp_test")
config.BaseConfig.PrivValidatorListenAddr = "unix://" + tmpfile
rs := privval.NewIPCRemoteSigner(
log.TestingLogger(),
cmn.RandStr(12),
config.ChainID(),
tmpfile,
types.NewMockPV(),
)
privval.IPCRemoteSignerConnDeadline(3 * time.Second)(rs)
// kick off remote signer routine, and then start TM.
go func(rs *privval.IPCRemoteSigner) {
rs.Start()
defer rs.Stop()
time.Sleep(500 * time.Millisecond)
}(rs)
done := make(chan struct{})
go func() {
defer close(done)
n, err := DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err)
assert.IsType(t, &privval.IPCVal{}, n.PrivValidator())
}()
config := cfg.ResetTestRoot("node_priv_val_tcp_test")
config.BaseConfig.PrivValidatorListenAddr = addr
n, err := DefaultNewNode(config, log.TestingLogger())
err := rs.Start()
require.NoError(t, err)
defer rs.Stop()
assert.NoError(t, err, "expected no err on DefaultNewNode")
assert.IsType(t, &privval.IPCVal{}, n.PrivValidator())
<-done
}
// testFreeAddr claims a free port so we don't block on listener being ready.
func testFreeAddr(t *testing.T) string {
ln, err := net.Listen("tcp", "127.0.0.1:0")


+ 1
- 0
privval/ipc_server.go View File

@ -69,6 +69,7 @@ func (rs *IPCRemoteSigner) OnStart() error {
for {
conn, err := rs.listener.AcceptUnix()
if err != nil {
rs.Logger.Error("AcceptUnix", "err", err)
return
}
go rs.handleConnection(conn)


Loading…
Cancel
Save