diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 9e229eb2f..9d65efbe5 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -17,6 +17,10 @@ program](https://hackerone.com/tendermint). - [libs] \#3811 Remove `db` from libs in favor of `https://github.com/tendermint/tm-cmn` ### FEATURES: +- [node] Allow replacing existing p2p.Reactor(s) using [`CustomReactors` + option](https://godoc.org/github.com/tendermint/tendermint/node#CustomReactors). + Warning: beware of accidental name clashes. Here is the list of existing + reactors: MEMPOOL, BLOCKCHAIN, CONSENSUS, EVIDENCE, PEX. ### IMPROVEMENTS: diff --git a/node/doc.go b/node/doc.go new file mode 100644 index 000000000..08f3fa258 --- /dev/null +++ b/node/doc.go @@ -0,0 +1,40 @@ +/* +Package node is the main entry point, where the Node struct, which +represents a full node, is defined. + +Adding new p2p.Reactor(s) + +To add a new p2p.Reactor, use the CustomReactors option: + + node, err := NewNode( + config, + privVal, + nodeKey, + clientCreator, + genesisDocProvider, + dbProvider, + metricsProvider, + logger, + CustomReactors(map[string]p2p.Reactor{"CUSTOM": customReactor}), + ) + +Replacing existing p2p.Reactor(s) + +To replace the built-in p2p.Reactor, use the CustomReactors option: + + node, err := NewNode( + config, + privVal, + nodeKey, + clientCreator, + genesisDocProvider, + dbProvider, + metricsProvider, + logger, + CustomReactors(map[string]p2p.Reactor{"BLOCKCHAIN": customBlockchainReactor}), + ) + +The list of existing reactors can be found in CustomReactors documentation. + +*/ +package node diff --git a/node/node.go b/node/node.go index 60ba8e6d8..9060cf04b 100644 --- a/node/node.go +++ b/node/node.go @@ -48,10 +48,6 @@ import ( dbm "github.com/tendermint/tm-cmn/db" ) -// CustomReactorNamePrefix is a prefix for all custom reactors to prevent -// clashes with built-in reactors. -const CustomReactorNamePrefix = "CUSTOM_" - //------------------------------------------------------------------------------ // DBContext specifies config information for loading a new DB. @@ -144,11 +140,26 @@ func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider { // Option sets a parameter for the node. type Option func(*Node) -// CustomReactors allows you to add custom reactors to the node's Switch. +// CustomReactors allows you to add custom reactors (name -> p2p.Reactor) to +// the node's Switch. +// +// WARNING: using any name from the below list of the existing reactors will +// result in replacing it with the custom one. +// +// - MEMPOOL +// - BLOCKCHAIN +// - CONSENSUS +// - EVIDENCE +// - PEX func CustomReactors(reactors map[string]p2p.Reactor) Option { return func(n *Node) { for name, reactor := range reactors { - n.sw.AddReactor(CustomReactorNamePrefix+name, reactor) + if existingReactor := n.sw.Reactor(name); existingReactor != nil { + n.sw.Logger.Info("Replacing existing reactor with a custom one", + "name", name, "existing", existingReactor, "custom", reactor) + n.sw.RemoveReactor(name, existingReactor) + } + n.sw.AddReactor(name, reactor) } } } diff --git a/node/node_test.go b/node/node_test.go index 0a0f8156a..669209f1a 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -288,6 +288,7 @@ func TestNodeNewNodeCustomReactors(t *testing.T) { defer os.RemoveAll(config.RootDir) cr := p2pmock.NewReactor() + customBlockchainReactor := p2pmock.NewReactor() nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile()) require.NoError(t, err) @@ -300,7 +301,7 @@ func TestNodeNewNodeCustomReactors(t *testing.T) { DefaultDBProvider, DefaultMetricsProvider(config.Instrumentation), log.TestingLogger(), - CustomReactors(map[string]p2p.Reactor{"FOO": cr}), + CustomReactors(map[string]p2p.Reactor{"FOO": cr, "BLOCKCHAIN": customBlockchainReactor}), ) require.NoError(t, err) @@ -309,6 +310,10 @@ func TestNodeNewNodeCustomReactors(t *testing.T) { defer n.Stop() assert.True(t, cr.IsRunning()) + assert.Equal(t, cr, n.Switch().Reactor("FOO")) + + assert.True(t, customBlockchainReactor.IsRunning()) + assert.Equal(t, customBlockchainReactor, n.Switch().Reactor("BLOCKCHAIN")) } func state(nVals int, height int64) (sm.State, dbm.DB) { diff --git a/p2p/switch.go b/p2p/switch.go index 7e681d67c..66c2f9e4a 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -152,11 +152,9 @@ func WithMetrics(metrics *Metrics) SwitchOption { // AddReactor adds the given reactor to the switch. // NOTE: Not goroutine safe. func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor { - // Validate the reactor. - // No two reactors can share the same channel. - reactorChannels := reactor.GetChannels() - for _, chDesc := range reactorChannels { + for _, chDesc := range reactor.GetChannels() { chID := chDesc.ID + // No two reactors can share the same channel. if sw.reactorsByCh[chID] != nil { panic(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor)) } @@ -168,6 +166,23 @@ func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor { return reactor } +// RemoveReactor removes the given Reactor from the Switch. +// NOTE: Not goroutine safe. +func (sw *Switch) RemoveReactor(name string, reactor Reactor) { + for _, chDesc := range reactor.GetChannels() { + // remove channel description + for i := 0; i < len(sw.chDescs); i++ { + if chDesc.ID == sw.chDescs[i].ID { + sw.chDescs = append(sw.chDescs[:i], sw.chDescs[i+1:]...) + break + } + } + delete(sw.reactorsByCh, chDesc.ID) + } + delete(sw.reactors, name) + reactor.SetSwitch(nil) +} + // Reactors returns a map of reactors registered on the switch. // NOTE: Not goroutine safe. func (sw *Switch) Reactors() map[string]Reactor {