You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

458 lines
14 KiB

10 years ago
8 years ago
10 years ago
11 years ago
11 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
11 years ago
8 years ago
11 years ago
8 years ago
11 years ago
8 years ago
8 years ago
8 years ago
11 years ago
11 years ago
8 years ago
10 years ago
8 years ago
8 years ago
9 years ago
10 years ago
  1. package node
  2. import (
  3. "bytes"
  4. "errors"
  5. "net"
  6. "net/http"
  7. "strings"
  8. abci "github.com/tendermint/abci/types"
  9. crypto "github.com/tendermint/go-crypto"
  10. wire "github.com/tendermint/go-wire"
  11. bc "github.com/tendermint/tendermint/blockchain"
  12. cfg "github.com/tendermint/tendermint/config"
  13. "github.com/tendermint/tendermint/consensus"
  14. mempl "github.com/tendermint/tendermint/mempool"
  15. p2p "github.com/tendermint/tendermint/p2p"
  16. "github.com/tendermint/tendermint/proxy"
  17. rpccore "github.com/tendermint/tendermint/rpc/core"
  18. grpccore "github.com/tendermint/tendermint/rpc/grpc"
  19. rpc "github.com/tendermint/tendermint/rpc/lib"
  20. rpcserver "github.com/tendermint/tendermint/rpc/lib/server"
  21. sm "github.com/tendermint/tendermint/state"
  22. "github.com/tendermint/tendermint/state/txindex"
  23. "github.com/tendermint/tendermint/state/txindex/kv"
  24. "github.com/tendermint/tendermint/state/txindex/null"
  25. "github.com/tendermint/tendermint/types"
  26. "github.com/tendermint/tendermint/version"
  27. cmn "github.com/tendermint/tmlibs/common"
  28. dbm "github.com/tendermint/tmlibs/db"
  29. "github.com/tendermint/tmlibs/log"
  30. _ "net/http/pprof"
  31. )
