You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

430 lines
12 KiB

10 years ago
10 years ago
11 years ago
11 years ago
8 years ago
8 years ago
8 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
9 years ago
10 years ago
  1. package node
  2. import (
  3. "bytes"
  4. "errors"
  5. "net"
  6. "net/http"
  7. "strings"
  8. abci ""
  9. crypto ""
  10. wire ""
  11. bc ""
  12. cfg ""
  13. ""
  14. mempl ""
  15. p2p ""
  16. ""
  17. rpccore ""
  18. grpccore ""
  19. rpc ""
  20. rpcserver ""
  21. sm ""
  22. ""
  23. ""
  24. ""
  25. ""
  26. ""
  27. cmn ""
  28. dbm ""
  29. _ "net/http/pprof"
  30. )
  31. type Node struct {
  32. cmn.BaseService
  33. // config
  34. config *cfg.Config
  35. genesisDoc *types.GenesisDoc // initial validator set
  36. privValidator *types.PrivValidator // local node's validator key
  37. // network
  38. privKey crypto.PrivKeyEd25519 // local node's p2p key
  39. sw *p2p.Switch // p2p connections
  40. addrBook *p2p.AddrBook // known peers
  41. // services
  42. evsw types.EventSwitch // pub/sub for services
  43. blockStore *bc.BlockStore // store the blockchain to disk
  44. bcReactor *bc.BlockchainReactor // for fast-syncing
  45. mempoolReactor *mempl.MempoolReactor // for gossipping transactions
  46. consensusState *consensus.ConsensusState // latest consensus state
  47. consensusReactor *consensus.ConsensusReactor // for participating in the consensus
  48. proxyApp proxy.AppConns // connection to the application
  49. rpcListeners []net.Listener // rpc servers
  50. txIndexer txindex.TxIndexer
  51. }
  52. func NewNodeDefault(config *cfg.Config) *Node {
  53. // Get PrivValidator
  54. privValidator := types.LoadOrGenPrivValidator(config.PrivValidatorFile())
  55. return NewNode(config, privValidator,
  56. proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()))
  57. }
  58. func NewNode(config *cfg.Config, privValidator *types.PrivValidator, clientCreator proxy.ClientCreator) *Node {
  59. // Get BlockStore
  60. blockStoreDB := dbm.NewDB("blockstore", config.DBBackend, config.DBDir())
  61. blockStore := bc.NewBlockStore(blockStoreDB)
  62. // Get State
  63. stateDB := dbm.NewDB("state", config.DBBackend, config.DBDir())
  64. state := sm.GetState(stateDB, config.GenesisFile())
  65. // add the chainid and number of validators to the global config
  66. // TODO: Set ChainID. eg:
  67. // config.Consensus.SetChainID(state.ChainID) // ...
  68. // but actually consensus doesnt need it since the cs has the state ...
  69. // Create the proxyApp, which manages connections (consensus, mempool, query)
  70. // and sync tendermint and the app by replaying any necessary blocks
  71. proxyApp := proxy.NewAppConns(clientCreator, consensus.NewHandshaker(state, blockStore))
  72. if _, err := proxyApp.Start(); err != nil {
  73. cmn.Exit(cmn.Fmt("Error starting proxy app connections: %v", err))
  74. }
  75. // reload the state (it may have been updated by the handshake)
  76. state = sm.LoadState(stateDB)
  77. // Transaction indexing
  78. var txIndexer txindex.TxIndexer
  79. switch config.TxIndex {
  80. case "kv":
  81. store := dbm.NewDB("tx_index", config.DBBackend, config.DBDir())
  82. txIndexer = kv.NewTxIndex(store)
  83. default:
  84. txIndexer = &null.TxIndex{}
  85. }
  86. state.TxIndexer = txIndexer
  87. // Generate node PrivKey
  88. privKey := crypto.GenPrivKeyEd25519()
  89. // Make event switch
  90. eventSwitch := types.NewEventSwitch()
  91. _, err := eventSwitch.Start()
  92. if err != nil {
  93. cmn.Exit(cmn.Fmt("Failed to start switch: %v", err))
  94. }
  95. // Decide whether to fast-sync or not
  96. // We don't fast-sync when the only validator is us.
  97. fastSync := config.FastSync
  98. if state.Validators.Size() == 1 {
  99. addr, _ := state.Validators.GetByIndex(0)
  100. if bytes.Equal(privValidator.Address, addr) {
  101. fastSync = false
  102. }
  103. }
  104. // Make BlockchainReactor
  105. bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyApp.Consensus(), blockStore, fastSync)
  106. // Make MempoolReactor
  107. mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool())
  108. mempoolReactor := mempl.NewMempoolReactor(config.Mempool, mempool)
  109. // Make ConsensusReactor
  110. consensusState := consensus.NewConsensusState(config.Consensus, state.Copy(), proxyApp.Consensus(), blockStore, mempool)
  111. if privValidator != nil {
  112. consensusState.SetPrivValidator(privValidator)
  113. }
  114. consensusReactor := consensus.NewConsensusReactor(consensusState, fastSync)
  115. sw := p2p.NewSwitch(config.P2P)
  116. sw.AddReactor("MEMPOOL", mempoolReactor)
  117. sw.AddReactor("BLOCKCHAIN", bcReactor)
  118. sw.AddReactor("CONSENSUS", consensusReactor)
  119. // Optionally, start the pex reactor
  120. var addrBook *p2p.AddrBook
  121. if config.P2P.PexReactor {
  122. addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
  123. pexReactor := p2p.NewPEXReactor(addrBook)
  124. sw.AddReactor("PEX", pexReactor)
  125. }
  126. // Filter peers by addr or pubkey with an ABCI query.
  127. // If the query return code is OK, add peer.
  128. // XXX: Query format subject to change
  129. if config.FilterPeers {
  130. // NOTE: addr is ip:port
  131. sw.SetAddrFilter(func(addr net.Addr) error {
  132. resQuery, err := proxyApp.Query().QuerySync(abci.RequestQuery{Path: cmn.Fmt("/p2p/filter/addr/%s", addr.String())})
  133. if err != nil {
  134. return err
  135. }
  136. if resQuery.Code.IsOK() {
  137. return nil
  138. }
  139. return errors.New(resQuery.Code.String())
  140. })
  141. sw.SetPubKeyFilter(func(pubkey crypto.PubKeyEd25519) error {
  142. resQuery, err := proxyApp.Query().QuerySync(abci.RequestQuery{Path: cmn.Fmt("/p2p/filter/pubkey/%X", pubkey.Bytes())})
  143. if err != nil {
  144. return err
  145. }
  146. if resQuery.Code.IsOK() {
  147. return nil
  148. }
  149. return errors.New(resQuery.Code.String())
  150. })
  151. }
  152. // add the event switch to all services
  153. // they should all satisfy events.Eventable
  154. SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor)
  155. // run the profile server
  156. profileHost := config.ProfListenAddress
  157. if profileHost != "" {
  158. go func() {
  159. log.Warn("Profile server", "error", http.ListenAndServe(profileHost, nil))
  160. }()
  161. }
  162. node := &Node{
  163. config: config,
  164. genesisDoc: state.GenesisDoc,
  165. privValidator: privValidator,
  166. privKey: privKey,
  167. sw: sw,
  168. addrBook: addrBook,
  169. evsw: eventSwitch,
  170. blockStore: blockStore,
  171. bcReactor: bcReactor,
  172. mempoolReactor: mempoolReactor,
  173. consensusState: consensusState,
  174. consensusReactor: consensusReactor,
  175. proxyApp: proxyApp,
  176. txIndexer: txIndexer,
  177. }
  178. node.BaseService = *cmn.NewBaseService(log, "Node", node)
  179. return node
  180. }
  181. func (n *Node) OnStart() error {
  182. // Create & add listener
  183. protocol, address := ProtocolAndAddress(n.config.P2P.ListenAddress)
  184. l := p2p.NewDefaultListener(protocol, address, n.config.P2P.SkipUPNP)
  185. n.sw.AddListener(l)
  186. // Start the switch
  187. n.sw.SetNodeInfo(n.makeNodeInfo())
  188. n.sw.SetNodePrivKey(n.privKey)
  189. _, err := n.sw.Start()
  190. if err != nil {
  191. return err
  192. }
  193. // If seeds exist, add them to the address book and dial out
  194. if n.config.P2P.Seeds != "" {
  195. // dial out
  196. seeds := strings.Split(n.config.P2P.Seeds, ",")
  197. if err := n.DialSeeds(seeds); err != nil {
  198. return err
  199. }
  200. }
  201. // Run the RPC server
  202. if n.config.RPCListenAddress != "" {
  203. listeners, err := n.startRPC()
  204. if err != nil {
  205. return err
  206. }
  207. n.rpcListeners = listeners
  208. }
  209. return nil
  210. }
  211. func (n *Node) OnStop() {
  212. n.BaseService.OnStop()
  213. log.Notice("Stopping Node")
  214. // TODO: gracefully disconnect from peers.
  215. n.sw.Stop()
  216. for _, l := range n.rpcListeners {
  217. log.Info("Closing rpc listener", "listener", l)
  218. if err := l.Close(); err != nil {
  219. log.Error("Error closing listener", "listener", l, "error", err)
  220. }
  221. }
  222. }
  223. func (n *Node) RunForever() {
  224. // Sleep forever and then...
  225. cmn.TrapSignal(func() {
  226. n.Stop()
  227. })
  228. }
  229. // Add the event switch to reactors, mempool, etc.
  230. func SetEventSwitch(evsw types.EventSwitch, eventables ...types.Eventable) {
  231. for _, e := range eventables {
  232. e.SetEventSwitch(evsw)
  233. }
  234. }
  235. // Add a Listener to accept inbound peer connections.
  236. // Add listeners before starting the Node.
  237. // The first listener is the primary listener (in NodeInfo)
  238. func (n *Node) AddListener(l p2p.Listener) {
  239. n.sw.AddListener(l)
  240. }
  241. // ConfigureRPC sets all variables in rpccore so they will serve
  242. // rpc calls from this node
  243. func (n *Node) ConfigureRPC() {
  244. rpccore.SetEventSwitch(n.evsw)
  245. rpccore.SetBlockStore(n.blockStore)
  246. rpccore.SetConsensusState(n.consensusState)
  247. rpccore.SetMempool(n.mempoolReactor.Mempool)
  248. rpccore.SetSwitch(n.sw)
  249. rpccore.SetPubKey(n.privValidator.PubKey)
  250. rpccore.SetGenesisDoc(n.genesisDoc)
  251. rpccore.SetAddrBook(n.addrBook)
  252. rpccore.SetProxyAppQuery(n.proxyApp.Query())
  253. rpccore.SetTxIndexer(n.txIndexer)
  254. }
  255. func (n *Node) startRPC() ([]net.Listener, error) {
  256. n.ConfigureRPC()
  257. listenAddrs := strings.Split(n.config.RPCListenAddress, ",")
  258. // we may expose the rpc over both a unix and tcp socket
  259. listeners := make([]net.Listener, len(listenAddrs))
  260. for i, listenAddr := range listenAddrs {
  261. mux := http.NewServeMux()
  262. wm := rpcserver.NewWebsocketManager(rpccore.Routes, n.evsw)
  263. mux.HandleFunc("/websocket", wm.WebsocketHandler)
  264. rpcserver.RegisterRPCFuncs(mux, rpccore.Routes)
  265. listener, err := rpcserver.StartHTTPServer(listenAddr, mux)
  266. if err != nil {
  267. return nil, err
  268. }
  269. listeners[i] = listener
  270. }
  271. // we expose a simplified api over grpc for convenience to app devs
  272. grpcListenAddr := n.config.GRPCListenAddress
  273. if grpcListenAddr != "" {
  274. listener, err := grpccore.StartGRPCServer(grpcListenAddr)
  275. if err != nil {
  276. return nil, err
  277. }
  278. listeners = append(listeners, listener)
  279. }
  280. return listeners, nil
  281. }
  282. func (n *Node) Switch() *p2p.Switch {
  283. return n.sw
  284. }
  285. func (n *Node) BlockStore() *bc.BlockStore {
  286. return n.blockStore
  287. }
  288. func (n *Node) ConsensusState() *consensus.ConsensusState {
  289. return n.consensusState
  290. }
  291. func (n *Node) ConsensusReactor() *consensus.ConsensusReactor {
  292. return n.consensusReactor
  293. }
  294. func (n *Node) MempoolReactor() *mempl.MempoolReactor {
  295. return n.mempoolReactor
  296. }
  297. func (n *Node) EventSwitch() types.EventSwitch {
  298. return n.evsw
  299. }
  300. // XXX: for convenience
  301. func (n *Node) PrivValidator() *types.PrivValidator {
  302. return n.privValidator
  303. }
  304. func (n *Node) GenesisDoc() *types.GenesisDoc {
  305. return n.genesisDoc
  306. }
  307. func (n *Node) ProxyApp() proxy.AppConns {
  308. return n.proxyApp
  309. }
  310. func (n *Node) makeNodeInfo() *p2p.NodeInfo {
  311. txIndexerStatus := "on"
  312. if _, ok := n.txIndexer.(*null.TxIndex); ok {
  313. txIndexerStatus = "off"
  314. }
  315. nodeInfo := &p2p.NodeInfo{
  316. PubKey: n.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
  317. Moniker: n.config.Moniker,
  318. Network: n.consensusState.GetState().ChainID,
  319. Version: version.Version,
  320. Other: []string{
  321. cmn.Fmt("wire_version=%v", wire.Version),
  322. cmn.Fmt("p2p_version=%v", p2p.Version),
  323. cmn.Fmt("consensus_version=%v", consensus.Version),
  324. cmn.Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version),
  325. cmn.Fmt("tx_index=%v", txIndexerStatus),
  326. },
  327. }
  328. // include git hash in the nodeInfo if available
  329. // TODO: use ld-flags
  330. /*if rev, err := cmn.ReadFile(n.config.GetString("revision_file")); err == nil {
  331. nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("revision=%v", string(rev)))
  332. }*/
  333. if !n.sw.IsListening() {
  334. return nodeInfo
  335. }
  336. p2pListener := n.sw.Listeners()[0]
  337. p2pHost := p2pListener.ExternalAddress().IP.String()
  338. p2pPort := p2pListener.ExternalAddress().Port
  339. rpcListenAddr := n.config.RPCListenAddress
  340. // We assume that the rpcListener has the same ExternalAddress.
  341. // This is probably true because both P2P and RPC listeners use UPnP,
  342. // except of course if the rpc is only bound to localhost
  343. nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
  344. nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
  345. return nodeInfo
  346. }
  347. //------------------------------------------------------------------------------
  348. func (n *Node) NodeInfo() *p2p.NodeInfo {
  349. return n.sw.NodeInfo()
  350. }
  351. func (n *Node) DialSeeds(seeds []string) error {
  352. return n.sw.DialSeeds(n.addrBook, seeds)
  353. }
  354. // Defaults to tcp
  355. func ProtocolAndAddress(listenAddr string) (string, string) {
  356. protocol, address := "tcp", listenAddr
  357. parts := strings.SplitN(address, "://", 2)
  358. if len(parts) == 2 {
  359. protocol, address = parts[0], parts[1]
  360. }
  361. return protocol, address
  362. }
  363. //------------------------------------------------------------------------------