You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

309 lines
8.3 KiB

10 years ago
10 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
  1. package node
  2. import (
  3. "bytes"
  4. "fmt"
  5. "math/rand"
  6. "net"
  7. "net/http"
  8. "os"
  9. "strconv"
  10. "strings"
  11. "time"
  12. acm "github.com/tendermint/tendermint/account"
  13. "github.com/tendermint/tendermint/binary"
  14. bc "github.com/tendermint/tendermint/blockchain"
  15. . "github.com/tendermint/tendermint/common"
  16. "github.com/tendermint/tendermint/consensus"
  17. dbm "github.com/tendermint/tendermint/db"
  18. "github.com/tendermint/tendermint/events"
  19. mempl "github.com/tendermint/tendermint/mempool"
  20. "github.com/tendermint/tendermint/p2p"
  21. "github.com/tendermint/tendermint/rpc/core"
  22. "github.com/tendermint/tendermint/rpc/server"
  23. sm "github.com/tendermint/tendermint/state"
  24. "github.com/tendermint/tendermint/types"
  25. )
  26. import _ "net/http/pprof"
  27. func init() {
  28. go func() {
  29. fmt.Println(http.ListenAndServe("0.0.0.0:6060", nil))
  30. }()
  31. }
  32. type Node struct {
  33. sw *p2p.Switch
  34. evsw *events.EventSwitch
  35. book *p2p.AddrBook
  36. blockStore *bc.BlockStore
  37. pexReactor *p2p.PEXReactor
  38. bcReactor *bc.BlockchainReactor
  39. mempoolReactor *mempl.MempoolReactor
  40. consensusState *consensus.ConsensusState
  41. consensusReactor *consensus.ConsensusReactor
  42. privValidator *sm.PrivValidator
  43. genDoc *sm.GenesisDoc
  44. privKey acm.PrivKeyEd25519
  45. }
  46. func NewNode() *Node {
  47. // Get BlockStore
  48. blockStoreDB := dbm.GetDB("blockstore")
  49. blockStore := bc.NewBlockStore(blockStoreDB)
  50. // Get State
  51. stateDB := dbm.GetDB("state")
  52. state := sm.LoadState(stateDB)
  53. var genDoc *sm.GenesisDoc
  54. if state == nil {
  55. genDoc, state = sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
  56. state.Save()
  57. // write the gendoc to db
  58. buf, n, err := new(bytes.Buffer), new(int64), new(error)
  59. binary.WriteJSON(genDoc, buf, n, err)
  60. stateDB.Set(sm.GenDocKey, buf.Bytes())
  61. if *err != nil {
  62. panic(Fmt("Unable to write gendoc to db: %v", err))
  63. }
  64. } else {
  65. genDocBytes := stateDB.Get(sm.GenDocKey)
  66. err := new(error)
  67. binary.ReadJSONPtr(&genDoc, genDocBytes, err)
  68. if *err != nil {
  69. panic(Fmt("Unable to read gendoc from db: %v", err))
  70. }
  71. }
  72. // add the chainid to the global config
  73. config.Set("chain_id", state.ChainID)
  74. // Get PrivValidator
  75. var privValidator *sm.PrivValidator
  76. privValidatorFile := config.GetString("priv_validator_file")
  77. if _, err := os.Stat(privValidatorFile); err == nil {
  78. privValidator = sm.LoadPrivValidator(privValidatorFile)
  79. log.Notice("Loaded PrivValidator",
  80. "file", privValidatorFile, "privValidator", privValidator)
  81. } else {
  82. privValidator = sm.GenPrivValidator()
  83. privValidator.SetFile(privValidatorFile)
  84. privValidator.Save()
  85. log.Notice("Generated PrivValidator", "file", privValidatorFile)
  86. }
  87. // Generate node PrivKey
  88. privKey := acm.GenPrivKeyEd25519()
  89. // Make event switch
  90. eventSwitch := events.NewEventSwitch()
  91. eventSwitch.Start()
  92. // Make PEXReactor
  93. book := p2p.NewAddrBook(config.GetString("addrbook_file"))
  94. pexReactor := p2p.NewPEXReactor(book)
  95. // Make BlockchainReactor
  96. bcReactor := bc.NewBlockchainReactor(state.Copy(), blockStore, config.GetBool("fast_sync"))
  97. // Make MempoolReactor
  98. mempool := mempl.NewMempool(state.Copy())
  99. mempoolReactor := mempl.NewMempoolReactor(mempool)
  100. // Make ConsensusReactor
  101. consensusState := consensus.NewConsensusState(state.Copy(), blockStore, mempoolReactor)
  102. consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore, config.GetBool("fast_sync"))
  103. if privValidator != nil {
  104. consensusReactor.SetPrivValidator(privValidator)
  105. }
  106. // Make p2p network switch
  107. sw := p2p.NewSwitch()
  108. sw.AddReactor("PEX", pexReactor)
  109. sw.AddReactor("MEMPOOL", mempoolReactor)
  110. sw.AddReactor("BLOCKCHAIN", bcReactor)
  111. sw.AddReactor("CONSENSUS", consensusReactor)
  112. // add the event switch to all services
  113. // they should all satisfy events.Eventable
  114. SetFireable(eventSwitch, pexReactor, bcReactor, mempoolReactor, consensusReactor)
  115. return &Node{
  116. sw: sw,
  117. evsw: eventSwitch,
  118. book: book,
  119. blockStore: blockStore,
  120. pexReactor: pexReactor,
  121. bcReactor: bcReactor,
  122. mempoolReactor: mempoolReactor,
  123. consensusState: consensusState,
  124. consensusReactor: consensusReactor,
  125. privValidator: privValidator,
  126. genDoc: genDoc,
  127. privKey: privKey,
  128. }
  129. }
  130. // Call Start() after adding the listeners.
  131. func (n *Node) Start() {
  132. n.book.Start()
  133. n.sw.SetNodeInfo(makeNodeInfo(n.sw, n.privKey))
  134. n.sw.SetNodePrivKey(n.privKey)
  135. n.sw.Start()
  136. }
  137. func (n *Node) Stop() {
  138. log.Notice("Stopping Node")
  139. // TODO: gracefully disconnect from peers.
  140. n.sw.Stop()
  141. n.book.Stop()
  142. }
  143. // Add the event switch to reactors, mempool, etc.
  144. func SetFireable(evsw *events.EventSwitch, eventables ...events.Eventable) {
  145. for _, e := range eventables {
  146. e.SetFireable(evsw)
  147. }
  148. }
  149. // Add a Listener to accept inbound peer connections.
  150. // Add listeners before starting the Node.
  151. // The first listener is the primary listener (in NodeInfo)
  152. func (n *Node) AddListener(l p2p.Listener) {
  153. log.Notice(Fmt("Added %v", l))
  154. n.sw.AddListener(l)
  155. n.book.AddOurAddress(l.ExternalAddress())
  156. }
  157. // Dial a list of seeds in random order
  158. // Spawns a go routine for each dial
  159. func (n *Node) DialSeed() {
  160. // permute the list, dial them in random order.
  161. seeds := strings.Split(config.GetString("seeds"), ",")
  162. perm := rand.Perm(len(seeds))
  163. for i := 0; i < len(perm); i++ {
  164. go func(i int) {
  165. time.Sleep(time.Duration(rand.Int63n(3000)) * time.Millisecond)
  166. j := perm[i]
  167. addr := p2p.NewNetAddressString(seeds[j])
  168. n.dialSeed(addr)
  169. }(i)
  170. }
  171. }
  172. func (n *Node) dialSeed(addr *p2p.NetAddress) {
  173. peer, err := n.sw.DialPeerWithAddress(addr)
  174. if err != nil {
  175. log.Error("Error dialing seed", "error", err)
  176. //n.book.MarkAttempt(addr)
  177. return
  178. } else {
  179. log.Notice("Connected to seed", "peer", peer)
  180. n.book.AddAddress(addr, addr)
  181. }
  182. }
  183. func (n *Node) StartRPC() net.Listener {
  184. core.SetBlockStore(n.blockStore)
  185. core.SetConsensusState(n.consensusState)
  186. core.SetConsensusReactor(n.consensusReactor)
  187. core.SetMempoolReactor(n.mempoolReactor)
  188. core.SetSwitch(n.sw)
  189. core.SetPrivValidator(n.privValidator)
  190. core.SetGenDoc(n.genDoc)
  191. listenAddr := config.GetString("rpc_laddr")
  192. mux := http.NewServeMux()
  193. rpcserver.RegisterEventsHandler(mux, n.evsw)
  194. rpcserver.RegisterRPCFuncs(mux, core.Routes)
  195. listener, err := rpcserver.StartHTTPServer(listenAddr, mux)
  196. if err != nil {
  197. panic(err)
  198. }
  199. return listener
  200. }
  201. func (n *Node) Switch() *p2p.Switch {
  202. return n.sw
  203. }
  204. func (n *Node) BlockStore() *bc.BlockStore {
  205. return n.blockStore
  206. }
  207. func (n *Node) ConsensusState() *consensus.ConsensusState {
  208. return n.consensusState
  209. }
  210. func (n *Node) MempoolReactor() *mempl.MempoolReactor {
  211. return n.mempoolReactor
  212. }
  213. func (n *Node) EventSwitch() *events.EventSwitch {
  214. return n.evsw
  215. }
  216. func makeNodeInfo(sw *p2p.Switch, privKey acm.PrivKeyEd25519) *types.NodeInfo {
  217. nodeInfo := &types.NodeInfo{
  218. PubKey: privKey.PubKey().(acm.PubKeyEd25519),
  219. Moniker: config.GetString("moniker"),
  220. ChainID: config.GetString("chain_id"),
  221. Version: config.GetString("version"),
  222. }
  223. // include git hash in the nodeInfo if available
  224. if rev, err := ReadFile(config.GetString("revisions_file")); err == nil {
  225. nodeInfo.Revision = string(rev)
  226. }
  227. if !sw.IsListening() {
  228. return nodeInfo
  229. }
  230. p2pListener := sw.Listeners()[0]
  231. p2pHost := p2pListener.ExternalAddress().IP.String()
  232. p2pPort := p2pListener.ExternalAddress().Port
  233. rpcListenAddr := config.GetString("rpc_laddr")
  234. _, rpcPortStr, _ := net.SplitHostPort(rpcListenAddr)
  235. rpcPort, err := strconv.Atoi(rpcPortStr)
  236. if err != nil {
  237. panic(Fmt("Expected numeric RPC.ListenAddr port but got %v", rpcPortStr))
  238. }
  239. // We assume that the rpcListener has the same ExternalAddress.
  240. // This is probably true because both P2P and RPC listeners use UPnP,
  241. // except of course if the rpc is only bound to localhost
  242. nodeInfo.Host = p2pHost
  243. nodeInfo.P2PPort = p2pPort
  244. nodeInfo.RPCPort = uint16(rpcPort)
  245. return nodeInfo
  246. }
  247. //------------------------------------------------------------------------------
  248. func RunNode() {
  249. // Create & start node
  250. n := NewNode()
  251. l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"), false)
  252. n.AddListener(l)
  253. n.Start()
  254. log.Notice("Started node", "nodeInfo", n.sw.NodeInfo())
  255. // If seedNode is provided by config, dial out.
  256. if config.GetString("seeds") != "" {
  257. n.DialSeed()
  258. }
  259. // Run the RPC server.
  260. if config.GetString("rpc_laddr") != "" {
  261. n.StartRPC()
  262. }
  263. // Sleep forever and then...
  264. TrapSignal(func() {
  265. n.Stop()
  266. })
  267. }