// Node bundles everything NewNode wires together: the configuration and
// keys, the p2p switch, and the individual services (block store, reactors,
// consensus state, application connections, RPC listeners, tx indexer).
// It embeds cmn.BaseService and is registered with it in NewNode.
type Node struct {
	cmn.BaseService

	// config
	config        *cfg.Config
	genesisDoc    *types.GenesisDoc    // initial validator set
	privValidator *types.PrivValidator // local node's validator key

	// network
	privKey  crypto.PrivKeyEd25519 // local node's p2p key; generated in NewNode
	sw       *p2p.Switch           // p2p connections
	addrBook *p2p.AddrBook         // known peers; stays nil unless config.P2P.PexReactor is set

	// services
	evsw             types.EventSwitch            // pub/sub for services
	blockStore       *bc.BlockStore               // store the blockchain to disk
	bcReactor        *bc.BlockchainReactor        // for fast-syncing
	mempoolReactor   *mempl.MempoolReactor        // for gossipping transactions
	consensusState   *consensus.ConsensusState    // latest consensus state
	consensusReactor *consensus.ConsensusReactor  // for participating in the consensus
	proxyApp         proxy.AppConns               // connection to the application
	rpcListeners     []net.Listener               // rpc servers; set by OnStart, closed by OnStop
	txIndexer        txindex.TxIndexer            // kv indexer when config.TxIndex is "kv", otherwise the null indexer
}
  53. func NewNodeDefault(config *cfg.Config, logger log.Logger) *Node {
  54. // Get PrivValidator
  55. privValidator := types.LoadOrGenPrivValidator(config.PrivValidatorFile(), logger)
  56. return NewNode(config, privValidator,
  57. proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()), logger)
  58. }
  59. func NewNode(config *cfg.Config, privValidator *types.PrivValidator, clientCreator proxy.ClientCreator, logger log.Logger) *Node {
  60. // Get BlockStore
  61. blockStoreDB := dbm.NewDB("blockstore", config.DBBackend, config.DBDir())
  62. blockStore := bc.NewBlockStore(blockStoreDB)
  63. consensusLogger := logger.With("module", "consensus")
  64. stateLogger := logger.With("module", "state")
  65. // Get State
  66. stateDB := dbm.NewDB("state", config.DBBackend, config.DBDir())
  67. state := sm.GetState(stateDB, config.GenesisFile())
  68. state.SetLogger(stateLogger)
  69. // Create the proxyApp, which manages connections (consensus, mempool, query)
  70. // and sync tendermint and the app by replaying any necessary blocks
  71. handshaker := consensus.NewHandshaker(state, blockStore)
  72. handshaker.SetLogger(consensusLogger)
  73. proxyApp := proxy.NewAppConns(clientCreator, handshaker)
  74. proxyApp.SetLogger(logger.With("module", "proxy"))
  75. if _, err := proxyApp.Start(); err != nil {
  76. cmn.Exit(cmn.Fmt("Error starting proxy app connections: %v", err))
  77. }
  78. // reload the state (it may have been updated by the handshake)
  79. state = sm.LoadState(stateDB)
  80. state.SetLogger(stateLogger)
  81. // Transaction indexing
  82. var txIndexer txindex.TxIndexer
  83. switch config.TxIndex {
  84. case "kv":
  85. store := dbm.NewDB("tx_index", config.DBBackend, config.DBDir())
  86. txIndexer = kv.NewTxIndex(store)
  87. default:
  88. txIndexer = &null.TxIndex{}
  89. }
  90. state.TxIndexer = txIndexer
  91. // Generate node PrivKey
  92. privKey := crypto.GenPrivKeyEd25519()
  93. // Make event switch
  94. eventSwitch := types.NewEventSwitch()
  95. eventSwitch.SetLogger(logger.With("module", "types"))
  96. _, err := eventSwitch.Start()
  97. if err != nil {
  98. cmn.Exit(cmn.Fmt("Failed to start switch: %v", err))
  99. }
  100. // Decide whether to fast-sync or not
  101. // We don't fast-sync when the only validator is us.
  102. fastSync := config.FastSync
  103. if state.Validators.Size() == 1 {
  104. addr, _ := state.Validators.GetByIndex(0)
  105. if bytes.Equal(privValidator.Address, addr) {
  106. fastSync = false
  107. }
  108. }
  109. // Log whether this node is a validator or an observer
  110. if state.Validators.HasAddress(privValidator.Address) {
  111. consensusLogger.Info("This node is a validator")
  112. } else {
  113. consensusLogger.Info("This node is not a validator")
  114. }
  115. // Make BlockchainReactor
  116. bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyApp.Consensus(), blockStore, fastSync)
  117. bcReactor.SetLogger(logger.With("module", "blockchain"))
  118. // Make MempoolReactor
  119. mempoolLogger := logger.With("module", "mempool")
  120. mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool())
  121. mempool.SetLogger(mempoolLogger)
  122. mempoolReactor := mempl.NewMempoolReactor(config.Mempool, mempool)
  123. mempoolReactor.SetLogger(mempoolLogger)
  124. // Make ConsensusReactor
  125. consensusState := consensus.NewConsensusState(config.Consensus, state.Copy(), proxyApp.Consensus(), blockStore, mempool)
  126. consensusState.SetLogger(consensusLogger)
  127. if privValidator != nil {
  128. consensusState.SetPrivValidator(privValidator)
  129. }
  130. consensusReactor := consensus.NewConsensusReactor(consensusState, fastSync)
  131. consensusReactor.SetLogger(consensusLogger)
  132. p2pLogger := logger.With("module", "p2p")
  133. sw := p2p.NewSwitch(config.P2P)
  134. sw.SetLogger(p2pLogger)
  135. sw.AddReactor("MEMPOOL", mempoolReactor)
  136. sw.AddReactor("BLOCKCHAIN", bcReactor)
  137. sw.AddReactor("CONSENSUS", consensusReactor)
  138. // Optionally, start the pex reactor
  139. var addrBook *p2p.AddrBook
  140. if config.P2P.PexReactor {
  141. addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
  142. addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
  143. pexReactor := p2p.NewPEXReactor(addrBook)
  144. pexReactor.SetLogger(p2pLogger)
  145. sw.AddReactor("PEX", pexReactor)
  146. }
  147. // Filter peers by addr or pubkey with an ABCI query.
  148. // If the query return code is OK, add peer.
  149. // XXX: Query format subject to change
  150. if config.FilterPeers {
  151. // NOTE: addr is ip:port
  152. sw.SetAddrFilter(func(addr net.Addr) error {
  153. resQuery, err := proxyApp.Query().QuerySync(abci.RequestQuery{Path: cmn.Fmt("/p2p/filter/addr/%s", addr.String())})
  154. if err != nil {
  155. return err
  156. }
  157. if resQuery.Code.IsOK() {
  158. return nil
  159. }
  160. return errors.New(resQuery.Code.String())
  161. })
  162. sw.SetPubKeyFilter(func(pubkey crypto.PubKeyEd25519) error {
  163. resQuery, err := proxyApp.Query().QuerySync(abci.RequestQuery{Path: cmn.Fmt("/p2p/filter/pubkey/%X", pubkey.Bytes())})
  164. if err != nil {
  165. return err
  166. }
  167. if resQuery.Code.IsOK() {
  168. return nil
  169. }
  170. return errors.New(resQuery.Code.String())
  171. })
  172. }
  173. // add the event switch to all services
  174. // they should all satisfy events.Eventable
  175. SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor)
  176. // run the profile server
  177. profileHost := config.ProfListenAddress
  178. if profileHost != "" {
  179. go func() {
  180. logger.Error("Profile server", "error", http.ListenAndServe(profileHost, nil))
  181. }()
  182. }
  183. node := &Node{
  184. config: config,
  185. genesisDoc: state.GenesisDoc,
  186. privValidator: privValidator,
  187. privKey: privKey,
  188. sw: sw,
  189. addrBook: addrBook,
  190. evsw: eventSwitch,
  191. blockStore: blockStore,
  192. bcReactor: bcReactor,
  193. mempoolReactor: mempoolReactor,
  194. consensusState: consensusState,
  195. consensusReactor: consensusReactor,
  196. proxyApp: proxyApp,
  197. txIndexer: txIndexer,
  198. }
  199. node.BaseService = *cmn.NewBaseService(logger, "Node", node)
  200. return node
  201. }
  202. func (n *Node) OnStart() error {
  203. // Create & add listener
  204. protocol, address := ProtocolAndAddress(n.config.P2P.ListenAddress)
  205. l := p2p.NewDefaultListener(protocol, address, n.config.P2P.SkipUPNP, n.Logger.With("module", "p2p"))
  206. n.sw.AddListener(l)
  207. // Start the switch
  208. n.sw.SetNodeInfo(n.makeNodeInfo())
  209. n.sw.SetNodePrivKey(n.privKey)
  210. _, err := n.sw.Start()
  211. if err != nil {
  212. return err
  213. }
  214. // If seeds exist, add them to the address book and dial out
  215. if n.config.P2P.Seeds != "" {
  216. // dial out
  217. seeds := strings.Split(n.config.P2P.Seeds, ",")
  218. if err := n.DialSeeds(seeds); err != nil {
  219. return err
  220. }
  221. }
  222. // Run the RPC server
  223. if n.config.RPC.ListenAddress != "" {
  224. listeners, err := n.startRPC()
  225. if err != nil {
  226. return err
  227. }
  228. n.rpcListeners = listeners
  229. }
  230. return nil
  231. }
  232. func (n *Node) OnStop() {
  233. n.BaseService.OnStop()
  234. n.Logger.Info("Stopping Node")
  235. // TODO: gracefully disconnect from peers.
  236. n.sw.Stop()
  237. for _, l := range n.rpcListeners {
  238. n.Logger.Info("Closing rpc listener", "listener", l)
  239. if err := l.Close(); err != nil {
  240. n.Logger.Error("Error closing listener", "listener", l, "error", err)
  241. }
  242. }
  243. }
  244. func (n *Node) RunForever() {
  245. // Sleep forever and then...
  246. cmn.TrapSignal(func() {
  247. n.Stop()
  248. })
  249. }
  250. // Add the event switch to reactors, mempool, etc.
  251. func SetEventSwitch(evsw types.EventSwitch, eventables ...types.Eventable) {
  252. for _, e := range eventables {
  253. e.SetEventSwitch(evsw)
  254. }
  255. }
