You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

575 lines
17 KiB

10 years ago
10 years ago
11 years ago
11 years ago
8 years ago
8 years ago
11 years ago
8 years ago
11 years ago
8 years ago
11 years ago
8 years ago
8 years ago
11 years ago
11 years ago
8 years ago
10 years ago
8 years ago
9 years ago
  1. package node
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "net/http"
  9. "strings"
  10. abci "github.com/tendermint/abci/types"
  11. crypto "github.com/tendermint/go-crypto"
  12. wire "github.com/tendermint/go-wire"
  13. cmn "github.com/tendermint/tmlibs/common"
  14. dbm "github.com/tendermint/tmlibs/db"
  15. "github.com/tendermint/tmlibs/log"
  16. bc "github.com/tendermint/tendermint/blockchain"
  17. cfg "github.com/tendermint/tendermint/config"
  18. "github.com/tendermint/tendermint/consensus"
  19. mempl "github.com/tendermint/tendermint/mempool"
  20. "github.com/tendermint/tendermint/p2p"
  21. "github.com/tendermint/tendermint/proxy"
  22. rpccore "github.com/tendermint/tendermint/rpc/core"
  23. grpccore "github.com/tendermint/tendermint/rpc/grpc"
  24. rpc "github.com/tendermint/tendermint/rpc/lib"
  25. rpcserver "github.com/tendermint/tendermint/rpc/lib/server"
  26. sm "github.com/tendermint/tendermint/state"
  27. "github.com/tendermint/tendermint/state/txindex"
  28. "github.com/tendermint/tendermint/state/txindex/kv"
  29. "github.com/tendermint/tendermint/state/txindex/null"
  30. "github.com/tendermint/tendermint/types"
  31. "github.com/tendermint/tendermint/version"
  32. _ "net/http/pprof"
  33. )
  34. //------------------------------------------------------------------------------
  35. // DBContext specifies config information for loading a new DB.
  36. type DBContext struct {
  37. ID string
  38. Config *cfg.Config
  39. }
  40. // DBProvider takes a DBContext and returns an instantiated DB.
  41. type DBProvider func(*DBContext) (dbm.DB, error)
  42. // DefaultDBProvider returns a database using the DBBackend and DBDir
  43. // specified in the ctx.Config.
  44. func DefaultDBProvider(ctx *DBContext) (dbm.DB, error) {
  45. return dbm.NewDB(ctx.ID, ctx.Config.DBBackend, ctx.Config.DBDir()), nil
  46. }
  47. // GenesisDocProvider returns a GenesisDoc.
  48. // It allows the GenesisDoc to be pulled from sources other than the
  49. // filesystem, for instance from a distributed key-value store cluster.
  50. type GenesisDocProvider func() (*types.GenesisDoc, error)
  51. // DefaultGenesisDocProviderFunc returns a GenesisDocProvider that loads
  52. // the GenesisDoc from the config.GenesisFile() on the filesystem.
  53. func DefaultGenesisDocProviderFunc(config *cfg.Config) GenesisDocProvider {
  54. return func() (*types.GenesisDoc, error) {
  55. return types.GenesisDocFromFile(config.GenesisFile())
  56. }
  57. }
  58. // NodeProvider takes a config and a logger and returns a ready to go Node.
  59. type NodeProvider func(*cfg.Config, log.Logger) (*Node, error)
  60. // DefaultNewNode returns a Tendermint node with default settings for the
  61. // PrivValidator, ClientCreator, GenesisDoc, and DBProvider.
  62. // It implements NodeProvider.
  63. func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
  64. return NewNode(config,
  65. types.LoadOrGenPrivValidatorFS(config.PrivValidatorFile()),
  66. proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()),
  67. DefaultGenesisDocProviderFunc(config),
  68. DefaultDBProvider,
  69. logger)
  70. }
  71. //------------------------------------------------------------------------------
  72. // Node is the highest level interface to a full Tendermint node.
  73. // It includes all configuration information and running services.
  74. type Node struct {
  75. cmn.BaseService
  76. // config
  77. config *cfg.Config
  78. genesisDoc *types.GenesisDoc // initial validator set
  79. privValidator types.PrivValidator // local node's validator key
  80. // network
  81. privKey crypto.PrivKeyEd25519 // local node's p2p key
  82. sw *p2p.Switch // p2p connections
  83. addrBook *p2p.AddrBook // known peers
  84. // services
  85. evsw types.EventSwitch // pub/sub for services
  86. blockStore *bc.BlockStore // store the blockchain to disk
  87. bcReactor *bc.BlockchainReactor // for fast-syncing
  88. mempoolReactor *mempl.MempoolReactor // for gossipping transactions
  89. consensusState *consensus.ConsensusState // latest consensus state
  90. consensusReactor *consensus.ConsensusReactor // for participating in the consensus
  91. proxyApp proxy.AppConns // connection to the application
  92. rpcListeners []net.Listener // rpc servers
  93. txIndexer txindex.TxIndexer
  94. }
  95. // NewNode returns a new, ready to go, Tendermint Node.
  96. func NewNode(config *cfg.Config,
  97. privValidator types.PrivValidator,
  98. clientCreator proxy.ClientCreator,
  99. genesisDocProvider GenesisDocProvider,
  100. dbProvider DBProvider,
  101. logger log.Logger) (*Node, error) {
  102. // Get BlockStore
  103. blockStoreDB, err := dbProvider(&DBContext{"blockstore", config})
  104. if err != nil {
  105. return nil, err
  106. }
  107. blockStore := bc.NewBlockStore(blockStoreDB)
  108. consensusLogger := logger.With("module", "consensus")
  109. stateLogger := logger.With("module", "state")
  110. // Get State
  111. stateDB, err := dbProvider(&DBContext{"state", config})
  112. if err != nil {
  113. return nil, err
  114. }
  115. // Get genesis doc
  116. genDoc, err := loadGenesisDoc(stateDB)
  117. if err != nil {
  118. genDoc, err = genesisDocProvider()
  119. if err != nil {
  120. return nil, err
  121. }
  122. // save genesis doc to prevent a certain class of user errors (e.g. when it
  123. // was changed, accidentally or not). Also good for audit trail.
  124. saveGenesisDoc(stateDB, genDoc)
  125. }
  126. state := sm.LoadState(stateDB)
  127. if state == nil {
  128. state, err = sm.MakeGenesisState(stateDB, genDoc)
  129. if err != nil {
  130. return nil, err
  131. }
  132. state.Save()
  133. }
  134. state.SetLogger(stateLogger)
  135. // Create the proxyApp, which manages connections (consensus, mempool, query)
  136. // and sync tendermint and the app by replaying any necessary blocks
  137. handshaker := consensus.NewHandshaker(state, blockStore)
  138. handshaker.SetLogger(consensusLogger)
  139. proxyApp := proxy.NewAppConns(clientCreator, handshaker)
  140. proxyApp.SetLogger(logger.With("module", "proxy"))
  141. if _, err := proxyApp.Start(); err != nil {
  142. return nil, fmt.Errorf("Error starting proxy app connections: %v", err)
  143. }
  144. // reload the state (it may have been updated by the handshake)
  145. state = sm.LoadState(stateDB)
  146. state.SetLogger(stateLogger)
  147. // Transaction indexing
  148. var txIndexer txindex.TxIndexer
  149. switch config.TxIndex {
  150. case "kv":
  151. store, err := dbProvider(&DBContext{"tx_index", config})
  152. if err != nil {
  153. return nil, err
  154. }
  155. txIndexer = kv.NewTxIndex(store)
  156. default:
  157. txIndexer = &null.TxIndex{}
  158. }
  159. state.TxIndexer = txIndexer
  160. // Generate node PrivKey
  161. privKey := crypto.GenPrivKeyEd25519()
  162. // Make event switch
  163. eventSwitch := types.NewEventSwitch()
  164. eventSwitch.SetLogger(logger.With("module", "types"))
  165. if _, err := eventSwitch.Start(); err != nil {
  166. return nil, fmt.Errorf("Failed to start switch: %v", err)
  167. }
  168. // Decide whether to fast-sync or not
  169. // We don't fast-sync when the only validator is us.
  170. fastSync := config.FastSync
  171. if state.Validators.Size() == 1 {
  172. addr, _ := state.Validators.GetByIndex(0)
  173. if bytes.Equal(privValidator.GetAddress(), addr) {
  174. fastSync = false
  175. }
  176. }
  177. // Log whether this node is a validator or an observer
  178. if state.Validators.HasAddress(privValidator.GetAddress()) {
  179. consensusLogger.Info("This node is a validator")
  180. } else {
  181. consensusLogger.Info("This node is not a validator")
  182. }
  183. // Make BlockchainReactor
  184. bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyApp.Consensus(), blockStore, fastSync)
  185. bcReactor.SetLogger(logger.With("module", "blockchain"))
  186. // Make MempoolReactor
  187. mempoolLogger := logger.With("module", "mempool")
  188. mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool(), state.LastBlockHeight)
  189. mempool.SetLogger(mempoolLogger)
  190. mempoolReactor := mempl.NewMempoolReactor(config.Mempool, mempool)
  191. mempoolReactor.SetLogger(mempoolLogger)
  192. if config.Consensus.WaitForTxs() {
  193. mempool.EnableTxsAvailable()
  194. }
  195. // Make ConsensusReactor
  196. consensusState := consensus.NewConsensusState(config.Consensus, state.Copy(), proxyApp.Consensus(), blockStore, mempool)
  197. consensusState.SetLogger(consensusLogger)
  198. if privValidator != nil {
  199. consensusState.SetPrivValidator(privValidator)
  200. }
  201. consensusReactor := consensus.NewConsensusReactor(consensusState, fastSync)
  202. consensusReactor.SetLogger(consensusLogger)
  203. p2pLogger := logger.With("module", "p2p")
  204. sw := p2p.NewSwitch(config.P2P)
  205. sw.SetLogger(p2pLogger)
  206. sw.AddReactor("MEMPOOL", mempoolReactor)
  207. sw.AddReactor("BLOCKCHAIN", bcReactor)
  208. sw.AddReactor("CONSENSUS", consensusReactor)
  209. // Optionally, start the pex reactor
  210. var addrBook *p2p.AddrBook
  211. if config.P2P.PexReactor {
  212. addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
  213. addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
  214. pexReactor := p2p.NewPEXReactor(addrBook)
  215. pexReactor.SetLogger(p2pLogger)
  216. sw.AddReactor("PEX", pexReactor)
  217. }
  218. // Filter peers by addr or pubkey with an ABCI query.
  219. // If the query return code is OK, add peer.
  220. // XXX: Query format subject to change
  221. if config.FilterPeers {
  222. // NOTE: addr is ip:port
  223. sw.SetAddrFilter(func(addr net.Addr) error {
  224. resQuery, err := proxyApp.Query().QuerySync(abci.RequestQuery{Path: cmn.Fmt("/p2p/filter/addr/%s", addr.String())})
  225. if err != nil {
  226. return err
  227. }
  228. if resQuery.Code.IsOK() {
  229. return nil
  230. }
  231. return errors.New(resQuery.Code.String())
  232. })
  233. sw.SetPubKeyFilter(func(pubkey crypto.PubKeyEd25519) error {
  234. resQuery, err := proxyApp.Query().QuerySync(abci.RequestQuery{Path: cmn.Fmt("/p2p/filter/pubkey/%X", pubkey.Bytes())})
  235. if err != nil {
  236. return err
  237. }
  238. if resQuery.Code.IsOK() {
  239. return nil
  240. }
  241. return errors.New(resQuery.Code.String())
  242. })
  243. }
  244. // add the event switch to all services
  245. // they should all satisfy events.Eventable
  246. SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor)
  247. // run the profile server
  248. profileHost := config.ProfListenAddress
  249. if profileHost != "" {
  250. go func() {
  251. logger.Error("Profile server", "err", http.ListenAndServe(profileHost, nil))
  252. }()
  253. }
  254. node := &Node{
  255. config: config,
  256. genesisDoc: genDoc,
  257. privValidator: privValidator,
  258. privKey: privKey,
  259. sw: sw,
  260. addrBook: addrBook,
  261. evsw: eventSwitch,
  262. blockStore: blockStore,
  263. bcReactor: bcReactor,
  264. mempoolReactor: mempoolReactor,
  265. consensusState: consensusState,
  266. consensusReactor: consensusReactor,
  267. proxyApp: proxyApp,
  268. txIndexer: txIndexer,
  269. }
  270. node.BaseService = *cmn.NewBaseService(logger, "Node", node)
  271. return node, nil
  272. }
  273. // OnStart starts the Node. It implements cmn.Service.
  274. func (n *Node) OnStart() error {
  275. // Run the RPC server first
  276. // so we can eg. receive txs for the first block
  277. if n.config.RPC.ListenAddress != "" {
  278. listeners, err := n.startRPC()
  279. if err != nil {
  280. return err
  281. }
  282. n.rpcListeners = listeners
  283. }
  284. // Create & add listener
  285. protocol, address := cmn.ProtocolAndAddress(n.config.P2P.ListenAddress)
  286. l := p2p.NewDefaultListener(protocol, address, n.config.P2P.SkipUPNP, n.Logger.With("module", "p2p"))
  287. n.sw.AddListener(l)
  288. // Start the switch
  289. n.sw.SetNodeInfo(n.makeNodeInfo())
  290. n.sw.SetNodePrivKey(n.privKey)
  291. _, err := n.sw.Start()
  292. if err != nil {
  293. return err
  294. }
  295. // If seeds exist, add them to the address book and dial out
  296. if n.config.P2P.Seeds != "" {
  297. // dial out
  298. seeds := strings.Split(n.config.P2P.Seeds, ",")
  299. if err := n.DialSeeds(seeds); err != nil {
  300. return err
  301. }
  302. }
  303. return nil
  304. }
  305. // OnStop stops the Node. It implements cmn.Service.
  306. func (n *Node) OnStop() {
  307. n.BaseService.OnStop()
  308. n.Logger.Info("Stopping Node")
  309. // TODO: gracefully disconnect from peers.
  310. n.sw.Stop()
  311. for _, l := range n.rpcListeners {
  312. n.Logger.Info("Closing rpc listener", "listener", l)
  313. if err := l.Close(); err != nil {
  314. n.Logger.Error("Error closing listener", "listener", l, "err", err)
  315. }
  316. }
  317. }
  318. // RunForever waits for an interupt signal and stops the node.
  319. func (n *Node) RunForever() {
  320. // Sleep forever and then...
  321. cmn.TrapSignal(func() {
  322. n.Stop()
  323. })
  324. }
  325. // SetEventSwitch adds the event switch to reactors, mempool, etc.
  326. func SetEventSwitch(evsw types.EventSwitch, eventables ...types.Eventable) {
  327. for _, e := range eventables {
  328. e.SetEventSwitch(evsw)
  329. }
  330. }
  331. // AddListener adds a listener to accept inbound peer connections.
  332. // It should be called before starting the Node.
  333. // The first listener is the primary listener (in NodeInfo)
  334. func (n *Node) AddListener(l p2p.Listener) {
  335. n.sw.AddListener(l)
  336. }
  337. // ConfigureRPC sets all variables in rpccore so they will serve
  338. // rpc calls from this node
  339. func (n *Node) ConfigureRPC() {
  340. rpccore.SetEventSwitch(n.evsw)
  341. rpccore.SetBlockStore(n.blockStore)
  342. rpccore.SetConsensusState(n.consensusState)
  343. rpccore.SetMempool(n.mempoolReactor.Mempool)
  344. rpccore.SetSwitch(n.sw)
  345. rpccore.SetPubKey(n.privValidator.GetPubKey())
  346. rpccore.SetGenesisDoc(n.genesisDoc)
  347. rpccore.SetAddrBook(n.addrBook)
  348. rpccore.SetProxyAppQuery(n.proxyApp.Query())
  349. rpccore.SetTxIndexer(n.txIndexer)
  350. rpccore.SetConsensusReactor(n.consensusReactor)
  351. rpccore.SetLogger(n.Logger.With("module", "rpc"))
  352. }
  353. func (n *Node) startRPC() ([]net.Listener, error) {
  354. n.ConfigureRPC()
  355. listenAddrs := strings.Split(n.config.RPC.ListenAddress, ",")
  356. if n.config.RPC.Unsafe {
  357. rpccore.AddUnsafeRoutes()
  358. }
  359. // we may expose the rpc over both a unix and tcp socket
  360. listeners := make([]net.Listener, len(listenAddrs))
  361. for i, listenAddr := range listenAddrs {
  362. mux := http.NewServeMux()
  363. rpcLogger := n.Logger.With("module", "rpc-server")
  364. wm := rpcserver.NewWebsocketManager(rpccore.Routes, n.evsw)
  365. wm.SetLogger(rpcLogger.With("protocol", "websocket"))
  366. mux.HandleFunc("/websocket", wm.WebsocketHandler)
  367. rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger)
  368. listener, err := rpcserver.StartHTTPServer(listenAddr, mux, rpcLogger)
  369. if err != nil {
  370. return nil, err
  371. }
  372. listeners[i] = listener
  373. }
  374. // we expose a simplified api over grpc for convenience to app devs
  375. grpcListenAddr := n.config.RPC.GRPCListenAddress
  376. if grpcListenAddr != "" {
  377. listener, err := grpccore.StartGRPCServer(grpcListenAddr)
  378. if err != nil {
  379. return nil, err
  380. }
  381. listeners = append(listeners, listener)
  382. }
  383. return listeners, nil
  384. }
  385. // Switch returns the Node's Switch.
  386. func (n *Node) Switch() *p2p.Switch {
  387. return n.sw
  388. }
  389. // BlockStore returns the Node's BlockStore.
  390. func (n *Node) BlockStore() *bc.BlockStore {
  391. return n.blockStore
  392. }
  393. // ConsensusState returns the Node's ConsensusState.
  394. func (n *Node) ConsensusState() *consensus.ConsensusState {
  395. return n.consensusState
  396. }
  397. // ConsensusReactor returns the Node's ConsensusReactor.
  398. func (n *Node) ConsensusReactor() *consensus.ConsensusReactor {
  399. return n.consensusReactor
  400. }
  401. // MempoolReactor returns the Node's MempoolReactor.
  402. func (n *Node) MempoolReactor() *mempl.MempoolReactor {
  403. return n.mempoolReactor
  404. }
  405. // EventSwitch returns the Node's EventSwitch.
  406. func (n *Node) EventSwitch() types.EventSwitch {
  407. return n.evsw
  408. }
  409. // PrivValidator returns the Node's PrivValidator.
  410. // XXX: for convenience only!
  411. func (n *Node) PrivValidator() types.PrivValidator {
  412. return n.privValidator
  413. }
  414. // GenesisDoc returns the Node's GenesisDoc.
  415. func (n *Node) GenesisDoc() *types.GenesisDoc {
  416. return n.genesisDoc
  417. }
  418. // ProxyApp returns the Node's AppConns, representing its connections to the ABCI application.
  419. func (n *Node) ProxyApp() proxy.AppConns {
  420. return n.proxyApp
  421. }
  422. func (n *Node) makeNodeInfo() *p2p.NodeInfo {
  423. txIndexerStatus := "on"
  424. if _, ok := n.txIndexer.(*null.TxIndex); ok {
  425. txIndexerStatus = "off"
  426. }
  427. nodeInfo := &p2p.NodeInfo{
  428. PubKey: n.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
  429. Moniker: n.config.Moniker,
  430. Network: n.genesisDoc.ChainID,
  431. Version: version.Version,
  432. Other: []string{
  433. cmn.Fmt("wire_version=%v", wire.Version),
  434. cmn.Fmt("p2p_version=%v", p2p.Version),
  435. cmn.Fmt("consensus_version=%v", consensus.Version),
  436. cmn.Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version),
  437. cmn.Fmt("tx_index=%v", txIndexerStatus),
  438. },
  439. }
  440. // include git hash in the nodeInfo if available
  441. // TODO: use ld-flags
  442. /*if rev, err := cmn.ReadFile(n.config.GetString("revision_file")); err == nil {
  443. nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("revision=%v", string(rev)))
  444. }*/
  445. if !n.sw.IsListening() {
  446. return nodeInfo
  447. }
  448. p2pListener := n.sw.Listeners()[0]
  449. p2pHost := p2pListener.ExternalAddress().IP.String()
  450. p2pPort := p2pListener.ExternalAddress().Port
  451. rpcListenAddr := n.config.RPC.ListenAddress
  452. // We assume that the rpcListener has the same ExternalAddress.
  453. // This is probably true because both P2P and RPC listeners use UPnP,
  454. // except of course if the rpc is only bound to localhost
  455. nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
  456. nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
  457. return nodeInfo
  458. }
  459. //------------------------------------------------------------------------------
  460. // NodeInfo returns the Node's Info from the Switch.
  461. func (n *Node) NodeInfo() *p2p.NodeInfo {
  462. return n.sw.NodeInfo()
  463. }
  464. // DialSeeds dials the given seeds on the Switch.
  465. func (n *Node) DialSeeds(seeds []string) error {
  466. return n.sw.DialSeeds(n.addrBook, seeds)
  467. }
  468. //------------------------------------------------------------------------------
  // genesisDocKey is the state-DB key under which the genesis doc is stored.
  var genesisDocKey = []byte("genesisDoc")
  472. // panics if failed to unmarshal bytes
  473. func loadGenesisDoc(db dbm.DB) (*types.GenesisDoc, error) {
  474. bytes := db.Get(genesisDocKey)
  475. if len(bytes) == 0 {
  476. return nil, errors.New("Genesis doc not found")
  477. } else {
  478. var genDoc *types.GenesisDoc
  479. err := json.Unmarshal(bytes, &genDoc)
  480. if err != nil {
  481. cmn.PanicCrisis(fmt.Sprintf("Failed to load genesis doc due to unmarshaling error: %v (bytes: %X)", err, bytes))
  482. }
  483. return genDoc, nil
  484. }
  485. }
  486. // panics if failed to marshal the given genesis document
  487. func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) {
  488. bytes, err := json.Marshal(genDoc)
  489. if err != nil {
  490. cmn.PanicCrisis(fmt.Sprintf("Failed to save genesis doc due to marshaling error: %v", err))
  491. }
  492. db.SetSync(genesisDocKey, bytes)
  493. }