Browse Source

node: allow replacing existing p2p.Reactor(s) (#3846)

* node: allow replacing existing p2p.Reactor(s)

using [`CustomReactors`
option](https://godoc.org/github.com/tendermint/tendermint/node#CustomReactors).
Warning: beware of accidental name clashes. Here is the list of existing
reactors: MEMPOOL, BLOCKCHAIN, CONSENSUS, EVIDENCE, PEX.

* check the absence of "CUSTOM" prefix

* merge 2 tests

* add doc.go to node package
pull/3926/head
Anton Kaliaev 5 years ago
committed by Jack Zampolin
parent
commit
5c9d6d839e
5 changed files with 86 additions and 11 deletions
  1. +4
    -0
      CHANGELOG_PENDING.md
  2. +40
    -0
      node/doc.go
  3. +17
    -6
      node/node.go
  4. +6
    -1
      node/node_test.go
  5. +19
    -4
      p2p/switch.go

+ 4
- 0
CHANGELOG_PENDING.md View File

@ -17,6 +17,10 @@ program](https://hackerone.com/tendermint).
- [libs] \#3811 Remove `db` from libs in favor of `https://github.com/tendermint/tm-cmn`
### FEATURES:
- [node] Allow replacing existing p2p.Reactor(s) using [`CustomReactors`
option](https://godoc.org/github.com/tendermint/tendermint/node#CustomReactors).
Warning: beware of accidental name clashes. Here is the list of existing
reactors: MEMPOOL, BLOCKCHAIN, CONSENSUS, EVIDENCE, PEX.
### IMPROVEMENTS:


+ 40
- 0
node/doc.go View File

@ -0,0 +1,40 @@
/*
Package node is the main entry point, where the Node struct, which
represents a full node, is defined.
Adding new p2p.Reactor(s)
To add a new p2p.Reactor, use the CustomReactors option:
node, err := NewNode(
config,
privVal,
nodeKey,
clientCreator,
genesisDocProvider,
dbProvider,
metricsProvider,
logger,
CustomReactors(map[string]p2p.Reactor{"CUSTOM": customReactor}),
)
Replacing existing p2p.Reactor(s)
To replace the built-in p2p.Reactor, use the CustomReactors option:
node, err := NewNode(
config,
privVal,
nodeKey,
clientCreator,
genesisDocProvider,
dbProvider,
metricsProvider,
logger,
CustomReactors(map[string]p2p.Reactor{"BLOCKCHAIN": customBlockchainReactor}),
)
The list of existing reactors can be found in CustomReactors documentation.
*/
package node

+ 17
- 6
node/node.go View File

@ -48,10 +48,6 @@ import (
dbm "github.com/tendermint/tm-cmn/db"
)
// CustomReactorNamePrefix is a prefix for all custom reactors to prevent
// clashes with built-in reactors.
const CustomReactorNamePrefix = "CUSTOM_"
//------------------------------------------------------------------------------
// DBContext specifies config information for loading a new DB.
@ -144,11 +140,26 @@ func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
// Option sets a parameter for the node.
type Option func(*Node)
// CustomReactors allows you to add custom reactors to the node's Switch.
// CustomReactors allows you to add custom reactors (name -> p2p.Reactor) to
// the node's Switch.
//
// WARNING: using any name from the below list of the existing reactors will
// result in replacing it with the custom one.
//
// - MEMPOOL
// - BLOCKCHAIN
// - CONSENSUS
// - EVIDENCE
// - PEX
func CustomReactors(reactors map[string]p2p.Reactor) Option {
return func(n *Node) {
for name, reactor := range reactors {
n.sw.AddReactor(CustomReactorNamePrefix+name, reactor)
if existingReactor := n.sw.Reactor(name); existingReactor != nil {
n.sw.Logger.Info("Replacing existing reactor with a custom one",
"name", name, "existing", existingReactor, "custom", reactor)
n.sw.RemoveReactor(name, existingReactor)
}
n.sw.AddReactor(name, reactor)
}
}
}


+ 6
- 1
node/node_test.go View File

@ -288,6 +288,7 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
defer os.RemoveAll(config.RootDir)
cr := p2pmock.NewReactor()
customBlockchainReactor := p2pmock.NewReactor()
nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
require.NoError(t, err)
@ -300,7 +301,7 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
DefaultDBProvider,
DefaultMetricsProvider(config.Instrumentation),
log.TestingLogger(),
CustomReactors(map[string]p2p.Reactor{"FOO": cr}),
CustomReactors(map[string]p2p.Reactor{"FOO": cr, "BLOCKCHAIN": customBlockchainReactor}),
)
require.NoError(t, err)
@ -309,6 +310,10 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
defer n.Stop()
assert.True(t, cr.IsRunning())
assert.Equal(t, cr, n.Switch().Reactor("FOO"))
assert.True(t, customBlockchainReactor.IsRunning())
assert.Equal(t, customBlockchainReactor, n.Switch().Reactor("BLOCKCHAIN"))
}
func state(nVals int, height int64) (sm.State, dbm.DB) {


+ 19
- 4
p2p/switch.go View File

@ -152,11 +152,9 @@ func WithMetrics(metrics *Metrics) SwitchOption {
// AddReactor adds the given reactor to the switch.
// NOTE: Not goroutine safe.
func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
// Validate the reactor.
// No two reactors can share the same channel.
reactorChannels := reactor.GetChannels()
for _, chDesc := range reactorChannels {
for _, chDesc := range reactor.GetChannels() {
chID := chDesc.ID
// No two reactors can share the same channel.
if sw.reactorsByCh[chID] != nil {
panic(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
}
@ -168,6 +166,23 @@ func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
return reactor
}
// RemoveReactor removes the given Reactor from the Switch.
// NOTE: Not goroutine safe.
func (sw *Switch) RemoveReactor(name string, reactor Reactor) {
for _, chDesc := range reactor.GetChannels() {
// remove channel description
for i := 0; i < len(sw.chDescs); i++ {
if chDesc.ID == sw.chDescs[i].ID {
sw.chDescs = append(sw.chDescs[:i], sw.chDescs[i+1:]...)
break
}
}
delete(sw.reactorsByCh, chDesc.ID)
}
delete(sw.reactors, name)
reactor.SetSwitch(nil)
}
// Reactors returns a map of reactors registered on the switch.
// NOTE: Not goroutine safe.
func (sw *Switch) Reactors() map[string]Reactor {


Loading…
Cancel
Save