From 9a2dd8bc9279ed2a1a4d4f31cc151f8a621cceb3 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Sun, 15 Jan 2017 16:59:10 -0800 Subject: [PATCH] Refactor Node; Node is a simple BaseService --- cmd/tendermint/main.go | 3 +- cmd/tendermint/run_node.go | 56 +++++++++++++++ node/node.go | 142 +++++++++++++++---------------------- node/node_test.go | 8 +-- rpc/test/helpers.go | 6 -- 5 files changed, 118 insertions(+), 97 deletions(-) create mode 100644 cmd/tendermint/run_node.go diff --git a/cmd/tendermint/main.go b/cmd/tendermint/main.go index e5abe5b23..2338ad393 100644 --- a/cmd/tendermint/main.go +++ b/cmd/tendermint/main.go @@ -9,7 +9,6 @@ import ( "github.com/tendermint/go-logger" tmcfg "github.com/tendermint/tendermint/config/tendermint" "github.com/tendermint/tendermint/consensus" - "github.com/tendermint/tendermint/node" "github.com/tendermint/tendermint/version" ) @@ -40,7 +39,7 @@ Commands: switch args[0] { case "node": - node.RunNode(config) + run_node(config) case "replay": consensus.RunReplayFile(config, args[1], false) case "replay_console": diff --git a/cmd/tendermint/run_node.go b/cmd/tendermint/run_node.go new file mode 100644 index 000000000..9f13533a5 --- /dev/null +++ b/cmd/tendermint/run_node.go @@ -0,0 +1,56 @@ +package main + +import ( + "io/ioutil" + "time" + + . "github.com/tendermint/go-common" + cfg "github.com/tendermint/go-config" + "github.com/tendermint/tendermint/node" + "github.com/tendermint/tendermint/types" +) + +// Users wishing to: +// * Use an external signer for their validators +// * Supply an in-proc abci app +// should import tendermint/tendermint and implement their own RunNode to +// call NewNode with their custom priv validator and/or custom +// proxy.ClientCreator interface +func run_node(config cfg.Config) { + + // Wait until the genesis doc becomes available + // This is for Mintnet compatibility. + // TODO: If Mintnet gets deprecated or genesis_file is + // always available, remove. + genDocFile := config.GetString("genesis_file") + if !FileExists(genDocFile) { + log.Notice(Fmt("Waiting for genesis file %v...", genDocFile)) + for { + time.Sleep(time.Second) + if !FileExists(genDocFile) { + continue + } + jsonBlob, err := ioutil.ReadFile(genDocFile) + if err != nil { + Exit(Fmt("Couldn't read GenesisDoc file: %v", err)) + } + genDoc := types.GenesisDocFromJSON(jsonBlob) + if genDoc.ChainID == "" { + PanicSanity(Fmt("Genesis doc %v must include non-empty chain_id", genDocFile)) + } + config.Set("chain_id", genDoc.ChainID) + } + } + + // Create & start node + n := node.NewNodeDefault(config) + if _, err := n.Start(); err != nil { + Exit(Fmt("Failed to start node: %v", err)) + } else { + log.Notice("Started node", "nodeInfo", n.Switch().NodeInfo()) + } + + // Trap signal, run forever. + n.RunForever() + +} diff --git a/node/node.go b/node/node.go index 43eba2573..91415ec74 100644 --- a/node/node.go +++ b/node/node.go @@ -2,13 +2,11 @@ package node import ( "bytes" - "io/ioutil" "net" "net/http" "strings" - "time" - . "github.com/tendermint/go-common" + cmn "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" "github.com/tendermint/go-crypto" dbm "github.com/tendermint/go-db" @@ -30,6 +28,8 @@ import ( import _ "net/http/pprof" type Node struct { + cmn.BaseService + config cfg.Config sw *p2p.Switch evsw types.EventSwitch @@ -64,7 +64,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato // Create the proxyApp, which manages connections (consensus, mempool, query) proxyApp := proxy.NewAppConns(config, clientCreator, sm.NewHandshaker(config, state, blockStore)) if _, err := proxyApp.Start(); err != nil { - Exit(Fmt("Error starting proxy app connections: %v", err)) + cmn.Exit(cmn.Fmt("Error starting proxy app connections: %v", err)) } // add the chainid and number of validators to the global config @@ -78,7 +78,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato eventSwitch := types.NewEventSwitch() _, err := eventSwitch.Start() if err != nil { - Exit(Fmt("Failed to start switch: %v", err)) + cmn.Exit(cmn.Fmt("Failed to start switch: %v", err)) } // Decide whether to fast-sync or not @@ -126,14 +126,14 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato if config.GetBool("filter_peers") { // NOTE: addr is ip:port sw.SetAddrFilter(func(addr net.Addr) error { - res := proxyApp.Query().QuerySync([]byte(Fmt("p2p/filter/addr/%s", addr.String()))) + res := proxyApp.Query().QuerySync([]byte(cmn.Fmt("p2p/filter/addr/%s", addr.String()))) if res.IsOK() { return nil } return res }) sw.SetPubKeyFilter(func(pubkey crypto.PubKeyEd25519) error { - res := proxyApp.Query().QuerySync([]byte(Fmt("p2p/filter/pubkey/%X", pubkey.Bytes()))) + res := proxyApp.Query().QuerySync([]byte(cmn.Fmt("p2p/filter/pubkey/%X", pubkey.Bytes()))) if res.IsOK() { return nil } @@ -154,7 +154,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato }() } - return &Node{ + node := &Node{ config: config, sw: sw, evsw: eventSwitch, @@ -168,17 +168,53 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato privKey: privKey, proxyApp: proxyApp, } + node.BaseService = *cmn.NewBaseService(log, "Node", node) + return node } -// Call Start() after adding the listeners. -func (n *Node) Start() error { +func (n *Node) OnStart() error { + n.BaseService.OnStart() + + // Create & add listener + protocol, address := ProtocolAndAddress(n.config.GetString("node_laddr")) + l := p2p.NewDefaultListener(protocol, address, n.config.GetBool("skip_upnp")) + n.sw.AddListener(l) + + // Start the switch n.sw.SetNodeInfo(makeNodeInfo(n.config, n.sw, n.privKey)) n.sw.SetNodePrivKey(n.privKey) _, err := n.sw.Start() - return err + if err != nil { + return err + } + + // Dial out of seed nodes exist + if n.config.GetString("seeds") != "" { + seeds := strings.Split(n.config.GetString("seeds"), ",") + n.sw.DialSeeds(seeds) + } + + // Run the RPC server + if n.config.GetString("rpc_laddr") != "" { + _, err := n.startRPC() + if err != nil { + return err + } + } + + return nil } -func (n *Node) Stop() { +func (n *Node) RunForever() { + // Sleep forever and then... + cmn.TrapSignal(func() { + n.Stop() + }) +} + +func (n *Node) OnStop() { + n.BaseService.OnStop() + log.Notice("Stopping Node") // TODO: gracefully disconnect from peers. n.sw.Stop() @@ -195,11 +231,10 @@ func SetEventSwitch(evsw types.EventSwitch, eventables ...types.Eventable) { // Add listeners before starting the Node. // The first listener is the primary listener (in NodeInfo) func (n *Node) AddListener(l p2p.Listener) { - log.Notice(Fmt("Added %v", l)) n.sw.AddListener(l) } -func (n *Node) StartRPC() ([]net.Listener, error) { +func (n *Node) startRPC() ([]net.Listener, error) { rpccore.SetConfig(n.config) rpccore.SetEventSwitch(n.evsw) @@ -285,16 +320,16 @@ func makeNodeInfo(config cfg.Config, sw *p2p.Switch, privKey crypto.PrivKeyEd255 Network: config.GetString("chain_id"), Version: version.Version, Other: []string{ - Fmt("wire_version=%v", wire.Version), - Fmt("p2p_version=%v", p2p.Version), - Fmt("consensus_version=%v", consensus.Version), - Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version), + cmn.Fmt("wire_version=%v", wire.Version), + cmn.Fmt("p2p_version=%v", p2p.Version), + cmn.Fmt("consensus_version=%v", consensus.Version), + cmn.Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version), }, } // include git hash in the nodeInfo if available - if rev, err := ReadFile(config.GetString("revision_file")); err == nil { - nodeInfo.Other = append(nodeInfo.Other, Fmt("revision=%v", string(rev))) + if rev, err := cmn.ReadFile(config.GetString("revision_file")); err == nil { + nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("revision=%v", string(rev))) } if !sw.IsListening() { @@ -309,74 +344,11 @@ func makeNodeInfo(config cfg.Config, sw *p2p.Switch, privKey crypto.PrivKeyEd255 // We assume that the rpcListener has the same ExternalAddress. // This is probably true because both P2P and RPC listeners use UPnP, // except of course if the rpc is only bound to localhost - nodeInfo.ListenAddr = Fmt("%v:%v", p2pHost, p2pPort) - nodeInfo.Other = append(nodeInfo.Other, Fmt("rpc_addr=%v", rpcListenAddr)) + nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort) + nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr)) return nodeInfo } -//------------------------------------------------------------------------------ - -// Users wishing to: -// * use an external signer for their validators -// * supply an in-proc abci app -// should fork tendermint/tendermint and implement RunNode to -// call NewNode with their custom priv validator and/or custom -// proxy.ClientCreator interface -func RunNode(config cfg.Config) { - // Wait until the genesis doc becomes available - genDocFile := config.GetString("genesis_file") - if !FileExists(genDocFile) { - log.Notice(Fmt("Waiting for genesis file %v...", genDocFile)) - for { - time.Sleep(time.Second) - if !FileExists(genDocFile) { - continue - } - jsonBlob, err := ioutil.ReadFile(genDocFile) - if err != nil { - Exit(Fmt("Couldn't read GenesisDoc file: %v", err)) - } - genDoc := types.GenesisDocFromJSON(jsonBlob) - if genDoc.ChainID == "" { - PanicSanity(Fmt("Genesis doc %v must include non-empty chain_id", genDocFile)) - } - config.Set("chain_id", genDoc.ChainID) - } - } - - // Create & start node - n := NewNodeDefault(config) - - protocol, address := ProtocolAndAddress(config.GetString("node_laddr")) - l := p2p.NewDefaultListener(protocol, address, config.GetBool("skip_upnp")) - n.AddListener(l) - err := n.Start() - if err != nil { - Exit(Fmt("Failed to start node: %v", err)) - } - - log.Notice("Started node", "nodeInfo", n.sw.NodeInfo()) - - // If seedNode is provided by config, dial out. - if config.GetString("seeds") != "" { - seeds := strings.Split(config.GetString("seeds"), ",") - n.sw.DialSeeds(seeds) - } - - // Run the RPC server. - if config.GetString("rpc_laddr") != "" { - _, err := n.StartRPC() - if err != nil { - PanicCrisis(err) - } - } - - // Sleep forever and then... - TrapSignal(func() { - n.Stop() - }) -} - func (n *Node) NodeInfo() *p2p.NodeInfo { return n.sw.NodeInfo() } diff --git a/node/node_test.go b/node/node_test.go index 880a2dab7..2ab8e8dc6 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -4,7 +4,6 @@ import ( "testing" "time" - "github.com/tendermint/go-p2p" "github.com/tendermint/tendermint/config/tendermint_test" ) @@ -13,12 +12,13 @@ func TestNodeStartStop(t *testing.T) { // Create & start node n := NewNodeDefault(config) - protocol, address := ProtocolAndAddress(config.GetString("node_laddr")) - l := p2p.NewDefaultListener(protocol, address, config.GetBool("skip_upnp")) - n.AddListener(l) n.Start() log.Notice("Started node", "nodeInfo", n.sw.NodeInfo()) + + // Wait a bit to initialize + // TODO remove time.Sleep(), make asynchronous. time.Sleep(time.Second * 2) + ch := make(chan struct{}, 1) go func() { n.Stop() diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index da6482483..76fcd4b3b 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -6,7 +6,6 @@ import ( . "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" - "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" client "github.com/tendermint/go-rpc/client" @@ -57,13 +56,8 @@ func init() { func newNode(ready chan struct{}) { // Create & start node node = nm.NewNodeDefault(config) - protocol, address := nm.ProtocolAndAddress(config.GetString("node_laddr")) - l := p2p.NewDefaultListener(protocol, address, true) - node.AddListener(l) node.Start() - // Run the RPC server. - node.StartRPC() time.Sleep(time.Second) ready <- struct{}{}