Browse Source

Refactor Node; Node is a simple BaseService

pull/369/head
Jae Kwon 8 years ago
parent
commit
9a2dd8bc92
5 changed files with 118 additions and 97 deletions
  1. +1
    -2
      cmd/tendermint/main.go
  2. +56
    -0
      cmd/tendermint/run_node.go
  3. +57
    -85
      node/node.go
  4. +4
    -4
      node/node_test.go
  5. +0
    -6
      rpc/test/helpers.go

+ 1
- 2
cmd/tendermint/main.go View File

@ -9,7 +9,6 @@ import (
"github.com/tendermint/go-logger"
tmcfg "github.com/tendermint/tendermint/config/tendermint"
"github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/node"
"github.com/tendermint/tendermint/version"
)
@ -40,7 +39,7 @@ Commands:
switch args[0] {
case "node":
node.RunNode(config)
run_node(config)
case "replay":
consensus.RunReplayFile(config, args[1], false)
case "replay_console":


+ 56
- 0
cmd/tendermint/run_node.go View File

@ -0,0 +1,56 @@
package main
import (
"io/ioutil"
"time"
. "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config"
"github.com/tendermint/tendermint/node"
"github.com/tendermint/tendermint/types"
)
// Users wishing to:
// * Use an external signer for their validators
// * Supply an in-proc abci app
// should import tendermint/tendermint and implement their own RunNode to
// call NewNode with their custom priv validator and/or custom
// proxy.ClientCreator interface
func run_node(config cfg.Config) {
// Wait until the genesis doc becomes available
// This is for Mintnet compatibility.
// TODO: If Mintnet gets deprecated or genesis_file is
// always available, remove.
genDocFile := config.GetString("genesis_file")
if !FileExists(genDocFile) {
log.Notice(Fmt("Waiting for genesis file %v...", genDocFile))
for {
time.Sleep(time.Second)
if !FileExists(genDocFile) {
continue
}
jsonBlob, err := ioutil.ReadFile(genDocFile)
if err != nil {
Exit(Fmt("Couldn't read GenesisDoc file: %v", err))
}
genDoc := types.GenesisDocFromJSON(jsonBlob)
if genDoc.ChainID == "" {
PanicSanity(Fmt("Genesis doc %v must include non-empty chain_id", genDocFile))
}
config.Set("chain_id", genDoc.ChainID)
}
}
// Create & start node
n := node.NewNodeDefault(config)
if _, err := n.Start(); err != nil {
Exit(Fmt("Failed to start node: %v", err))
} else {
log.Notice("Started node", "nodeInfo", n.Switch().NodeInfo())
}
// Trap signal, run forever.
n.RunForever()
}

+ 57
- 85
node/node.go View File

@ -2,13 +2,11 @@ package node
import (
"bytes"
"io/ioutil"
"net"
"net/http"
"strings"
"time"
. "github.com/tendermint/go-common"
cmn "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-crypto"
dbm "github.com/tendermint/go-db"
@ -30,6 +28,8 @@ import (
import _ "net/http/pprof"
type Node struct {
cmn.BaseService
config cfg.Config
sw *p2p.Switch
evsw types.EventSwitch
@ -64,7 +64,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato
// Create the proxyApp, which manages connections (consensus, mempool, query)
proxyApp := proxy.NewAppConns(config, clientCreator, sm.NewHandshaker(config, state, blockStore))
if _, err := proxyApp.Start(); err != nil {
Exit(Fmt("Error starting proxy app connections: %v", err))
cmn.Exit(cmn.Fmt("Error starting proxy app connections: %v", err))
}
// add the chainid and number of validators to the global config
@ -78,7 +78,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato
eventSwitch := types.NewEventSwitch()
_, err := eventSwitch.Start()
if err != nil {
Exit(Fmt("Failed to start switch: %v", err))
cmn.Exit(cmn.Fmt("Failed to start switch: %v", err))
}
// Decide whether to fast-sync or not
@ -126,14 +126,14 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato
if config.GetBool("filter_peers") {
// NOTE: addr is ip:port
sw.SetAddrFilter(func(addr net.Addr) error {
res := proxyApp.Query().QuerySync([]byte(Fmt("p2p/filter/addr/%s", addr.String())))
res := proxyApp.Query().QuerySync([]byte(cmn.Fmt("p2p/filter/addr/%s", addr.String())))
if res.IsOK() {
return nil
}
return res
})
sw.SetPubKeyFilter(func(pubkey crypto.PubKeyEd25519) error {
res := proxyApp.Query().QuerySync([]byte(Fmt("p2p/filter/pubkey/%X", pubkey.Bytes())))
res := proxyApp.Query().QuerySync([]byte(cmn.Fmt("p2p/filter/pubkey/%X", pubkey.Bytes())))
if res.IsOK() {
return nil
}
@ -154,7 +154,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato
}()
}
return &Node{
node := &Node{
config: config,
sw: sw,
evsw: eventSwitch,
@ -168,17 +168,53 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato
privKey: privKey,
proxyApp: proxyApp,
}
node.BaseService = *cmn.NewBaseService(log, "Node", node)
return node
}
// Call Start() after adding the listeners.
func (n *Node) Start() error {
func (n *Node) OnStart() error {
n.BaseService.OnStart()
// Create & add listener
protocol, address := ProtocolAndAddress(n.config.GetString("node_laddr"))
l := p2p.NewDefaultListener(protocol, address, n.config.GetBool("skip_upnp"))
n.sw.AddListener(l)
// Start the switch
n.sw.SetNodeInfo(makeNodeInfo(n.config, n.sw, n.privKey))
n.sw.SetNodePrivKey(n.privKey)
_, err := n.sw.Start()
return err
if err != nil {
return err
}
// Dial out of seed nodes exist
if n.config.GetString("seeds") != "" {
seeds := strings.Split(n.config.GetString("seeds"), ",")
n.sw.DialSeeds(seeds)
}
// Run the RPC server
if n.config.GetString("rpc_laddr") != "" {
_, err := n.startRPC()
if err != nil {
return err
}
}
return nil
}
func (n *Node) Stop() {
func (n *Node) RunForever() {
// Sleep forever and then...
cmn.TrapSignal(func() {
n.Stop()
})
}
func (n *Node) OnStop() {
n.BaseService.OnStop()
log.Notice("Stopping Node")
// TODO: gracefully disconnect from peers.
n.sw.Stop()
@ -195,11 +231,10 @@ func SetEventSwitch(evsw types.EventSwitch, eventables ...types.Eventable) {
// Add listeners before starting the Node.
// The first listener is the primary listener (in NodeInfo)
func (n *Node) AddListener(l p2p.Listener) {
log.Notice(Fmt("Added %v", l))
n.sw.AddListener(l)
}
func (n *Node) StartRPC() ([]net.Listener, error) {
func (n *Node) startRPC() ([]net.Listener, error) {
rpccore.SetConfig(n.config)
rpccore.SetEventSwitch(n.evsw)
@ -285,16 +320,16 @@ func makeNodeInfo(config cfg.Config, sw *p2p.Switch, privKey crypto.PrivKeyEd255
Network: config.GetString("chain_id"),
Version: version.Version,
Other: []string{
Fmt("wire_version=%v", wire.Version),
Fmt("p2p_version=%v", p2p.Version),
Fmt("consensus_version=%v", consensus.Version),
Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version),
cmn.Fmt("wire_version=%v", wire.Version),
cmn.Fmt("p2p_version=%v", p2p.Version),
cmn.Fmt("consensus_version=%v", consensus.Version),
cmn.Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version),
},
}
// include git hash in the nodeInfo if available
if rev, err := ReadFile(config.GetString("revision_file")); err == nil {
nodeInfo.Other = append(nodeInfo.Other, Fmt("revision=%v", string(rev)))
if rev, err := cmn.ReadFile(config.GetString("revision_file")); err == nil {
nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("revision=%v", string(rev)))
}
if !sw.IsListening() {
@ -309,74 +344,11 @@ func makeNodeInfo(config cfg.Config, sw *p2p.Switch, privKey crypto.PrivKeyEd255
// We assume that the rpcListener has the same ExternalAddress.
// This is probably true because both P2P and RPC listeners use UPnP,
// except of course if the rpc is only bound to localhost
nodeInfo.ListenAddr = Fmt("%v:%v", p2pHost, p2pPort)
nodeInfo.Other = append(nodeInfo.Other, Fmt("rpc_addr=%v", rpcListenAddr))
nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
return nodeInfo
}
//------------------------------------------------------------------------------
// Users wishing to:
// * use an external signer for their validators
// * supply an in-proc abci app
// should fork tendermint/tendermint and implement RunNode to
// call NewNode with their custom priv validator and/or custom
// proxy.ClientCreator interface
func RunNode(config cfg.Config) {
// Wait until the genesis doc becomes available
genDocFile := config.GetString("genesis_file")
if !FileExists(genDocFile) {
log.Notice(Fmt("Waiting for genesis file %v...", genDocFile))
for {
time.Sleep(time.Second)
if !FileExists(genDocFile) {
continue
}
jsonBlob, err := ioutil.ReadFile(genDocFile)
if err != nil {
Exit(Fmt("Couldn't read GenesisDoc file: %v", err))
}
genDoc := types.GenesisDocFromJSON(jsonBlob)
if genDoc.ChainID == "" {
PanicSanity(Fmt("Genesis doc %v must include non-empty chain_id", genDocFile))
}
config.Set("chain_id", genDoc.ChainID)
}
}
// Create & start node
n := NewNodeDefault(config)
protocol, address := ProtocolAndAddress(config.GetString("node_laddr"))
l := p2p.NewDefaultListener(protocol, address, config.GetBool("skip_upnp"))
n.AddListener(l)
err := n.Start()
if err != nil {
Exit(Fmt("Failed to start node: %v", err))
}
log.Notice("Started node", "nodeInfo", n.sw.NodeInfo())
// If seedNode is provided by config, dial out.
if config.GetString("seeds") != "" {
seeds := strings.Split(config.GetString("seeds"), ",")
n.sw.DialSeeds(seeds)
}
// Run the RPC server.
if config.GetString("rpc_laddr") != "" {
_, err := n.StartRPC()
if err != nil {
PanicCrisis(err)
}
}
// Sleep forever and then...
TrapSignal(func() {
n.Stop()
})
}
func (n *Node) NodeInfo() *p2p.NodeInfo {
return n.sw.NodeInfo()
}


+ 4
- 4
node/node_test.go View File

@ -4,7 +4,6 @@ import (
"testing"
"time"
"github.com/tendermint/go-p2p"
"github.com/tendermint/tendermint/config/tendermint_test"
)
@ -13,12 +12,13 @@ func TestNodeStartStop(t *testing.T) {
// Create & start node
n := NewNodeDefault(config)
protocol, address := ProtocolAndAddress(config.GetString("node_laddr"))
l := p2p.NewDefaultListener(protocol, address, config.GetBool("skip_upnp"))
n.AddListener(l)
n.Start()
log.Notice("Started node", "nodeInfo", n.sw.NodeInfo())
// Wait a bit to initialize
// TODO remove time.Sleep(), make asynchronous.
time.Sleep(time.Second * 2)
ch := make(chan struct{}, 1)
go func() {
n.Stop()


+ 0
- 6
rpc/test/helpers.go View File

@ -6,7 +6,6 @@ import (
. "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-p2p"
"github.com/tendermint/go-wire"
client "github.com/tendermint/go-rpc/client"
@ -57,13 +56,8 @@ func init() {
func newNode(ready chan struct{}) {
// Create & start node
node = nm.NewNodeDefault(config)
protocol, address := nm.ProtocolAndAddress(config.GetString("node_laddr"))
l := p2p.NewDefaultListener(protocol, address, true)
node.AddListener(l)
node.Start()
// Run the RPC server.
node.StartRPC()
time.Sleep(time.Second)
ready <- struct{}{}


Loading…
Cancel
Save