You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

368 lines
10 KiB

10 years ago
10 years ago
11 years ago
8 years ago
11 years ago
8 years ago
8 years ago
8 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
9 years ago
10 years ago
  1. package node
  2. import (
  3. "bytes"
  4. "net"
  5. "net/http"
  6. "strings"
  7. cmn "github.com/tendermint/go-common"
  8. cfg "github.com/tendermint/go-config"
  9. "github.com/tendermint/go-crypto"
  10. dbm "github.com/tendermint/go-db"
  11. "github.com/tendermint/go-p2p"
  12. "github.com/tendermint/go-rpc"
  13. "github.com/tendermint/go-rpc/server"
  14. "github.com/tendermint/go-wire"
  15. bc "github.com/tendermint/tendermint/blockchain"
  16. "github.com/tendermint/tendermint/consensus"
  17. mempl "github.com/tendermint/tendermint/mempool"
  18. "github.com/tendermint/tendermint/proxy"
  19. rpccore "github.com/tendermint/tendermint/rpc/core"
  20. grpccore "github.com/tendermint/tendermint/rpc/grpc"
  21. sm "github.com/tendermint/tendermint/state"
  22. "github.com/tendermint/tendermint/types"
  23. "github.com/tendermint/tendermint/version"
  24. )
  25. import _ "net/http/pprof"
  26. type Node struct {
  27. cmn.BaseService
  28. config cfg.Config
  29. sw *p2p.Switch
  30. evsw types.EventSwitch
  31. blockStore *bc.BlockStore
  32. bcReactor *bc.BlockchainReactor
  33. mempoolReactor *mempl.MempoolReactor
  34. consensusState *consensus.ConsensusState
  35. consensusReactor *consensus.ConsensusReactor
  36. privValidator *types.PrivValidator
  37. genesisDoc *types.GenesisDoc
  38. privKey crypto.PrivKeyEd25519
  39. proxyApp proxy.AppConns
  40. }
  41. func NewNodeDefault(config cfg.Config) *Node {
  42. // Get PrivValidator
  43. privValidatorFile := config.GetString("priv_validator_file")
  44. privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
  45. return NewNode(config, privValidator, proxy.DefaultClientCreator(config))
  46. }
  47. func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreator proxy.ClientCreator) *Node {
  48. // Get BlockStore
  49. blockStoreDB := dbm.NewDB("blockstore", config.GetString("db_backend"), config.GetString("db_dir"))
  50. blockStore := bc.NewBlockStore(blockStoreDB)
  51. // Get State
  52. stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir"))
  53. state := sm.GetState(config, stateDB)
  54. // Create the proxyApp, which manages connections (consensus, mempool, query)
  55. proxyApp := proxy.NewAppConns(config, clientCreator, sm.NewHandshaker(config, state, blockStore))
  56. if _, err := proxyApp.Start(); err != nil {
  57. cmn.Exit(cmn.Fmt("Error starting proxy app connections: %v", err))
  58. }
  59. // add the chainid and number of validators to the global config
  60. config.Set("chain_id", state.ChainID)
  61. config.Set("num_vals", state.Validators.Size())
  62. // Generate node PrivKey
  63. privKey := crypto.GenPrivKeyEd25519()
  64. // Make event switch
  65. eventSwitch := types.NewEventSwitch()
  66. _, err := eventSwitch.Start()
  67. if err != nil {
  68. cmn.Exit(cmn.Fmt("Failed to start switch: %v", err))
  69. }
  70. // Decide whether to fast-sync or not
  71. // We don't fast-sync when the only validator is us.
  72. fastSync := config.GetBool("fast_sync")
  73. if state.Validators.Size() == 1 {
  74. addr, _ := state.Validators.GetByIndex(0)
  75. if bytes.Equal(privValidator.Address, addr) {
  76. fastSync = false
  77. }
  78. }
  79. // Make BlockchainReactor
  80. bcReactor := bc.NewBlockchainReactor(config, state.Copy(), proxyApp.Consensus(), blockStore, fastSync)
  81. // Make MempoolReactor
  82. mempool := mempl.NewMempool(config, proxyApp.Mempool())
  83. mempoolReactor := mempl.NewMempoolReactor(config, mempool)
  84. // Make ConsensusReactor
  85. consensusState := consensus.NewConsensusState(config, state.Copy(), proxyApp.Consensus(), blockStore, mempool)
  86. if privValidator != nil {
  87. consensusState.SetPrivValidator(privValidator)
  88. }
  89. consensusReactor := consensus.NewConsensusReactor(consensusState, fastSync)
  90. // Make p2p network switch
  91. sw := p2p.NewSwitch(config.GetConfig("p2p"))
  92. sw.AddReactor("MEMPOOL", mempoolReactor)
  93. sw.AddReactor("BLOCKCHAIN", bcReactor)
  94. sw.AddReactor("CONSENSUS", consensusReactor)
  95. // Optionally, start the pex reactor
  96. // TODO: this is a dev feature, it needs some love
  97. if config.GetBool("pex_reactor") {
  98. addrBook := p2p.NewAddrBook(config.GetString("addrbook_file"), config.GetBool("addrbook_strict"))
  99. addrBook.Start()
  100. pexReactor := p2p.NewPEXReactor(addrBook)
  101. sw.AddReactor("PEX", pexReactor)
  102. }
  103. // filter peers by addr or pubkey with a abci query.
  104. // if the query return code is OK, add peer
  105. // XXX: query format subject to change
  106. if config.GetBool("filter_peers") {
  107. // NOTE: addr is ip:port
  108. sw.SetAddrFilter(func(addr net.Addr) error {
  109. res := proxyApp.Query().QuerySync([]byte(cmn.Fmt("p2p/filter/addr/%s", addr.String())))
  110. if res.IsOK() {
  111. return nil
  112. }
  113. return res
  114. })
  115. sw.SetPubKeyFilter(func(pubkey crypto.PubKeyEd25519) error {
  116. res := proxyApp.Query().QuerySync([]byte(cmn.Fmt("p2p/filter/pubkey/%X", pubkey.Bytes())))
  117. if res.IsOK() {
  118. return nil
  119. }
  120. return res
  121. })
  122. }
  123. // add the event switch to all services
  124. // they should all satisfy events.Eventable
  125. SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor)
  126. // run the profile server
  127. profileHost := config.GetString("prof_laddr")
  128. if profileHost != "" {
  129. go func() {
  130. log.Warn("Profile server", "error", http.ListenAndServe(profileHost, nil))
  131. }()
  132. }
  133. node := &Node{
  134. config: config,
  135. sw: sw,
  136. evsw: eventSwitch,
  137. blockStore: blockStore,
  138. bcReactor: bcReactor,
  139. mempoolReactor: mempoolReactor,
  140. consensusState: consensusState,
  141. consensusReactor: consensusReactor,
  142. privValidator: privValidator,
  143. genesisDoc: state.GenesisDoc,
  144. privKey: privKey,
  145. proxyApp: proxyApp,
  146. }
  147. node.BaseService = *cmn.NewBaseService(log, "Node", node)
  148. return node
  149. }
  150. func (n *Node) OnStart() error {
  151. n.BaseService.OnStart()
  152. // Create & add listener
  153. protocol, address := ProtocolAndAddress(n.config.GetString("node_laddr"))
  154. l := p2p.NewDefaultListener(protocol, address, n.config.GetBool("skip_upnp"))
  155. n.sw.AddListener(l)
  156. // Start the switch
  157. n.sw.SetNodeInfo(makeNodeInfo(n.config, n.sw, n.privKey))
  158. n.sw.SetNodePrivKey(n.privKey)
  159. _, err := n.sw.Start()
  160. if err != nil {
  161. return err
  162. }
  163. // Dial out of seed nodes exist
  164. if n.config.GetString("seeds") != "" {
  165. seeds := strings.Split(n.config.GetString("seeds"), ",")
  166. n.sw.DialSeeds(seeds)
  167. }
  168. // Run the RPC server
  169. if n.config.GetString("rpc_laddr") != "" {
  170. _, err := n.startRPC()
  171. if err != nil {
  172. return err
  173. }
  174. }
  175. return nil
  176. }
  177. func (n *Node) RunForever() {
  178. // Sleep forever and then...
  179. cmn.TrapSignal(func() {
  180. n.Stop()
  181. })
  182. }
  183. func (n *Node) OnStop() {
  184. n.BaseService.OnStop()
  185. log.Notice("Stopping Node")
  186. // TODO: gracefully disconnect from peers.
  187. n.sw.Stop()
  188. }
  189. // Add the event switch to reactors, mempool, etc.
  190. func SetEventSwitch(evsw types.EventSwitch, eventables ...types.Eventable) {
  191. for _, e := range eventables {
  192. e.SetEventSwitch(evsw)
  193. }
  194. }
  195. // Add a Listener to accept inbound peer connections.
  196. // Add listeners before starting the Node.
  197. // The first listener is the primary listener (in NodeInfo)
  198. func (n *Node) AddListener(l p2p.Listener) {
  199. n.sw.AddListener(l)
  200. }
  201. func (n *Node) startRPC() ([]net.Listener, error) {
  202. rpccore.SetConfig(n.config)
  203. rpccore.SetEventSwitch(n.evsw)
  204. rpccore.SetBlockStore(n.blockStore)
  205. rpccore.SetConsensusState(n.consensusState)
  206. rpccore.SetMempool(n.mempoolReactor.Mempool)
  207. rpccore.SetSwitch(n.sw)
  208. rpccore.SetPubKey(n.privValidator.PubKey)
  209. rpccore.SetGenesisDoc(n.genesisDoc)
  210. rpccore.SetProxyAppQuery(n.proxyApp.Query())
  211. listenAddrs := strings.Split(n.config.GetString("rpc_laddr"), ",")
  212. // we may expose the rpc over both a unix and tcp socket
  213. listeners := make([]net.Listener, len(listenAddrs))
  214. for i, listenAddr := range listenAddrs {
  215. mux := http.NewServeMux()
  216. wm := rpcserver.NewWebsocketManager(rpccore.Routes, n.evsw)
  217. mux.HandleFunc("/websocket", wm.WebsocketHandler)
  218. rpcserver.RegisterRPCFuncs(mux, rpccore.Routes)
  219. listener, err := rpcserver.StartHTTPServer(listenAddr, mux)
  220. if err != nil {
  221. return nil, err
  222. }
  223. listeners[i] = listener
  224. }
  225. // we expose a simplified api over grpc for convenience to app devs
  226. grpcListenAddr := n.config.GetString("grpc_laddr")
  227. if grpcListenAddr != "" {
  228. listener, err := grpccore.StartGRPCServer(grpcListenAddr)
  229. if err != nil {
  230. return nil, err
  231. }
  232. listeners = append(listeners, listener)
  233. }
  234. return listeners, nil
  235. }
  236. func (n *Node) Switch() *p2p.Switch {
  237. return n.sw
  238. }
  239. func (n *Node) BlockStore() *bc.BlockStore {
  240. return n.blockStore
  241. }
  242. func (n *Node) ConsensusState() *consensus.ConsensusState {
  243. return n.consensusState
  244. }
  245. func (n *Node) ConsensusReactor() *consensus.ConsensusReactor {
  246. return n.consensusReactor
  247. }
  248. func (n *Node) MempoolReactor() *mempl.MempoolReactor {
  249. return n.mempoolReactor
  250. }
  251. func (n *Node) EventSwitch() types.EventSwitch {
  252. return n.evsw
  253. }
  254. // XXX: for convenience
  255. func (n *Node) PrivValidator() *types.PrivValidator {
  256. return n.privValidator
  257. }
  258. func (n *Node) GenesisDoc() *types.GenesisDoc {
  259. return n.genesisDoc
  260. }
  261. func (n *Node) ProxyApp() proxy.AppConns {
  262. return n.proxyApp
  263. }
  264. func makeNodeInfo(config cfg.Config, sw *p2p.Switch, privKey crypto.PrivKeyEd25519) *p2p.NodeInfo {
  265. nodeInfo := &p2p.NodeInfo{
  266. PubKey: privKey.PubKey().(crypto.PubKeyEd25519),
  267. Moniker: config.GetString("moniker"),
  268. Network: config.GetString("chain_id"),
  269. Version: version.Version,
  270. Other: []string{
  271. cmn.Fmt("wire_version=%v", wire.Version),
  272. cmn.Fmt("p2p_version=%v", p2p.Version),
  273. cmn.Fmt("consensus_version=%v", consensus.Version),
  274. cmn.Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version),
  275. },
  276. }
  277. // include git hash in the nodeInfo if available
  278. if rev, err := cmn.ReadFile(config.GetString("revision_file")); err == nil {
  279. nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("revision=%v", string(rev)))
  280. }
  281. if !sw.IsListening() {
  282. return nodeInfo
  283. }
  284. p2pListener := sw.Listeners()[0]
  285. p2pHost := p2pListener.ExternalAddress().IP.String()
  286. p2pPort := p2pListener.ExternalAddress().Port
  287. rpcListenAddr := config.GetString("rpc_laddr")
  288. // We assume that the rpcListener has the same ExternalAddress.
  289. // This is probably true because both P2P and RPC listeners use UPnP,
  290. // except of course if the rpc is only bound to localhost
  291. nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
  292. nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
  293. return nodeInfo
  294. }
  295. func (n *Node) NodeInfo() *p2p.NodeInfo {
  296. return n.sw.NodeInfo()
  297. }
  298. func (n *Node) DialSeeds(seeds []string) {
  299. n.sw.DialSeeds(seeds)
  300. }
  301. // Defaults to tcp
  302. func ProtocolAndAddress(listenAddr string) (string, string) {
  303. protocol, address := "tcp", listenAddr
  304. parts := strings.SplitN(address, "://", 2)
  305. if len(parts) == 2 {
  306. protocol, address = parts[0], parts[1]
  307. }
  308. return protocol, address
  309. }