You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

538 lines
16 KiB

10 years ago
10 years ago
11 years ago
11 years ago
8 years ago
8 years ago
11 years ago
8 years ago
11 years ago
8 years ago
11 years ago
8 years ago
8 years ago
11 years ago
11 years ago
8 years ago
10 years ago
8 years ago
9 years ago
10 years ago
  1. package node
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "net/http"
  8. "strings"
  9. abci "github.com/tendermint/abci/types"
  10. crypto "github.com/tendermint/go-crypto"
  11. wire "github.com/tendermint/go-wire"
  12. cmn "github.com/tendermint/tmlibs/common"
  13. dbm "github.com/tendermint/tmlibs/db"
  14. "github.com/tendermint/tmlibs/log"
  15. bc "github.com/tendermint/tendermint/blockchain"
  16. cfg "github.com/tendermint/tendermint/config"
  17. "github.com/tendermint/tendermint/consensus"
  18. mempl "github.com/tendermint/tendermint/mempool"
  19. "github.com/tendermint/tendermint/p2p"
  20. "github.com/tendermint/tendermint/proxy"
  21. rpccore "github.com/tendermint/tendermint/rpc/core"
  22. grpccore "github.com/tendermint/tendermint/rpc/grpc"
  23. rpc "github.com/tendermint/tendermint/rpc/lib"
  24. rpcserver "github.com/tendermint/tendermint/rpc/lib/server"
  25. sm "github.com/tendermint/tendermint/state"
  26. "github.com/tendermint/tendermint/state/txindex"
  27. "github.com/tendermint/tendermint/state/txindex/kv"
  28. "github.com/tendermint/tendermint/state/txindex/null"
  29. "github.com/tendermint/tendermint/types"
  30. "github.com/tendermint/tendermint/version"
  31. _ "net/http/pprof"
  32. )
  33. //------------------------------------------------------------------------------
  34. // DBContext specifies config information for loading a new DB.
  35. type DBContext struct {
  36. ID string
  37. Config *cfg.Config
  38. }
  39. // DBProvider takes a DBContext and returns an instantiated DB.
  40. type DBProvider func(*DBContext) (dbm.DB, error)
  41. // DefaultDBProvider returns a database using the DBBackend and DBDir
  42. // specified in the ctx.Config.
  43. func DefaultDBProvider(ctx *DBContext) (dbm.DB, error) {
  44. return dbm.NewDB(ctx.ID, ctx.Config.DBBackend, ctx.Config.DBDir()), nil
  45. }
  46. // GenesisDocProvider returns a GenesisDoc.
  47. // It allows the GenesisDoc to be pulled from sources other than the
  48. // filesystem, for instance from a distributed key-value store cluster.
  49. type GenesisDocProvider func() (*types.GenesisDoc, error)
  50. // DefaultGenesisDocProviderFunc returns a GenesisDocProvider that loads
  51. // the GenesisDoc from the config.GenesisFile() on the filesystem.
  52. func DefaultGenesisDocProviderFunc(config *cfg.Config) GenesisDocProvider {
  53. return func() (*types.GenesisDoc, error) {
  54. return types.GenesisDocFromFile(config.GenesisFile())
  55. }
  56. }
  57. // NodeProvider takes a config and a logger and returns a ready to go Node.
  58. type NodeProvider func(*cfg.Config, log.Logger) (*Node, error)
  59. // DefaultNewNode returns a Tendermint node with default settings for the
  60. // PrivValidator, ClientCreator, GenesisDoc, and DBProvider.
  61. // It implements NodeProvider.
  62. func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
  63. return NewNode(config,
  64. types.LoadOrGenPrivValidatorFS(config.PrivValidatorFile()),
  65. proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()),
  66. DefaultGenesisDocProviderFunc(config),
  67. DefaultDBProvider,
  68. logger)
  69. }
  70. //------------------------------------------------------------------------------
  71. // Node is the highest level interface to a full Tendermint node.
  72. // It includes all configuration information and running services.
  73. type Node struct {
  74. cmn.BaseService
  75. // config
  76. config *cfg.Config
  77. genesisDoc *types.GenesisDoc // initial validator set
  78. privValidator types.PrivValidator // local node's validator key
  79. // network
  80. privKey crypto.PrivKeyEd25519 // local node's p2p key
  81. sw *p2p.Switch // p2p connections
  82. addrBook *p2p.AddrBook // known peers
  83. // services
  84. evsw types.EventSwitch // pub/sub for services
  85. blockStore *bc.BlockStore // store the blockchain to disk
  86. bcReactor *bc.BlockchainReactor // for fast-syncing
  87. mempoolReactor *mempl.MempoolReactor // for gossipping transactions
  88. consensusState *consensus.ConsensusState // latest consensus state
  89. consensusReactor *consensus.ConsensusReactor // for participating in the consensus
  90. proxyApp proxy.AppConns // connection to the application
  91. rpcListeners []net.Listener // rpc servers
  92. txIndexer txindex.TxIndexer
  93. }
  94. // NewNode returns a new, ready to go, Tendermint Node.
  95. func NewNode(config *cfg.Config,
  96. privValidator types.PrivValidator,
  97. clientCreator proxy.ClientCreator,
  98. genesisDocProvider GenesisDocProvider,
  99. dbProvider DBProvider,
  100. logger log.Logger) (*Node, error) {
  101. // Get BlockStore
  102. blockStoreDB, err := dbProvider(&DBContext{"blockstore", config})
  103. if err != nil {
  104. return nil, err
  105. }
  106. blockStore := bc.NewBlockStore(blockStoreDB)
  107. consensusLogger := logger.With("module", "consensus")
  108. stateLogger := logger.With("module", "state")
  109. // Get State
  110. stateDB, err := dbProvider(&DBContext{"state", config})
  111. if err != nil {
  112. return nil, err
  113. }
  114. state := sm.LoadState(stateDB)
  115. if state == nil {
  116. genDoc, err := genesisDocProvider()
  117. if err != nil {
  118. return nil, err
  119. }
  120. state, err = sm.MakeGenesisState(stateDB, genDoc)
  121. if err != nil {
  122. return nil, err
  123. }
  124. state.Save()
  125. }
  126. state.SetLogger(stateLogger)
  127. // Create the proxyApp, which manages connections (consensus, mempool, query)
  128. // and sync tendermint and the app by replaying any necessary blocks
  129. handshaker := consensus.NewHandshaker(state, blockStore)
  130. handshaker.SetLogger(consensusLogger)
  131. proxyApp := proxy.NewAppConns(clientCreator, handshaker)
  132. proxyApp.SetLogger(logger.With("module", "proxy"))
  133. if _, err := proxyApp.Start(); err != nil {
  134. return nil, fmt.Errorf("Error starting proxy app connections: %v", err)
  135. }
  136. // reload the state (it may have been updated by the handshake)
  137. state = sm.LoadState(stateDB)
  138. state.SetLogger(stateLogger)
  139. // Transaction indexing
  140. var txIndexer txindex.TxIndexer
  141. switch config.TxIndex {
  142. case "kv":
  143. store, err := dbProvider(&DBContext{"tx_index", config})
  144. if err != nil {
  145. return nil, err
  146. }
  147. txIndexer = kv.NewTxIndex(store)
  148. default:
  149. txIndexer = &null.TxIndex{}
  150. }
  151. state.TxIndexer = txIndexer
  152. // Generate node PrivKey
  153. privKey := crypto.GenPrivKeyEd25519()
  154. // Make event switch
  155. eventSwitch := types.NewEventSwitch()
  156. eventSwitch.SetLogger(logger.With("module", "types"))
  157. if _, err := eventSwitch.Start(); err != nil {
  158. return nil, fmt.Errorf("Failed to start switch: %v", err)
  159. }
  160. // Decide whether to fast-sync or not
  161. // We don't fast-sync when the only validator is us.
  162. fastSync := config.FastSync
  163. if state.Validators.Size() == 1 {
  164. addr, _ := state.Validators.GetByIndex(0)
  165. if bytes.Equal(privValidator.GetAddress(), addr) {
  166. fastSync = false
  167. }
  168. }
  169. // Log whether this node is a validator or an observer
  170. if state.Validators.HasAddress(privValidator.GetAddress()) {
  171. consensusLogger.Info("This node is a validator")
  172. } else {
  173. consensusLogger.Info("This node is not a validator")
  174. }
  175. // Make BlockchainReactor
  176. bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyApp.Consensus(), blockStore, fastSync)
  177. bcReactor.SetLogger(logger.With("module", "blockchain"))
  178. // Make MempoolReactor
  179. mempoolLogger := logger.With("module", "mempool")
  180. mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool(), state.LastBlockHeight)
  181. mempool.SetLogger(mempoolLogger)
  182. mempoolReactor := mempl.NewMempoolReactor(config.Mempool, mempool)
  183. mempoolReactor.SetLogger(mempoolLogger)
  184. if config.Consensus.WaitForTxs() {
  185. mempool.EnableTxsAvailable()
  186. }
  187. // Make ConsensusReactor
  188. consensusState := consensus.NewConsensusState(config.Consensus, state.Copy(), proxyApp.Consensus(), blockStore, mempool)
  189. consensusState.SetLogger(consensusLogger)
  190. if privValidator != nil {
  191. consensusState.SetPrivValidator(privValidator)
  192. }
  193. consensusReactor := consensus.NewConsensusReactor(consensusState, fastSync)
  194. consensusReactor.SetLogger(consensusLogger)
  195. p2pLogger := logger.With("module", "p2p")
  196. sw := p2p.NewSwitch(config.P2P)
  197. sw.SetLogger(p2pLogger)
  198. sw.AddReactor("MEMPOOL", mempoolReactor)
  199. sw.AddReactor("BLOCKCHAIN", bcReactor)
  200. sw.AddReactor("CONSENSUS", consensusReactor)
  201. // Optionally, start the pex reactor
  202. var addrBook *p2p.AddrBook
  203. if config.P2P.PexReactor {
  204. addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
  205. addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
  206. pexReactor := p2p.NewPEXReactor(addrBook)
  207. pexReactor.SetLogger(p2pLogger)
  208. sw.AddReactor("PEX", pexReactor)
  209. }
  210. // Filter peers by addr or pubkey with an ABCI query.
  211. // If the query return code is OK, add peer.
  212. // XXX: Query format subject to change
  213. if config.FilterPeers {
  214. // NOTE: addr is ip:port
  215. sw.SetAddrFilter(func(addr net.Addr) error {
  216. resQuery, err := proxyApp.Query().QuerySync(abci.RequestQuery{Path: cmn.Fmt("/p2p/filter/addr/%s", addr.String())})
  217. if err != nil {
  218. return err
  219. }
  220. if resQuery.Code.IsOK() {
  221. return nil
  222. }
  223. return errors.New(resQuery.Code.String())
  224. })
  225. sw.SetPubKeyFilter(func(pubkey crypto.PubKeyEd25519) error {
  226. resQuery, err := proxyApp.Query().QuerySync(abci.RequestQuery{Path: cmn.Fmt("/p2p/filter/pubkey/%X", pubkey.Bytes())})
  227. if err != nil {
  228. return err
  229. }
  230. if resQuery.Code.IsOK() {
  231. return nil
  232. }
  233. return errors.New(resQuery.Code.String())
  234. })
  235. }
  236. // add the event switch to all services
  237. // they should all satisfy events.Eventable
  238. SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor)
  239. // run the profile server
  240. profileHost := config.ProfListenAddress
  241. if profileHost != "" {
  242. go func() {
  243. logger.Error("Profile server", "err", http.ListenAndServe(profileHost, nil))
  244. }()
  245. }
  246. node := &Node{
  247. config: config,
  248. genesisDoc: state.GenesisDoc,
  249. privValidator: privValidator,
  250. privKey: privKey,
  251. sw: sw,
  252. addrBook: addrBook,
  253. evsw: eventSwitch,
  254. blockStore: blockStore,
  255. bcReactor: bcReactor,
  256. mempoolReactor: mempoolReactor,
  257. consensusState: consensusState,
  258. consensusReactor: consensusReactor,
  259. proxyApp: proxyApp,
  260. txIndexer: txIndexer,
  261. }
  262. node.BaseService = *cmn.NewBaseService(logger, "Node", node)
  263. return node, nil
  264. }
  265. // OnStart starts the Node. It implements cmn.Service.
  266. func (n *Node) OnStart() error {
  267. // Create & add listener
  268. protocol, address := cmn.ProtocolAndAddress(n.config.P2P.ListenAddress)
  269. l := p2p.NewDefaultListener(protocol, address, n.config.P2P.SkipUPNP, n.Logger.With("module", "p2p"))
  270. n.sw.AddListener(l)
  271. // Start the switch
  272. n.sw.SetNodeInfo(n.makeNodeInfo())
  273. n.sw.SetNodePrivKey(n.privKey)
  274. _, err := n.sw.Start()
  275. if err != nil {
  276. return err
  277. }
  278. // If seeds exist, add them to the address book and dial out
  279. if n.config.P2P.Seeds != "" {
  280. // dial out
  281. seeds := strings.Split(n.config.P2P.Seeds, ",")
  282. if err := n.DialSeeds(seeds); err != nil {
  283. return err
  284. }
  285. }
  286. // Run the RPC server
  287. if n.config.RPC.ListenAddress != "" {
  288. listeners, err := n.startRPC()
  289. if err != nil {
  290. return err
  291. }
  292. n.rpcListeners = listeners
  293. }
  294. return nil
  295. }
  296. // OnStop stops the Node. It implements cmn.Service.
  297. func (n *Node) OnStop() {
  298. n.BaseService.OnStop()
  299. n.Logger.Info("Stopping Node")
  300. // TODO: gracefully disconnect from peers.
  301. n.sw.Stop()
  302. for _, l := range n.rpcListeners {
  303. n.Logger.Info("Closing rpc listener", "listener", l)
  304. if err := l.Close(); err != nil {
  305. n.Logger.Error("Error closing listener", "listener", l, "err", err)
  306. }
  307. }
  308. }
  309. // RunForever waits for an interupt signal and stops the node.
  310. func (n *Node) RunForever() {
  311. // Sleep forever and then...
  312. cmn.TrapSignal(func() {
  313. n.Stop()
  314. })
  315. }
  316. // SetEventSwitch adds the event switch to reactors, mempool, etc.
  317. func SetEventSwitch(evsw types.EventSwitch, eventables ...types.Eventable) {
  318. for _, e := range eventables {
  319. e.SetEventSwitch(evsw)
  320. }
  321. }
  322. // AddListener adds a listener to accept inbound peer connections.
  323. // It should be called before starting the Node.
  324. // The first listener is the primary listener (in NodeInfo)
  325. func (n *Node) AddListener(l p2p.Listener) {
  326. n.sw.AddListener(l)
  327. }
  328. // ConfigureRPC sets all variables in rpccore so they will serve
  329. // rpc calls from this node
  330. func (n *Node) ConfigureRPC() {
  331. rpccore.SetEventSwitch(n.evsw)
  332. rpccore.SetBlockStore(n.blockStore)
  333. rpccore.SetConsensusState(n.consensusState)
  334. rpccore.SetMempool(n.mempoolReactor.Mempool)
  335. rpccore.SetSwitch(n.sw)
  336. rpccore.SetPubKey(n.privValidator.GetPubKey())
  337. rpccore.SetGenesisDoc(n.genesisDoc)
  338. rpccore.SetAddrBook(n.addrBook)
  339. rpccore.SetProxyAppQuery(n.proxyApp.Query())
  340. rpccore.SetTxIndexer(n.txIndexer)
  341. rpccore.SetConsensusReactor(n.consensusReactor)
  342. rpccore.SetLogger(n.Logger.With("module", "rpc"))
  343. }
  344. func (n *Node) startRPC() ([]net.Listener, error) {
  345. n.ConfigureRPC()
  346. listenAddrs := strings.Split(n.config.RPC.ListenAddress, ",")
  347. if n.config.RPC.Unsafe {
  348. rpccore.AddUnsafeRoutes()
  349. }
  350. // we may expose the rpc over both a unix and tcp socket
  351. listeners := make([]net.Listener, len(listenAddrs))
  352. for i, listenAddr := range listenAddrs {
  353. mux := http.NewServeMux()
  354. rpcLogger := n.Logger.With("module", "rpc-server")
  355. wm := rpcserver.NewWebsocketManager(rpccore.Routes, n.evsw)
  356. wm.SetLogger(rpcLogger.With("protocol", "websocket"))
  357. mux.HandleFunc("/websocket", wm.WebsocketHandler)
  358. rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger)
  359. listener, err := rpcserver.StartHTTPServer(listenAddr, mux, rpcLogger)
  360. if err != nil {
  361. return nil, err
  362. }
  363. listeners[i] = listener
  364. }
  365. // we expose a simplified api over grpc for convenience to app devs
  366. grpcListenAddr := n.config.RPC.GRPCListenAddress
  367. if grpcListenAddr != "" {
  368. listener, err := grpccore.StartGRPCServer(grpcListenAddr)
  369. if err != nil {
  370. return nil, err
  371. }
  372. listeners = append(listeners, listener)
  373. }
  374. return listeners, nil
  375. }
  376. // Switch returns the Node's Switch.
  377. func (n *Node) Switch() *p2p.Switch {
  378. return n.sw
  379. }
  380. // BlockStore returns the Node's BlockStore.
  381. func (n *Node) BlockStore() *bc.BlockStore {
  382. return n.blockStore
  383. }
  384. // ConsensusState returns the Node's ConsensusState.
  385. func (n *Node) ConsensusState() *consensus.ConsensusState {
  386. return n.consensusState
  387. }
  388. // ConsensusReactor returns the Node's ConsensusReactor.
  389. func (n *Node) ConsensusReactor() *consensus.ConsensusReactor {
  390. return n.consensusReactor
  391. }
  392. // MempoolReactor returns the Node's MempoolReactor.
  393. func (n *Node) MempoolReactor() *mempl.MempoolReactor {
  394. return n.mempoolReactor
  395. }
  396. // EventSwitch returns the Node's EventSwitch.
  397. func (n *Node) EventSwitch() types.EventSwitch {
  398. return n.evsw
  399. }
  400. // PrivValidator returns the Node's PrivValidator.
  401. // XXX: for convenience only!
  402. func (n *Node) PrivValidator() types.PrivValidator {
  403. return n.privValidator
  404. }
  405. // GenesisDoc returns the Node's GenesisDoc.
  406. func (n *Node) GenesisDoc() *types.GenesisDoc {
  407. return n.genesisDoc
  408. }
  409. // ProxyApp returns the Node's AppConns, representing its connections to the ABCI application.
  410. func (n *Node) ProxyApp() proxy.AppConns {
  411. return n.proxyApp
  412. }
  413. func (n *Node) makeNodeInfo() *p2p.NodeInfo {
  414. txIndexerStatus := "on"
  415. if _, ok := n.txIndexer.(*null.TxIndex); ok {
  416. txIndexerStatus = "off"
  417. }
  418. nodeInfo := &p2p.NodeInfo{
  419. PubKey: n.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
  420. Moniker: n.config.Moniker,
  421. Network: n.consensusState.GetState().ChainID,
  422. Version: version.Version,
  423. Other: []string{
  424. cmn.Fmt("wire_version=%v", wire.Version),
  425. cmn.Fmt("p2p_version=%v", p2p.Version),
  426. cmn.Fmt("consensus_version=%v", consensus.Version),
  427. cmn.Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version),
  428. cmn.Fmt("tx_index=%v", txIndexerStatus),
  429. },
  430. }
  431. // include git hash in the nodeInfo if available
  432. // TODO: use ld-flags
  433. /*if rev, err := cmn.ReadFile(n.config.GetString("revision_file")); err == nil {
  434. nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("revision=%v", string(rev)))
  435. }*/
  436. if !n.sw.IsListening() {
  437. return nodeInfo
  438. }
  439. p2pListener := n.sw.Listeners()[0]
  440. p2pHost := p2pListener.ExternalAddress().IP.String()
  441. p2pPort := p2pListener.ExternalAddress().Port
  442. rpcListenAddr := n.config.RPC.ListenAddress
  443. // We assume that the rpcListener has the same ExternalAddress.
  444. // This is probably true because both P2P and RPC listeners use UPnP,
  445. // except of course if the rpc is only bound to localhost
  446. nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
  447. nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
  448. return nodeInfo
  449. }
  450. //------------------------------------------------------------------------------
  451. // NodeInfo returns the Node's Info from the Switch.
  452. func (n *Node) NodeInfo() *p2p.NodeInfo {
  453. return n.sw.NodeInfo()
  454. }
  455. // DialSeeds dials the given seeds on the Switch.
  456. func (n *Node) DialSeeds(seeds []string) error {
  457. return n.sw.DialSeeds(n.addrBook, seeds)
  458. }
  459. //------------------------------------------------------------------------------