You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

543 lines
16 KiB

10 years ago
10 years ago
11 years ago
11 years ago
8 years ago
8 years ago
11 years ago
8 years ago
11 years ago
8 years ago
11 years ago
8 years ago
8 years ago
11 years ago
11 years ago
8 years ago
10 years ago
8 years ago
9 years ago
  1. package node
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "net/http"
  8. "strings"
  9. abci "github.com/tendermint/abci/types"
  10. crypto "github.com/tendermint/go-crypto"
  11. wire "github.com/tendermint/go-wire"
  12. cmn "github.com/tendermint/tmlibs/common"
  13. dbm "github.com/tendermint/tmlibs/db"
  14. "github.com/tendermint/tmlibs/log"
  15. bc "github.com/tendermint/tendermint/blockchain"
  16. cfg "github.com/tendermint/tendermint/config"
  17. "github.com/tendermint/tendermint/consensus"
  18. mempl "github.com/tendermint/tendermint/mempool"
  19. "github.com/tendermint/tendermint/p2p"
  20. "github.com/tendermint/tendermint/proxy"
  21. rpccore "github.com/tendermint/tendermint/rpc/core"
  22. grpccore "github.com/tendermint/tendermint/rpc/grpc"
  23. rpc "github.com/tendermint/tendermint/rpc/lib"
  24. rpcserver "github.com/tendermint/tendermint/rpc/lib/server"
  25. sm "github.com/tendermint/tendermint/state"
  26. "github.com/tendermint/tendermint/state/txindex"
  27. "github.com/tendermint/tendermint/state/txindex/kv"
  28. "github.com/tendermint/tendermint/state/txindex/null"
  29. "github.com/tendermint/tendermint/types"
  30. "github.com/tendermint/tendermint/version"
  31. _ "net/http/pprof"
  32. )
  33. //------------------------------------------------------------------------------
  34. // DBContext specifies config information for loading a new DB.
  35. type DBContext struct {
  36. ID string
  37. Config *cfg.Config
  38. }
  39. // DBProvider takes a DBContext and returns an instantiated DB.
  40. type DBProvider func(*DBContext) (dbm.DB, error)
  41. // DefaultDBProvider returns a database using the DBBackend and DBDir
  42. // specified in the ctx.Config.
  43. func DefaultDBProvider(ctx *DBContext) (dbm.DB, error) {
  44. return dbm.NewDB(ctx.ID, ctx.Config.DBBackend, ctx.Config.DBDir()), nil
  45. }
  46. // GenesisDocProvider returns a GenesisDoc.
  47. // It allows the GenesisDoc to be pulled from sources other than the
  48. // filesystem, for instance from a distributed key-value store cluster.
  49. type GenesisDocProvider func() (*types.GenesisDoc, error)
  50. // DefaultGenesisDocProviderFunc returns a GenesisDocProvider that loads
  51. // the GenesisDoc from the config.GenesisFile() on the filesystem.
  52. func DefaultGenesisDocProviderFunc(config *cfg.Config) GenesisDocProvider {
  53. return func() (*types.GenesisDoc, error) {
  54. return types.GenesisDocFromFile(config.GenesisFile())
  55. }
  56. }
  57. // NodeProvider takes a config and a logger and returns a ready to go Node.
  58. type NodeProvider func(*cfg.Config, log.Logger) (*Node, error)
  59. // DefaultNewNode returns a Tendermint node with default settings for the
  60. // PrivValidator, ClientCreator, GenesisDoc, and DBProvider.
  61. // It implements NodeProvider.
  62. func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
  63. return NewNode(config,
  64. types.LoadOrGenPrivValidatorFS(config.PrivValidatorFile()),
  65. proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()),
  66. DefaultGenesisDocProviderFunc(config),
  67. DefaultDBProvider,
  68. logger)
  69. }
  70. //------------------------------------------------------------------------------
  71. // Node is the highest level interface to a full Tendermint node.
  72. // It includes all configuration information and running services.
  73. type Node struct {
  74. cmn.BaseService
  75. // config
  76. config *cfg.Config
  77. genesisDoc *types.GenesisDoc // initial validator set
  78. privValidator types.PrivValidator // local node's validator key
  79. // network
  80. privKey crypto.PrivKeyEd25519 // local node's p2p key
  81. sw *p2p.Switch // p2p connections
  82. addrBook *p2p.AddrBook // known peers
  83. // services
  84. evsw types.EventSwitch // pub/sub for services
  85. blockStore *bc.BlockStore // store the blockchain to disk
  86. bcReactor *bc.BlockchainReactor // for fast-syncing
  87. mempoolReactor *mempl.MempoolReactor // for gossipping transactions
  88. consensusState *consensus.ConsensusState // latest consensus state
  89. consensusReactor *consensus.ConsensusReactor // for participating in the consensus
  90. proxyApp proxy.AppConns // connection to the application
  91. rpcListeners []net.Listener // rpc servers
  92. txIndexer txindex.TxIndexer
  93. }
  94. // NewNode returns a new, ready to go, Tendermint Node.
  95. func NewNode(config *cfg.Config,
  96. privValidator types.PrivValidator,
  97. clientCreator proxy.ClientCreator,
  98. genesisDocProvider GenesisDocProvider,
  99. dbProvider DBProvider,
  100. logger log.Logger) (*Node, error) {
  101. // Get BlockStore
  102. blockStoreDB, err := dbProvider(&DBContext{"blockstore", config})
  103. if err != nil {
  104. return nil, err
  105. }
  106. blockStore := bc.NewBlockStore(blockStoreDB)
  107. consensusLogger := logger.With("module", "consensus")
  108. stateLogger := logger.With("module", "state")
  109. // Get State
  110. stateDB, err := dbProvider(&DBContext{"state", config})
  111. if err != nil {
  112. return nil, err
  113. }
  114. genDoc, err := genesisDocProvider()
  115. if err != nil {
  116. return nil, err
  117. }
  118. state := sm.LoadState(stateDB)
  119. if state == nil {
  120. state, err = sm.MakeGenesisState(stateDB, genDoc)
  121. if err != nil {
  122. return nil, err
  123. }
  124. state.Save()
  125. } else {
  126. state.SetChainID(genDoc.ChainID)
  127. state.SetParams(genDoc.ConsensusParams)
  128. }
  129. state.SetLogger(stateLogger)
  130. // Create the proxyApp, which manages connections (consensus, mempool, query)
  131. // and sync tendermint and the app by replaying any necessary blocks
  132. handshaker := consensus.NewHandshaker(state, blockStore)
  133. handshaker.SetLogger(consensusLogger)
  134. proxyApp := proxy.NewAppConns(clientCreator, handshaker)
  135. proxyApp.SetLogger(logger.With("module", "proxy"))
  136. if _, err := proxyApp.Start(); err != nil {
  137. return nil, fmt.Errorf("Error starting proxy app connections: %v", err)
  138. }
  139. // reload the state (it may have been updated by the handshake)
  140. state = sm.LoadState(stateDB)
  141. state.SetChainID(genDoc.ChainID)
  142. state.SetParams(genDoc.ConsensusParams)
  143. state.SetLogger(stateLogger)
  144. // Transaction indexing
  145. var txIndexer txindex.TxIndexer
  146. switch config.TxIndex {
  147. case "kv":
  148. store, err := dbProvider(&DBContext{"tx_index", config})
  149. if err != nil {
  150. return nil, err
  151. }
  152. txIndexer = kv.NewTxIndex(store)
  153. default:
  154. txIndexer = &null.TxIndex{}
  155. }
  156. state.TxIndexer = txIndexer
  157. // Generate node PrivKey
  158. privKey := crypto.GenPrivKeyEd25519()
  159. // Make event switch
  160. eventSwitch := types.NewEventSwitch()
  161. eventSwitch.SetLogger(logger.With("module", "types"))
  162. if _, err := eventSwitch.Start(); err != nil {
  163. return nil, fmt.Errorf("Failed to start switch: %v", err)
  164. }
  165. // Decide whether to fast-sync or not
  166. // We don't fast-sync when the only validator is us.
  167. fastSync := config.FastSync
  168. if state.Validators.Size() == 1 {
  169. addr, _ := state.Validators.GetByIndex(0)
  170. if bytes.Equal(privValidator.GetAddress(), addr) {
  171. fastSync = false
  172. }
  173. }
  174. // Log whether this node is a validator or an observer
  175. if state.Validators.HasAddress(privValidator.GetAddress()) {
  176. consensusLogger.Info("This node is a validator")
  177. } else {
  178. consensusLogger.Info("This node is not a validator")
  179. }
  180. // Make BlockchainReactor
  181. bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyApp.Consensus(), blockStore, fastSync)
  182. bcReactor.SetLogger(logger.With("module", "blockchain"))
  183. // Make MempoolReactor
  184. mempoolLogger := logger.With("module", "mempool")
  185. mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool(), state.LastBlockHeight)
  186. mempool.SetLogger(mempoolLogger)
  187. mempoolReactor := mempl.NewMempoolReactor(config.Mempool, mempool)
  188. mempoolReactor.SetLogger(mempoolLogger)
  189. if config.Consensus.WaitForTxs() {
  190. mempool.EnableTxsAvailable()
  191. }
  192. // Make ConsensusReactor
  193. consensusState := consensus.NewConsensusState(config.Consensus, state.Copy(), proxyApp.Consensus(), blockStore, mempool)
  194. consensusState.SetLogger(consensusLogger)
  195. if privValidator != nil {
  196. consensusState.SetPrivValidator(privValidator)
  197. }
  198. consensusReactor := consensus.NewConsensusReactor(consensusState, fastSync)
  199. consensusReactor.SetLogger(consensusLogger)
  200. p2pLogger := logger.With("module", "p2p")
  201. sw := p2p.NewSwitch(config.P2P)
  202. sw.SetLogger(p2pLogger)
  203. sw.AddReactor("MEMPOOL", mempoolReactor)
  204. sw.AddReactor("BLOCKCHAIN", bcReactor)
  205. sw.AddReactor("CONSENSUS", consensusReactor)
  206. // Optionally, start the pex reactor
  207. var addrBook *p2p.AddrBook
  208. if config.P2P.PexReactor {
  209. addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
  210. addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
  211. pexReactor := p2p.NewPEXReactor(addrBook)
  212. pexReactor.SetLogger(p2pLogger)
  213. sw.AddReactor("PEX", pexReactor)
  214. }
  215. // Filter peers by addr or pubkey with an ABCI query.
  216. // If the query return code is OK, add peer.
  217. // XXX: Query format subject to change
  218. if config.FilterPeers {
  219. // NOTE: addr is ip:port
  220. sw.SetAddrFilter(func(addr net.Addr) error {
  221. resQuery, err := proxyApp.Query().QuerySync(abci.RequestQuery{Path: cmn.Fmt("/p2p/filter/addr/%s", addr.String())})
  222. if err != nil {
  223. return err
  224. }
  225. if resQuery.Code.IsOK() {
  226. return nil
  227. }
  228. return errors.New(resQuery.Code.String())
  229. })
  230. sw.SetPubKeyFilter(func(pubkey crypto.PubKeyEd25519) error {
  231. resQuery, err := proxyApp.Query().QuerySync(abci.RequestQuery{Path: cmn.Fmt("/p2p/filter/pubkey/%X", pubkey.Bytes())})
  232. if err != nil {
  233. return err
  234. }
  235. if resQuery.Code.IsOK() {
  236. return nil
  237. }
  238. return errors.New(resQuery.Code.String())
  239. })
  240. }
  241. // add the event switch to all services
  242. // they should all satisfy events.Eventable
  243. SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor)
  244. // run the profile server
  245. profileHost := config.ProfListenAddress
  246. if profileHost != "" {
  247. go func() {
  248. logger.Error("Profile server", "err", http.ListenAndServe(profileHost, nil))
  249. }()
  250. }
  251. node := &Node{
  252. config: config,
  253. genesisDoc: genDoc,
  254. privValidator: privValidator,
  255. privKey: privKey,
  256. sw: sw,
  257. addrBook: addrBook,
  258. evsw: eventSwitch,
  259. blockStore: blockStore,
  260. bcReactor: bcReactor,
  261. mempoolReactor: mempoolReactor,
  262. consensusState: consensusState,
  263. consensusReactor: consensusReactor,
  264. proxyApp: proxyApp,
  265. txIndexer: txIndexer,
  266. }
  267. node.BaseService = *cmn.NewBaseService(logger, "Node", node)
  268. return node, nil
  269. }
  270. // OnStart starts the Node. It implements cmn.Service.
  271. func (n *Node) OnStart() error {
  272. // Create & add listener
  273. protocol, address := cmn.ProtocolAndAddress(n.config.P2P.ListenAddress)
  274. l := p2p.NewDefaultListener(protocol, address, n.config.P2P.SkipUPNP, n.Logger.With("module", "p2p"))
  275. n.sw.AddListener(l)
  276. // Start the switch
  277. n.sw.SetNodeInfo(n.makeNodeInfo())
  278. n.sw.SetNodePrivKey(n.privKey)
  279. _, err := n.sw.Start()
  280. if err != nil {
  281. return err
  282. }
  283. // If seeds exist, add them to the address book and dial out
  284. if n.config.P2P.Seeds != "" {
  285. // dial out
  286. seeds := strings.Split(n.config.P2P.Seeds, ",")
  287. if err := n.DialSeeds(seeds); err != nil {
  288. return err
  289. }
  290. }
  291. // Run the RPC server
  292. if n.config.RPC.ListenAddress != "" {
  293. listeners, err := n.startRPC()
  294. if err != nil {
  295. return err
  296. }
  297. n.rpcListeners = listeners
  298. }
  299. return nil
  300. }
  301. // OnStop stops the Node. It implements cmn.Service.
  302. func (n *Node) OnStop() {
  303. n.BaseService.OnStop()
  304. n.Logger.Info("Stopping Node")
  305. // TODO: gracefully disconnect from peers.
  306. n.sw.Stop()
  307. for _, l := range n.rpcListeners {
  308. n.Logger.Info("Closing rpc listener", "listener", l)
  309. if err := l.Close(); err != nil {
  310. n.Logger.Error("Error closing listener", "listener", l, "err", err)
  311. }
  312. }
  313. }
  314. // RunForever waits for an interupt signal and stops the node.
  315. func (n *Node) RunForever() {
  316. // Sleep forever and then...
  317. cmn.TrapSignal(func() {
  318. n.Stop()
  319. })
  320. }
  321. // SetEventSwitch adds the event switch to reactors, mempool, etc.
  322. func SetEventSwitch(evsw types.EventSwitch, eventables ...types.Eventable) {
  323. for _, e := range eventables {
  324. e.SetEventSwitch(evsw)
  325. }
  326. }
  327. // AddListener adds a listener to accept inbound peer connections.
  328. // It should be called before starting the Node.
  329. // The first listener is the primary listener (in NodeInfo)
  330. func (n *Node) AddListener(l p2p.Listener) {
  331. n.sw.AddListener(l)
  332. }
  333. // ConfigureRPC sets all variables in rpccore so they will serve
  334. // rpc calls from this node
  335. func (n *Node) ConfigureRPC() {
  336. rpccore.SetEventSwitch(n.evsw)
  337. rpccore.SetBlockStore(n.blockStore)
  338. rpccore.SetConsensusState(n.consensusState)
  339. rpccore.SetMempool(n.mempoolReactor.Mempool)
  340. rpccore.SetSwitch(n.sw)
  341. rpccore.SetPubKey(n.privValidator.GetPubKey())
  342. rpccore.SetGenesisDoc(n.genesisDoc)
  343. rpccore.SetAddrBook(n.addrBook)
  344. rpccore.SetProxyAppQuery(n.proxyApp.Query())
  345. rpccore.SetTxIndexer(n.txIndexer)
  346. rpccore.SetConsensusReactor(n.consensusReactor)
  347. rpccore.SetLogger(n.Logger.With("module", "rpc"))
  348. }
  349. func (n *Node) startRPC() ([]net.Listener, error) {
  350. n.ConfigureRPC()
  351. listenAddrs := strings.Split(n.config.RPC.ListenAddress, ",")
  352. if n.config.RPC.Unsafe {
  353. rpccore.AddUnsafeRoutes()
  354. }
  355. // we may expose the rpc over both a unix and tcp socket
  356. listeners := make([]net.Listener, len(listenAddrs))
  357. for i, listenAddr := range listenAddrs {
  358. mux := http.NewServeMux()
  359. rpcLogger := n.Logger.With("module", "rpc-server")
  360. wm := rpcserver.NewWebsocketManager(rpccore.Routes, n.evsw)
  361. wm.SetLogger(rpcLogger.With("protocol", "websocket"))
  362. mux.HandleFunc("/websocket", wm.WebsocketHandler)
  363. rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger)
  364. listener, err := rpcserver.StartHTTPServer(listenAddr, mux, rpcLogger)
  365. if err != nil {
  366. return nil, err
  367. }
  368. listeners[i] = listener
  369. }
  370. // we expose a simplified api over grpc for convenience to app devs
  371. grpcListenAddr := n.config.RPC.GRPCListenAddress
  372. if grpcListenAddr != "" {
  373. listener, err := grpccore.StartGRPCServer(grpcListenAddr)
  374. if err != nil {
  375. return nil, err
  376. }
  377. listeners = append(listeners, listener)
  378. }
  379. return listeners, nil
  380. }
  381. // Switch returns the Node's Switch.
  382. func (n *Node) Switch() *p2p.Switch {
  383. return n.sw
  384. }
  385. // BlockStore returns the Node's BlockStore.
  386. func (n *Node) BlockStore() *bc.BlockStore {
  387. return n.blockStore
  388. }
  389. // ConsensusState returns the Node's ConsensusState.
  390. func (n *Node) ConsensusState() *consensus.ConsensusState {
  391. return n.consensusState
  392. }
  393. // ConsensusReactor returns the Node's ConsensusReactor.
  394. func (n *Node) ConsensusReactor() *consensus.ConsensusReactor {
  395. return n.consensusReactor
  396. }
  397. // MempoolReactor returns the Node's MempoolReactor.
  398. func (n *Node) MempoolReactor() *mempl.MempoolReactor {
  399. return n.mempoolReactor
  400. }
  401. // EventSwitch returns the Node's EventSwitch.
  402. func (n *Node) EventSwitch() types.EventSwitch {
  403. return n.evsw
  404. }
  405. // PrivValidator returns the Node's PrivValidator.
  406. // XXX: for convenience only!
  407. func (n *Node) PrivValidator() types.PrivValidator {
  408. return n.privValidator
  409. }
  410. // GenesisDoc returns the Node's GenesisDoc.
  411. func (n *Node) GenesisDoc() *types.GenesisDoc {
  412. return n.genesisDoc
  413. }
  414. // ProxyApp returns the Node's AppConns, representing its connections to the ABCI application.
  415. func (n *Node) ProxyApp() proxy.AppConns {
  416. return n.proxyApp
  417. }
  418. func (n *Node) makeNodeInfo() *p2p.NodeInfo {
  419. txIndexerStatus := "on"
  420. if _, ok := n.txIndexer.(*null.TxIndex); ok {
  421. txIndexerStatus = "off"
  422. }
  423. nodeInfo := &p2p.NodeInfo{
  424. PubKey: n.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
  425. Moniker: n.config.Moniker,
  426. Network: n.genesisDoc.ChainID,
  427. Version: version.Version,
  428. Other: []string{
  429. cmn.Fmt("wire_version=%v", wire.Version),
  430. cmn.Fmt("p2p_version=%v", p2p.Version),
  431. cmn.Fmt("consensus_version=%v", consensus.Version),
  432. cmn.Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version),
  433. cmn.Fmt("tx_index=%v", txIndexerStatus),
  434. },
  435. }
  436. // include git hash in the nodeInfo if available
  437. // TODO: use ld-flags
  438. /*if rev, err := cmn.ReadFile(n.config.GetString("revision_file")); err == nil {
  439. nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("revision=%v", string(rev)))
  440. }*/
  441. if !n.sw.IsListening() {
  442. return nodeInfo
  443. }
  444. p2pListener := n.sw.Listeners()[0]
  445. p2pHost := p2pListener.ExternalAddress().IP.String()
  446. p2pPort := p2pListener.ExternalAddress().Port
  447. rpcListenAddr := n.config.RPC.ListenAddress
  448. // We assume that the rpcListener has the same ExternalAddress.
  449. // This is probably true because both P2P and RPC listeners use UPnP,
  450. // except of course if the rpc is only bound to localhost
  451. nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
  452. nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
  453. return nodeInfo
  454. }
  455. //------------------------------------------------------------------------------
  456. // NodeInfo returns the Node's Info from the Switch.
  457. func (n *Node) NodeInfo() *p2p.NodeInfo {
  458. return n.sw.NodeInfo()
  459. }
  460. // DialSeeds dials the given seeds on the Switch.
  461. func (n *Node) DialSeeds(seeds []string) error {
  462. return n.sw.DialSeeds(n.addrBook, seeds)
  463. }
  464. //------------------------------------------------------------------------------