diff --git a/cmd/barak/main.go b/cmd/barak/main.go index deafa0b5c..095f53be3 100644 --- a/cmd/barak/main.go +++ b/cmd/barak/main.go @@ -79,9 +79,16 @@ func main() { if barak.rootDir == "" { barak.rootDir = os.Getenv("HOME") + "/.barak" } + err = EnsureDir(barak.rootDir) + if err != nil { + panic(Fmt("Error creating barak rootDir: %v", err)) + } // Write pid to file. - AtomicWriteFile(barak.rootDir+"/pidfile", []byte(Fmt("%v", barak.pid))) + err = AtomicWriteFile(barak.rootDir+"/pidfile", []byte(Fmt("%v", barak.pid))) + if err != nil { + panic(Fmt("Error writing pidfile: %v", err)) + } // Debug. fmt.Printf("Options: %v\n", options) diff --git a/cmd/barak/seed3 b/cmd/barak/seed3 new file mode 100644 index 000000000..dfcfe76c5 --- /dev/null +++ b/cmd/barak/seed3 @@ -0,0 +1,9 @@ +{ + "ListenAddress": "0.0.0.0:8084", + "Validators": [ + { + "VotingPower": 1, + "PubKey": [1,"3A2C5C341FFC1D5F7AB518519FF8289D3BFAB82DFD6E167B926FAD72C1BF10F8"] + } + ] +} diff --git a/cmd/debora/commands.go b/cmd/debora/commands.go index 7e6199d3b..af0819eec 100644 --- a/cmd/debora/commands.go +++ b/cmd/debora/commands.go @@ -2,11 +2,16 @@ package main import ( "fmt" + "io" + "net/url" + "os" + acm "github.com/tendermint/tendermint/account" "github.com/tendermint/tendermint/binary" btypes "github.com/tendermint/tendermint/cmd/barak/types" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/rpc" + "net/http" ) // These are convenience functions for a single developer. @@ -44,6 +49,36 @@ func ListProcesses(privKey acm.PrivKey, remote string, command btypes.CommandLis return response, err } +func DownloadFile(privKey acm.PrivKey, remote string, command btypes.CommandServeFile, outPath string) (n int64, err error) { + // Create authCommandJSONBytes + nonce, err := GetNonce(remote) + if err != nil { + return 0, err + } + commandBytes, signature := SignCommand(privKey, nonce+1, command) + authCommand := btypes.AuthCommand{ + CommandJSONStr: string(commandBytes), + Signatures: []acm.Signature{signature}, + } + authCommandJSONBytes := binary.JSONBytes(authCommand) + // Make request and write to outPath. + httpResponse, err := http.PostForm(remote+"/download", url.Values{"auth_command": {string(authCommandJSONBytes)}}) + if err != nil { + return 0, err + } + defer httpResponse.Body.Close() + outFile, err := os.OpenFile(outPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600) + if err != nil { + return 0, err + } + defer outFile.Close() + n, err = io.Copy(outFile, httpResponse.Body) + if err != nil { + return 0, err + } + return n, nil +} + //----------------------------------------------------------------------------- // Utility method to get nonce from the remote. diff --git a/cmd/debora/main.go b/cmd/debora/main.go index 714f80917..77469eed7 100644 --- a/cmd/debora/main.go +++ b/cmd/debora/main.go @@ -79,6 +79,11 @@ func main() { Usage: "list processes", Action: cliListProcesses, }, + cli.Command{ + Name: "download", + Usage: "download file ", + Action: cliDownloadFile, + }, } app.Run(os.Args) } @@ -189,3 +194,24 @@ func cliListProcesses(c *cli.Context) { } } } + +func cliDownloadFile(c *cli.Context) { + args := c.Args() + if len(args) != 2 { + Exit("Must specify ") + } + remotePath := args[0] + localPathPrefix := args[1] + command := btypes.CommandServeFile{ + Path: remotePath, + } + for i, remote := range Config.Remotes { + localPath := Fmt("%v_%v", localPathPrefix, i) + n, err := DownloadFile(Config.PrivKey, remote, command, localPath) + if err != nil { + fmt.Printf("%v failure. %v\n", remote, err) + } else { + fmt.Printf("%v success. Wrote %v bytes to %v\n", remote, n, localPath) + } + } +} diff --git a/cmd/tendermint/main.go b/cmd/tendermint/main.go index 761a89757..946e21c4d 100644 --- a/cmd/tendermint/main.go +++ b/cmd/tendermint/main.go @@ -40,6 +40,8 @@ Commands: gen_tx() case "probe_upnp": probe_upnp() + case "unsafe_reset_priv_validator": + reset_priv_validator() default: fmt.Printf("Unknown command %v\n", args[0]) } diff --git a/cmd/tendermint/reset_priv_validator.go b/cmd/tendermint/reset_priv_validator.go new file mode 100644 index 000000000..82301b0b8 --- /dev/null +++ b/cmd/tendermint/reset_priv_validator.go @@ -0,0 +1,29 @@ +package main + +import ( + "os" + + "github.com/tendermint/tendermint/config" + sm "github.com/tendermint/tendermint/state" +) + +// NOTE: this is totally unsafe. +// it's only suitable for testnets. +func reset_priv_validator() { + // Get PrivValidator + var privValidator *sm.PrivValidator + privValidatorFile := config.App().GetString("PrivValidatorFile") + if _, err := os.Stat(privValidatorFile); err == nil { + privValidator = sm.LoadPrivValidator(privValidatorFile) + privValidator.LastHeight = 0 + privValidator.LastRound = 0 + privValidator.LastStep = 0 + privValidator.Save() + log.Info("Reset PrivValidator", "file", privValidatorFile) + } else { + privValidator = sm.GenPrivValidator() + privValidator.SetFile(privValidatorFile) + privValidator.Save() + log.Info("Generated PrivValidator", "file", privValidatorFile) + } +} diff --git a/config/config.go b/config/config.go index 2be5438f1..e31dba575 100644 --- a/config/config.go +++ b/config/config.go @@ -36,7 +36,7 @@ var defaultConfig = `# This is a TOML config file. # For more information, see https://github.com/toml-lang/toml Moniker = "anonymous" -Network = "tendermint_testnet0" +Network = "tendermint_testnet2" ListenAddr = "0.0.0.0:8080" # First node to connect to. Command-line overridable. SeedNode = "188.166.55.222:8080" diff --git a/consensus/reactor.go b/consensus/reactor.go index 7c3033d6e..ffc1c9219 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -136,7 +136,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg_, "error", err, "bytes", msgBytes) return } - log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_, "bytes", msgBytes) + log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_) //, "bytes", msgBytes) switch chId { case StateChannel: @@ -295,7 +295,7 @@ func (conR *ConsensusReactor) sendNewRoundStepRoutine(peer *p2p.Peer) { peer.Send(StateChannel, nrsMsg) } if csMsg != nil { - peer.Send(StateChannel, nrsMsg) + peer.Send(StateChannel, csMsg) } } @@ -545,7 +545,7 @@ OUTER_LOOP: type PeerRoundState struct { Height uint // Height peer is at Round uint // Round peer is at - Step RoundStep // Step peer is at + Step RoundStepType // Step peer is at StartTime time.Time // Estimated start of round 0 at this height Proposal bool // True if peer has proposal for this round ProposalBlockParts types.PartSetHeader // @@ -790,7 +790,7 @@ func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) { type NewRoundStepMessage struct { Height uint Round uint - Step RoundStep + Step RoundStepType SecondsSinceStartTime uint } diff --git a/consensus/state.go b/consensus/state.go index 6352769bb..e2fde2ebc 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -88,20 +88,20 @@ var ( ) //----------------------------------------------------------------------------- -// RoundStep enum type +// RoundStepType enum type -type RoundStep uint8 +type RoundStepType uint8 // These must be numeric, ordered. const ( - RoundStepNewHeight = RoundStep(0x00) // Round0 for new height started, wait til CommitTime + Delta - RoundStepNewRound = RoundStep(0x01) // Pseudostep, immediately goes to RoundStepPropose - RoundStepPropose = RoundStep(0x10) // Did propose, gossip proposal - RoundStepPrevote = RoundStep(0x11) // Did prevote, gossip prevotes - RoundStepPrecommit = RoundStep(0x12) // Did precommit, gossip precommits - RoundStepCommit = RoundStep(0x20) // Entered commit state machine + RoundStepNewHeight = RoundStepType(0x01) // Round0 for new height started, wait til CommitTime + Delta + RoundStepNewRound = RoundStepType(0x02) // Pseudostep, immediately goes to RoundStepPropose + RoundStepPropose = RoundStepType(0x03) // Did propose, gossip proposal + RoundStepPrevote = RoundStepType(0x04) // Did prevote, gossip prevotes + RoundStepPrecommit = RoundStepType(0x05) // Did precommit, gossip precommits + RoundStepCommit = RoundStepType(0x06) // Entered commit state machine ) -func (rs RoundStep) String() string { +func (rs RoundStepType) String() string { switch rs { case RoundStepNewHeight: return "RoundStepNewHeight" @@ -123,15 +123,15 @@ func (rs RoundStep) String() string { //----------------------------------------------------------------------------- // RoundAction enum type -type RoundActionType uint8 +type RoundActionType string const ( - RoundActionPropose = RoundActionType(0xA0) // Propose and goto RoundStepPropose - RoundActionPrevote = RoundActionType(0xA1) // Prevote and goto RoundStepPrevote - RoundActionPrecommit = RoundActionType(0xA2) // Precommit and goto RoundStepPrecommit - RoundActionTryCommit = RoundActionType(0xC0) // Goto RoundStepCommit, or RoundStepPropose for next round. - RoundActionCommit = RoundActionType(0xC1) // Goto RoundStepCommit upon +2/3 commits - RoundActionTryFinalize = RoundActionType(0xC2) // Maybe goto RoundStepPropose for next round. + RoundActionPropose = RoundActionType("PR") // Propose and goto RoundStepPropose + RoundActionPrevote = RoundActionType("PV") // Prevote and goto RoundStepPrevote + RoundActionPrecommit = RoundActionType("PC") // Precommit and goto RoundStepPrecommit + RoundActionTryCommit = RoundActionType("TC") // Goto RoundStepCommit, or RoundStepPropose for next round. + RoundActionCommit = RoundActionType("CM") // Goto RoundStepCommit upon +2/3 commits + RoundActionTryFinalize = RoundActionType("TF") // Maybe goto RoundStepPropose for next round. ) func (rat RoundActionType) String() string { @@ -171,7 +171,7 @@ func (ra RoundAction) String() string { type RoundState struct { Height uint // Height we are working on Round uint - Step RoundStep + Step RoundStepType StartTime time.Time CommitTime time.Time // Time when +2/3 commits were found Validators *sm.ValidatorSet @@ -187,7 +187,6 @@ type RoundState struct { Precommits *VoteSet Commits *VoteSet LastCommits *VoteSet - PrivValidator *sm.PrivValidator } func (rs *RoundState) String() string { @@ -241,6 +240,7 @@ type ConsensusState struct { blockStore *bc.BlockStore mempoolReactor *mempl.MempoolReactor + privValidator *sm.PrivValidator runActionCh chan RoundAction newStepCh chan *RoundState @@ -318,6 +318,7 @@ func (cs *ConsensusState) stepTransitionRoutine() { // For clarity, all state transitions that happen after some timeout are here. // Schedule the next action by pushing a RoundAction{} to cs.runActionCh. scheduleNextAction := func() { + // NOTE: rs must be fetched in the same callstack as the caller. rs := cs.getRoundState() go func() { // NOTE: We can push directly to runActionCh because @@ -491,7 +492,12 @@ func (cs *ConsensusState) updateToState(state *sm.State, contiguous bool) { cs.Round = 0 cs.Step = RoundStepNewHeight if cs.CommitTime.IsZero() { - cs.StartTime = state.LastBlockTime.Add(newHeightDelta) + // "Now" makes it easier to sync up dev nodes. + // We add newHeightDelta to allow transactions + // to be gathered for the first block. + // And alternative solution that relies on clocks: + // cs.StartTime = state.LastBlockTime.Add(newHeightDelta) + cs.StartTime = time.Now().Add(newHeightDelta) } else { cs.StartTime = cs.CommitTime.Add(newHeightDelta) } @@ -521,12 +527,12 @@ func (cs *ConsensusState) updateToState(state *sm.State, contiguous bool) { } // If we've timed out, then send rebond tx. - if cs.PrivValidator != nil && cs.state.UnbondingValidators.HasAddress(cs.PrivValidator.Address) { + if cs.privValidator != nil && cs.state.UnbondingValidators.HasAddress(cs.privValidator.Address) { rebondTx := &types.RebondTx{ - Address: cs.PrivValidator.Address, + Address: cs.privValidator.Address, Height: cs.Height + 1, } - err := cs.PrivValidator.SignRebondTx(rebondTx) + err := cs.privValidator.SignRebondTx(rebondTx) if err == nil { log.Info("Signed and broadcast RebondTx", "height", cs.Height, "round", cs.Round, "tx", rebondTx) cs.mempoolReactor.BroadcastTx(rebondTx) @@ -565,7 +571,7 @@ func (cs *ConsensusState) setupNewRound(round uint) { func (cs *ConsensusState) SetPrivValidator(priv *sm.PrivValidator) { cs.mtx.Lock() defer cs.mtx.Unlock() - cs.PrivValidator = priv + cs.privValidator = priv } //----------------------------------------------------------------------------- @@ -598,14 +604,14 @@ func (cs *ConsensusState) RunActionPropose(height uint, round uint) { }() // Nothing to do if it's not our turn. - if cs.PrivValidator == nil { + if cs.privValidator == nil { return } - if !bytes.Equal(cs.Validators.Proposer().Address, cs.PrivValidator.Address) { - log.Debug("Not our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.PrivValidator) + if !bytes.Equal(cs.Validators.Proposer().Address, cs.privValidator.Address) { + log.Debug("Not our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.privValidator) return } else { - log.Debug("Our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.PrivValidator) + log.Debug("Our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.privValidator) } var block *types.Block @@ -673,7 +679,7 @@ func (cs *ConsensusState) RunActionPropose(height uint, round uint) { // Make proposal proposal := NewProposal(cs.Height, cs.Round, blockParts.Header(), polParts.Header()) - err := cs.PrivValidator.SignProposal(proposal) + err := cs.privValidator.SignProposal(proposal) if err == nil { log.Info("Signed and set proposal", "height", cs.Height, "round", cs.Round, "proposal", proposal) log.Debug(Fmt("Signed and set proposal block: %v", block)) @@ -1059,7 +1065,7 @@ func (cs *ConsensusState) stageBlock(block *types.Block, blockParts *types.PartS } func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.PartSetHeader) *types.Vote { - if cs.PrivValidator == nil || !cs.Validators.HasAddress(cs.PrivValidator.Address) { + if cs.privValidator == nil || !cs.Validators.HasAddress(cs.privValidator.Address) { return nil } vote := &types.Vote{ @@ -1069,10 +1075,10 @@ func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.Part BlockHash: hash, BlockParts: header, } - err := cs.PrivValidator.SignVote(vote) + err := cs.privValidator.SignVote(vote) if err == nil { log.Info("Signed and added vote", "height", cs.Height, "round", cs.Round, "vote", vote) - cs.addVote(cs.PrivValidator.Address, vote) + cs.addVote(cs.privValidator.Address, vote) return vote } else { log.Warn("Error signing vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err) diff --git a/consensus/state_test.go b/consensus/state_test.go index 60e54a692..a5545d308 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -82,7 +82,7 @@ func TestRunActionPropose(t *testing.T) { } func checkRoundState(t *testing.T, rs *RoundState, - height uint, round uint, step RoundStep) { + height uint, round uint, step RoundStepType) { if rs.Height != height { t.Errorf("rs.Height should be %v, got %v", height, rs.Height) } diff --git a/consensus/types/proposal.go b/consensus/types/proposal.go index a78570a53..79a3f001b 100644 --- a/consensus/types/proposal.go +++ b/consensus/types/proposal.go @@ -7,6 +7,7 @@ import ( "github.com/tendermint/tendermint/account" "github.com/tendermint/tendermint/binary" + "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/types" ) @@ -38,6 +39,7 @@ func (p *Proposal) String() string { } func (p *Proposal) WriteSignBytes(w io.Writer, n *int64, err *error) { + binary.WriteString(config.App().GetString("Network"), w, n, err) binary.WriteUvarint(p.Height, w, n, err) binary.WriteUvarint(p.Round, w, n, err) binary.WriteBinary(p.BlockParts, w, n, err) diff --git a/node/node.go b/node/node.go index df5bd5a26..cab2c397a 100644 --- a/node/node.go +++ b/node/node.go @@ -1,8 +1,10 @@ package node import ( + "net" "net/http" "os" + "strconv" bc "github.com/tendermint/tendermint/blockchain" . "github.com/tendermint/tendermint/common" @@ -15,6 +17,7 @@ import ( "github.com/tendermint/tendermint/rpc" "github.com/tendermint/tendermint/rpc/core" sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/types" ) type Node struct { @@ -79,7 +82,6 @@ func NewNode() *Node { } sw := p2p.NewSwitch() - sw.SetNetwork(config.App().GetString("Network")) sw.AddReactor("PEX", pexReactor) sw.AddReactor("MEMPOOL", mempoolReactor) sw.AddReactor("BLOCKCHAIN", bcReactor) @@ -103,9 +105,12 @@ func NewNode() *Node { } } +// Call Start() after adding the listeners. func (n *Node) Start() { log.Info("Starting Node") n.book.Start() + nodeInfo := makeNodeInfo(n.sw) + n.sw.SetNodeInfo(nodeInfo) n.sw.Start() if config.App().GetBool("FastSync") { // TODO: When FastSync is done, start CONSENSUS. @@ -128,6 +133,8 @@ func SetFireable(evsw *events.EventSwitch, eventables ...events.Eventable) { } // Add a Listener to accept inbound peer connections. +// Add listeners before starting the Node. +// The first listener is the primary listener (in NodeInfo) func (n *Node) AddListener(l p2p.Listener) { log.Info(Fmt("Added %v", l)) n.sw.AddListener(l) @@ -176,6 +183,32 @@ func (n *Node) EventSwitch() *events.EventSwitch { return n.evsw } +func makeNodeInfo(sw *p2p.Switch) *types.NodeInfo { + nodeInfo := &types.NodeInfo{ + Moniker: config.App().GetString("Moniker"), + Network: config.App().GetString("Network"), + } + if !sw.IsListening() { + return nodeInfo + } + p2pListener := sw.Listeners()[0] + p2pHost := p2pListener.ExternalAddress().IP.String() + p2pPort := p2pListener.ExternalAddress().Port + rpcListenAddr := config.App().GetString("RPC.HTTP.ListenAddr") + _, rpcPortStr, _ := net.SplitHostPort(rpcListenAddr) + rpcPort, err := strconv.Atoi(rpcPortStr) + if err != nil { + panic(Fmt("Expected numeric RPC.HTTP.ListenAddr port but got %v", rpcPortStr)) + } + + // We assume that the rpcListener has the same ExternalAddress. + // This is probably true because both P2P and RPC listeners use UPnP. + nodeInfo.Host = p2pHost + nodeInfo.P2PPort = p2pPort + nodeInfo.RPCPort = uint16(rpcPort) + return nodeInfo +} + //------------------------------------------------------------------------------ func RunNode() { diff --git a/p2p/connection.go b/p2p/connection.go index 85d6e88ea..f6f52bce9 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -11,7 +11,7 @@ import ( "time" flow "code.google.com/p/mxk/go1/flowcontrol" - "github.com/tendermint/log15" + //"github.com/tendermint/log15" "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" ) @@ -184,7 +184,7 @@ func (c *MConnection) Send(chId byte, msg interface{}) bool { return false } - log.Debug("Send", "channel", chId, "connection", c, "msg", msg, "bytes", binary.BinaryBytes(msg)) + log.Debug("Send", "channel", chId, "connection", c, "msg", msg) //, "bytes", binary.BinaryBytes(msg)) // Send message to channel. channel, ok := c.channelsIdx[chId] @@ -365,18 +365,20 @@ FOR_LOOP: // Block until .recvMonitor says we can read. c.recvMonitor.Limit(maxMsgPacketSize, atomic.LoadInt64(&c.recvRate), true) - // Peek into bufReader for debugging - if numBytes := c.bufReader.Buffered(); numBytes > 0 { - log.Debug("Peek connection buffer", "numBytes", numBytes, "bytes", log15.Lazy{func() []byte { - bytes, err := c.bufReader.Peek(MinInt(numBytes, 100)) - if err == nil { - return bytes - } else { - log.Warn("Error peeking connection buffer", "error", err) - return nil - } - }}) - } + /* + // Peek into bufReader for debugging + if numBytes := c.bufReader.Buffered(); numBytes > 0 { + log.Debug("Peek connection buffer", "numBytes", numBytes, "bytes", log15.Lazy{func() []byte { + bytes, err := c.bufReader.Peek(MinInt(numBytes, 100)) + if err == nil { + return bytes + } else { + log.Warn("Error peeking connection buffer", "error", err) + return nil + } + }}) + } + */ // Read packet type var n int64 @@ -417,7 +419,7 @@ FOR_LOOP: } msgBytes := channel.recvMsgPacket(pkt) if msgBytes != nil { - log.Debug("Received bytes", "chId", pkt.ChannelId, "msgBytes", msgBytes) + //log.Debug("Received bytes", "chId", pkt.ChannelId, "msgBytes", msgBytes) c.onReceive(pkt.ChannelId, msgBytes) } default: diff --git a/p2p/listener.go b/p2p/listener.go index cc27c0c77..48557b970 100644 --- a/p2p/listener.go +++ b/p2p/listener.go @@ -128,6 +128,10 @@ func (l *DefaultListener) ExternalAddress() *NetAddress { return l.extAddr } +func (l *DefaultListener) NetListener() net.Listener { + return l.listener +} + func (l *DefaultListener) Stop() { if atomic.CompareAndSwapUint32(&l.stopped, 0, 1) { l.listener.Close() diff --git a/p2p/peer.go b/p2p/peer.go index 5340366fd..1a63b52e8 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -4,10 +4,12 @@ import ( "fmt" "io" "net" + "sync" "sync/atomic" "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" + "github.com/tendermint/tendermint/types" ) type nodeInfo struct { @@ -21,13 +23,39 @@ type Peer struct { mconn *MConnection running uint32 - Nodeinfo *nodeInfo - + *types.NodeInfo Key string Data *CMap // User data. } -func newPeer(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer { +func peerHandshake(conn net.Conn, ourNodeInfo *types.NodeInfo) (*types.NodeInfo, error) { + var peerNodeInfo = new(types.NodeInfo) + var wg sync.WaitGroup + var err1 error + var err2 error + wg.Add(2) + go func() { + var n int64 + binary.WriteBinary(ourNodeInfo, conn, &n, &err1) + wg.Done() + }() + go func() { + var n int64 + binary.ReadBinary(peerNodeInfo, conn, &n, &err2) + log.Info("Peer handshake", "peerNodeInfo", peerNodeInfo) + wg.Done() + }() + wg.Wait() + if err1 != nil { + return nil, err1 + } + if err2 != nil { + return nil, err2 + } + return peerNodeInfo, nil +} + +func newPeer(conn net.Conn, peerNodeInfo *types.NodeInfo, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer { var p *Peer onReceive := func(chId byte, msgBytes []byte) { reactor := reactorsByCh[chId] @@ -45,6 +73,7 @@ func newPeer(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDesc outbound: outbound, mconn: mconn, running: 0, + NodeInfo: peerNodeInfo, Key: mconn.RemoteAddress.String(), Data: NewCMap(), } diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 8a3ba6444..0f9766d62 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -101,12 +101,6 @@ func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) { log.Info("Received message", "msg", msg) switch msg.(type) { - case *pexHandshakeMessage: - network := msg.(*pexHandshakeMessage).Network - if network != pexR.sw.network { - err := fmt.Sprintf("Peer is on a different chain/network. Got %s, expected %s", network, pexR.sw.network) - pexR.sw.StopPeerForError(src, err) - } case *pexRequestMessage: // src requested some peers. // TODO: prevent abuse. @@ -219,16 +213,14 @@ func (pexR *PEXReactor) SetFireable(evsw events.Fireable) { // Messages const ( - msgTypeRequest = byte(0x01) - msgTypeAddrs = byte(0x02) - msgTypeHandshake = byte(0x03) + msgTypeRequest = byte(0x01) + msgTypeAddrs = byte(0x02) ) type PexMessage interface{} var _ = binary.RegisterInterface( struct{ PexMessage }{}, - binary.ConcreteType{&pexHandshakeMessage{}, msgTypeHandshake}, binary.ConcreteType{&pexRequestMessage{}, msgTypeRequest}, binary.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs}, ) @@ -241,17 +233,6 @@ func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) { return } -/* -A pexHandshakeMessage contains the network identifier. -*/ -type pexHandshakeMessage struct { - Network string -} - -func (m *pexHandshakeMessage) String() string { - return "[pexHandshake]" -} - /* A pexRequestMessage requests additional peer addresses. */ diff --git a/p2p/switch.go b/p2p/switch.go index a07c026fc..77dd5655c 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -8,6 +8,7 @@ import ( "time" . "github.com/tendermint/tendermint/common" + "github.com/tendermint/tendermint/types" ) type Reactor interface { @@ -39,7 +40,6 @@ or more `Channels`. So while sending outgoing messages is typically performed o incoming messages are received on the reactor. */ type Switch struct { - network string listeners []Listener reactors map[string]Reactor chDescs []*ChannelDescriptor @@ -47,6 +47,7 @@ type Switch struct { peers *PeerSet dialing *CMap running uint32 + nodeInfo *types.NodeInfo // our node info } var ( @@ -58,25 +59,18 @@ const ( ) func NewSwitch() *Switch { - sw := &Switch{ - network: "", reactors: make(map[string]Reactor), chDescs: make([]*ChannelDescriptor, 0), reactorsByCh: make(map[byte]Reactor), peers: NewPeerSet(), dialing: NewCMap(), running: 0, + nodeInfo: nil, } - return sw } -// Not goroutine safe. -func (sw *Switch) SetNetwork(network string) { - sw.network = network -} - // Not goroutine safe. func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor { // Validate the reactor. @@ -109,6 +103,7 @@ func (sw *Switch) AddListener(l Listener) { sw.listeners = append(sw.listeners, l) } +// Not goroutine safe. func (sw *Switch) Listeners() []Listener { return sw.listeners } @@ -118,6 +113,11 @@ func (sw *Switch) IsListening() bool { return len(sw.listeners) > 0 } +// Not goroutine safe. +func (sw *Switch) SetNodeInfo(nodeInfo *types.NodeInfo) { + sw.nodeInfo = nodeInfo +} + func (sw *Switch) Start() { if atomic.CompareAndSwapUint32(&sw.running, 0, 1) { // Start reactors @@ -154,8 +154,17 @@ func (sw *Switch) Stop() { } } +// NOTE: This performs a blocking handshake before the peer is added. func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) { - peer := newPeer(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError) + // First, perform handshake + peerNodeInfo, err := peerHandshake(conn, sw.nodeInfo) + if err != nil { + return nil, err + } + if peerNodeInfo.Network != sw.nodeInfo.Network { + return nil, fmt.Errorf("Peer is on different network %v", peerNodeInfo.Network) + } + peer := newPeer(conn, peerNodeInfo, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError) // Add the peer to .peers if sw.peers.Add(peer) { @@ -177,10 +186,6 @@ func (sw *Switch) startInitPeer(peer *Peer) { // Notify reactors sw.doAddPeer(peer) - - // Send handshake - msg := &pexHandshakeMessage{Network: sw.network} - peer.Send(PexChannel, msg) } func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) { @@ -282,8 +287,7 @@ func (sw *Switch) listenerRoutine(l Listener) { // New inbound connection! peer, err := sw.AddPeerWithConnection(inConn, false) if err != nil { - log.Info("Ignoring error from inbound connection: %v\n%v", - peer, err) + log.Info(Fmt("Ignoring error from inbound connection: %v\n%v", peer, err)) continue } // NOTE: We don't yet have the external address of the diff --git a/p2p/switch_test.go b/p2p/switch_test.go index d66c039a3..fc3ed41db 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -8,6 +8,7 @@ import ( "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" + "github.com/tendermint/tendermint/types" ) type PeerMessage struct { @@ -73,7 +74,17 @@ func makeSwitchPair(t testing.TB, initSwitch func(*Switch) *Switch) (*Switch, *S // Create two switches that will be interconnected. s1 := initSwitch(NewSwitch()) + s1.SetNodeInfo(&types.NodeInfo{ + Moniker: "switch1", + Network: "testing", + }) s2 := initSwitch(NewSwitch()) + s2.SetNodeInfo(&types.NodeInfo{ + Moniker: "switch2", + Network: "testing", + }) + + // Start switches s1.Start() s2.Start() @@ -93,7 +104,7 @@ func makeSwitchPair(t testing.TB, initSwitch func(*Switch) *Switch) (*Switch, *S t.Fatalf("Could not get inbound connection from listener") } - s1.AddPeerWithConnection(connIn, false) + go s1.AddPeerWithConnection(connIn, false) // AddPeer is blocking, requires handshake. s2.AddPeerWithConnection(connOut, true) // Wait for things to happen, peers to get added... @@ -142,10 +153,10 @@ func TestSwitches(t *testing.T) { // Check message on ch0 ch0Msgs := s2.Reactor("foo").(*TestReactor).msgsReceived[byte(0x00)] - if len(ch0Msgs) != 2 { + if len(ch0Msgs) != 1 { t.Errorf("Expected to have received 1 message in ch0") } - if !bytes.Equal(ch0Msgs[1].Bytes, binary.BinaryBytes(ch0Msg)) { + if !bytes.Equal(ch0Msgs[0].Bytes, binary.BinaryBytes(ch0Msg)) { t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", binary.BinaryBytes(ch0Msg), ch0Msgs[0].Bytes) } diff --git a/process/process.go b/process/process.go index 3f18378e4..d0b12f10b 100644 --- a/process/process.go +++ b/process/process.go @@ -31,7 +31,7 @@ const ( // execPath: command name // args: args to command. (should not include name) func Create(mode int, label string, execPath string, args []string, input string, outPath string) (*Process, error) { - outFile, err := os.OpenFile(outPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) + outFile, err := os.OpenFile(outPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600) if err != nil { return nil, err } diff --git a/rpc/core/validators.go b/rpc/core/consensus.go similarity index 75% rename from rpc/core/validators.go rename to rpc/core/consensus.go index fd8f2d736..c74cf8cc4 100644 --- a/rpc/core/validators.go +++ b/rpc/core/consensus.go @@ -1,12 +1,11 @@ package core import ( + "github.com/tendermint/tendermint/binary" ctypes "github.com/tendermint/tendermint/rpc/core/types" sm "github.com/tendermint/tendermint/state" ) -//----------------------------------------------------------------------------- - func ListValidators() (*ctypes.ResponseListValidators, error) { var blockHeight uint var bondedValidators []*sm.Validator @@ -25,3 +24,8 @@ func ListValidators() (*ctypes.ResponseListValidators, error) { return &ctypes.ResponseListValidators{blockHeight, bondedValidators, unbondingValidators}, nil } + +func DumpConsensusState() (*ctypes.ResponseDumpConsensusState, error) { + jsonBytes := binary.JSONBytes(consensusState.GetRoundState()) + return &ctypes.ResponseDumpConsensusState{string(jsonBytes)}, nil +} diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 6f071e035..6d5d98926 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -2,7 +2,6 @@ package core import ( "fmt" - . "github.com/tendermint/tendermint/common" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" diff --git a/rpc/core/net.go b/rpc/core/net.go index 62973a680..7f852f34e 100644 --- a/rpc/core/net.go +++ b/rpc/core/net.go @@ -42,11 +42,8 @@ func NetInfo() (*ctypes.ResponseNetInfo, error) { peers := []ctypes.Peer{} for _, peer := range p2pSwitch.Peers().List() { peers = append(peers, ctypes.Peer{ - //Address: peer.Connection().RemoteAddress.String(), - Host: peer.Nodeinfo.Host, + NodeInfo: *peer.NodeInfo, IsOutbound: peer.IsOutbound(), - P2PPort: peer.Nodeinfo.P2PPort, - RPCPort: peer.Nodeinfo.RPCPort, }) } return &ctypes.ResponseNetInfo{ diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 395ad82a4..17f196ab0 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -14,6 +14,7 @@ var Routes = map[string]*rpc.RPCFunc{ "call": rpc.NewRPCFunc(Call, []string{"address", "data"}), "call_code": rpc.NewRPCFunc(CallCode, []string{"code", "data"}), "list_validators": rpc.NewRPCFunc(ListValidators, []string{}), + "dump_consensus_state": rpc.NewRPCFunc(DumpConsensusState, []string{}), "dump_storage": rpc.NewRPCFunc(DumpStorage, []string{"address"}), "broadcast_tx": rpc.NewRPCFunc(BroadcastTx, []string{"tx"}), "list_accounts": rpc.NewRPCFunc(ListAccounts, []string{}), diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index ee662105d..b3b13c030 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -78,9 +78,7 @@ type ResponseNetInfo struct { } type Peer struct { - Host string // ip - P2PPort uint16 - RPCPort uint16 + types.NodeInfo IsOutbound bool } @@ -93,3 +91,7 @@ type ResponseListValidators struct { BondedValidators []*sm.Validator UnbondingValidators []*sm.Validator } + +type ResponseDumpConsensusState struct { + ConsensusState string +} diff --git a/rpc/core_client/client_methods.go b/rpc/core_client/client_methods.go index 391358615..8b2334974 100644 --- a/rpc/core_client/client_methods.go +++ b/rpc/core_client/client_methods.go @@ -18,6 +18,7 @@ type Client interface { BroadcastTx(tx types.Tx) (*ctypes.ResponseBroadcastTx, error) Call(address []byte, data []byte) (*ctypes.ResponseCall, error) CallCode(code []byte, data []byte) (*ctypes.ResponseCall, error) + DumpConsensusState() (*ctypes.ResponseDumpConsensusState, error) DumpStorage(address []byte) (*ctypes.ResponseDumpStorage, error) GenPrivAccount() (*ctypes.ResponseGenPrivAccount, error) GetAccount(address []byte) (*ctypes.ResponseGetAccount, error) @@ -150,6 +151,36 @@ func (c *ClientHTTP) CallCode(code []byte, data []byte) (*ctypes.ResponseCall, e return response.Result, nil } +func (c *ClientHTTP) DumpConsensusState() (*ctypes.ResponseDumpConsensusState, error) { + values, err := argsToURLValues(nil) + if err != nil { + return nil, err + } + resp, err := http.PostForm(c.addr+reverseFuncMap["DumpConsensusState"], values) + if err != nil { + return nil, err + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + var response struct { + Result *ctypes.ResponseDumpConsensusState `json:"result"` + Error string `json:"error"` + Id string `json:"id"` + JSONRPC string `json:"jsonrpc"` + } + binary.ReadJSON(&response, body, &err) + if err != nil { + return nil, err + } + if response.Error != "" { + return nil, fmt.Errorf(response.Error) + } + return response.Result, nil +} + func (c *ClientHTTP) DumpStorage(address []byte) (*ctypes.ResponseDumpStorage, error) { values, err := argsToURLValues([]string{"address"}, address) if err != nil { @@ -558,6 +589,33 @@ func (c *ClientJSON) CallCode(code []byte, data []byte) (*ctypes.ResponseCall, e return response.Result, nil } +func (c *ClientJSON) DumpConsensusState() (*ctypes.ResponseDumpConsensusState, error) { + request := rpc.RPCRequest{ + JSONRPC: "2.0", + Method: reverseFuncMap["DumpConsensusState"], + Params: []interface{}{}, + Id: 0, + } + body, err := c.RequestResponse(request) + if err != nil { + return nil, err + } + var response struct { + Result *ctypes.ResponseDumpConsensusState `json:"result"` + Error string `json:"error"` + Id string `json:"id"` + JSONRPC string `json:"jsonrpc"` + } + binary.ReadJSON(&response, body, &err) + if err != nil { + return nil, err + } + if response.Error != "" { + return nil, fmt.Errorf(response.Error) + } + return response.Result, nil +} + func (c *ClientJSON) DumpStorage(address []byte) (*ctypes.ResponseDumpStorage, error) { request := rpc.RPCRequest{ JSONRPC: "2.0", diff --git a/rpc/http_server.go b/rpc/http_server.go index 5eaff856e..140fda441 100644 --- a/rpc/http_server.go +++ b/rpc/http_server.go @@ -13,16 +13,19 @@ import ( "github.com/tendermint/tendermint/alert" "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" + "github.com/tendermint/tendermint/p2p" ) func StartHTTPServer(listenAddr string, handler http.Handler) { log.Info(Fmt("Starting RPC HTTP server on %s", listenAddr)) go func() { - res := http.ListenAndServe( - listenAddr, + listener := p2p.NewDefaultListener("tcp", listenAddr, false) + netListener := listener.(*p2p.DefaultListener).NetListener() + res := http.Serve( + netListener, RecoverAndLogHandler(handler), ) - log.Crit("RPC HTTPServer stopped", "result", res) + log.Crit("RPC HTTP server stopped", "result", res) }() } @@ -30,7 +33,7 @@ func WriteRPCResponse(w http.ResponseWriter, res RPCResponse) { buf, n, err := new(bytes.Buffer), new(int64), new(error) binary.WriteJSON(res, buf, n, err) if *err != nil { - log.Warn("Failed to write JSON RPCResponse", "error", err) + log.Warn("Failed to write RPC response", "error", err) } w.Header().Set("Content-Type", "application/json") @@ -67,7 +70,7 @@ func RecoverAndLogHandler(handler http.Handler) http.Handler { WriteRPCResponse(rww, res) } else { // For the rest, - log.Error("Panic in HTTP handler", "error", e, "stack", string(debug.Stack())) + log.Error("Panic in RPC HTTP handler", "error", e, "stack", string(debug.Stack())) rww.WriteHeader(http.StatusInternalServerError) WriteRPCResponse(rww, NewRPCResponse(nil, Fmt("Internal Server Error: %v", e))) } @@ -78,7 +81,7 @@ func RecoverAndLogHandler(handler http.Handler) http.Handler { if rww.Status == -1 { rww.Status = 200 } - log.Debug("Served HTTP response", + log.Debug("Served RPC HTTP response", "method", r.Method, "url", r.URL, "status", rww.Status, "duration", durationMS, "remoteAddr", r.RemoteAddr, diff --git a/types/node.go b/types/node.go new file mode 100644 index 000000000..7a3c41a73 --- /dev/null +++ b/types/node.go @@ -0,0 +1,9 @@ +package types + +type NodeInfo struct { + Moniker string + Network string + Host string + P2PPort uint16 + RPCPort uint16 +} diff --git a/types/tx.go b/types/tx.go index a924f3822..ca98367e8 100644 --- a/types/tx.go +++ b/types/tx.go @@ -7,6 +7,7 @@ import ( "github.com/tendermint/tendermint/account" "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" + "github.com/tendermint/tendermint/config" ) var ( @@ -133,6 +134,7 @@ type SendTx struct { } func (tx *SendTx) WriteSignBytes(w io.Writer, n *int64, err *error) { + binary.WriteString(config.App().GetString("Network"), w, n, err) binary.WriteUvarint(uint(len(tx.Inputs)), w, n, err) for _, in := range tx.Inputs { in.WriteSignBytes(w, n, err) @@ -158,6 +160,7 @@ type CallTx struct { } func (tx *CallTx) WriteSignBytes(w io.Writer, n *int64, err *error) { + binary.WriteString(config.App().GetString("Network"), w, n, err) tx.Input.WriteSignBytes(w, n, err) binary.WriteByteSlice(tx.Address, w, n, err) binary.WriteUint64(tx.GasLimit, w, n, err) @@ -178,6 +181,7 @@ type BondTx struct { } func (tx *BondTx) WriteSignBytes(w io.Writer, n *int64, err *error) { + binary.WriteString(config.App().GetString("Network"), w, n, err) binary.WriteBinary(tx.PubKey, w, n, err) binary.WriteUvarint(uint(len(tx.Inputs)), w, n, err) for _, in := range tx.Inputs { @@ -202,6 +206,7 @@ type UnbondTx struct { } func (tx *UnbondTx) WriteSignBytes(w io.Writer, n *int64, err *error) { + binary.WriteString(config.App().GetString("Network"), w, n, err) binary.WriteByteSlice(tx.Address, w, n, err) binary.WriteUvarint(tx.Height, w, n, err) } @@ -219,6 +224,7 @@ type RebondTx struct { } func (tx *RebondTx) WriteSignBytes(w io.Writer, n *int64, err *error) { + binary.WriteString(config.App().GetString("Network"), w, n, err) binary.WriteByteSlice(tx.Address, w, n, err) binary.WriteUvarint(tx.Height, w, n, err) } diff --git a/types/vote.go b/types/vote.go index 4bbe3fb73..dc68012ba 100644 --- a/types/vote.go +++ b/types/vote.go @@ -8,6 +8,7 @@ import ( "github.com/tendermint/tendermint/account" "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" + "github.com/tendermint/tendermint/config" ) var ( @@ -46,6 +47,7 @@ const ( ) func (vote *Vote) WriteSignBytes(w io.Writer, n *int64, err *error) { + binary.WriteString(config.App().GetString("Network"), w, n, err) binary.WriteUvarint(vote.Height, w, n, err) binary.WriteUvarint(vote.Round, w, n, err) binary.WriteByte(vote.Type, w, n, err)