// AddListener adds a Listener to accept inbound peer connections by
// registering it on the node's p2p switch.
// Add listeners before starting the Node.
// The first listener is the primary listener (in NodeInfo).
func (n *Node) AddListener(l p2p.Listener) {
	n.sw.AddListener(l)
}
// ConfigureRPC sets all variables in rpccore so they will serve
// rpc calls from this node. The values are package-level in rpccore, so they
// reflect whichever node configured them last.
func (n *Node) ConfigureRPC() {
	rpccore.SetEventSwitch(n.evsw)
	rpccore.SetBlockStore(n.blockStore)
	rpccore.SetConsensusState(n.consensusState)
	rpccore.SetMempool(n.mempoolReactor.Mempool)
	rpccore.SetSwitch(n.sw)
	// NOTE(review): this dereferences n.privValidator unconditionally, while
	// NewNode nil-checks it before SetPrivValidator — confirm whether a nil
	// validator can reach this point.
	rpccore.SetPubKey(n.privValidator.PubKey)
	rpccore.SetGenesisDoc(n.genesisDoc)
	// addrBook is nil when the PEX reactor is disabled in NewNode.
	rpccore.SetAddrBook(n.addrBook)
	rpccore.SetProxyAppQuery(n.proxyApp.Query())
	rpccore.SetTxIndexer(n.txIndexer)
	rpccore.SetLogger(n.Logger.With("module", "rpc"))
}
  277. func (n *Node) startRPC() ([]net.Listener, error) {
  278. n.ConfigureRPC()
  279. listenAddrs := strings.Split(n.config.RPC.ListenAddress, ",")
  280. if n.config.RPC.Unsafe {
  281. rpccore.AddUnsafeRoutes()
  282. }
  283. // we may expose the rpc over both a unix and tcp socket
  284. listeners := make([]net.Listener, len(listenAddrs))
  285. for i, listenAddr := range listenAddrs {
  286. mux := http.NewServeMux()
  287. wm := rpcserver.NewWebsocketManager(rpccore.Routes, n.evsw)
  288. rpcLogger := n.Logger.With("module", "rpc-server")
  289. wm.SetLogger(rpcLogger)
  290. mux.HandleFunc("/websocket", wm.WebsocketHandler)
  291. rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger)
  292. listener, err := rpcserver.StartHTTPServer(listenAddr, mux, rpcLogger)
  293. if err != nil {
  294. return nil, err
  295. }
  296. listeners[i] = listener
  297. }
  298. // we expose a simplified api over grpc for convenience to app devs
  299. grpcListenAddr := n.config.RPC.GRPCListenAddress
  300. if grpcListenAddr != "" {
  301. listener, err := grpccore.StartGRPCServer(grpcListenAddr)
  302. if err != nil {
  303. return nil, err
  304. }
  305. listeners = append(listeners, listener)
  306. }
  307. return listeners, nil
  308. }
// Switch returns the node's p2p switch.
func (n *Node) Switch() *p2p.Switch {
	return n.sw
}
// BlockStore returns the block store created in NewNode.
func (n *Node) BlockStore() *bc.BlockStore {
	return n.blockStore
}
// ConsensusState returns the node's consensus state.
func (n *Node) ConsensusState() *consensus.ConsensusState {
	return n.consensusState
}
// ConsensusReactor returns the node's consensus reactor.
func (n *Node) ConsensusReactor() *consensus.ConsensusReactor {
	return n.consensusReactor
}
// MempoolReactor returns the node's mempool reactor.
func (n *Node) MempoolReactor() *mempl.MempoolReactor {
	return n.mempoolReactor
}
// EventSwitch returns the event switch shared by the node's services.
func (n *Node) EventSwitch() types.EventSwitch {
	return n.evsw
}
// PrivValidator returns the validator key the node was constructed with;
// it is whatever was passed to NewNode and may therefore be nil.
// XXX: for convenience
func (n *Node) PrivValidator() *types.PrivValidator {
	return n.privValidator
}
// GenesisDoc returns the genesis document taken from the state in NewNode.
func (n *Node) GenesisDoc() *types.GenesisDoc {
	return n.genesisDoc
}
// ProxyApp returns the node's connections to the application.
func (n *Node) ProxyApp() proxy.AppConns {
	return n.proxyApp
}
  337. func (n *Node) makeNodeInfo() *p2p.NodeInfo {
  338. txIndexerStatus := "on"
  339. if _, ok := n.txIndexer.(*null.TxIndex); ok {
  340. txIndexerStatus = "off"
  341. }
  342. nodeInfo := &p2p.NodeInfo{
  343. PubKey: n.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
  344. Moniker: n.config.Moniker,
  345. Network: n.consensusState.GetState().ChainID,
  346. Version: version.Version,
  347. Other: []string{
  348. cmn.Fmt("wire_version=%v", wire.Version),
  349. cmn.Fmt("p2p_version=%v", p2p.Version),
  350. cmn.Fmt("consensus_version=%v", consensus.Version),
  351. cmn.Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version),
  352. cmn.Fmt("tx_index=%v", txIndexerStatus),
  353. },
  354. }
  355. // include git hash in the nodeInfo if available
  356. // TODO: use ld-flags
  357. /*if rev, err := cmn.ReadFile(n.config.GetString("revision_file")); err == nil {
  358. nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("revision=%v", string(rev)))
  359. }*/
  360. if !n.sw.IsListening() {
  361. return nodeInfo
  362. }
  363. p2pListener := n.sw.Listeners()[0]
  364. p2pHost := p2pListener.ExternalAddress().IP.String()
  365. p2pPort := p2pListener.ExternalAddress().Port
  366. rpcListenAddr := n.config.RPC.ListenAddress
  367. // We assume that the rpcListener has the same ExternalAddress.
  368. // This is probably true because both P2P and RPC listeners use UPnP,
  369. // except of course if the rpc is only bound to localhost
  370. nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
  371. nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
  372. return nodeInfo
  373. }
  374. //------------------------------------------------------------------------------
// NodeInfo returns the node info currently held by the p2p switch
// (set from makeNodeInfo in OnStart).
func (n *Node) NodeInfo() *p2p.NodeInfo {
	return n.sw.NodeInfo()
}
// DialSeeds asks the p2p switch to dial the given seed addresses, passing
// along the node's address book (nil when the PEX reactor is disabled).
func (n *Node) DialSeeds(seeds []string) error {
	return n.sw.DialSeeds(n.addrBook, seeds)
}
  381. // Defaults to tcp
  382. func ProtocolAndAddress(listenAddr string) (string, string) {
  383. protocol, address := "tcp", listenAddr
  384. parts := strings.SplitN(address, "://", 2)
  385. if len(parts) == 2 {
  386. protocol, address = parts[0], parts[1]
  387. }
  388. return protocol, address
  389. }
  390. //------------------------------------------------------------------------------