diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 53aa311cc..207e5d279 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -44,7 +44,7 @@ type BlockchainReactor struct { sw *p2p.Switch state *sm.State - proxyAppConn proxy.AppConn // same as consensus.proxyAppConn + proxyAppConn proxy.AppConnConsensus // same as consensus.proxyAppConn store *BlockStore pool *BlockPool fastSync bool @@ -55,7 +55,7 @@ type BlockchainReactor struct { evsw *events.EventSwitch } -func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConn, store *BlockStore, fastSync bool) *BlockchainReactor { +func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConnConsensus, store *BlockStore, fastSync bool) *BlockchainReactor { if state.LastBlockHeight == store.Height()-1 { store.height -= 1 // XXX HACK, make this better } @@ -231,19 +231,22 @@ FOR_LOOP: break SYNC_LOOP } else { bcR.pool.PopRequest() + // TODO: use ApplyBlock instead of Exec/Commit/SetAppHash/Save err := bcR.state.ExecBlock(bcR.evsw, bcR.proxyAppConn, first, firstPartsHeader) if err != nil { // TODO This is bad, are we zombie? - PanicQ(Fmt("Failed to process committed block: %v", err)) + PanicQ(Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) + } + // NOTE: we could improve performance if we + // didn't make the app commit to disk every block + // ... but we would need a way to get the hash without it persisting + res := bcR.proxyAppConn.CommitSync() + if res.IsErr() { + // TODO Handle gracefully. + PanicQ(Fmt("Failed to commit block at application: %v", res)) } - /* - err = bcR.proxyAppConn.CommitSync() - if err != nil { - // TODO Handle gracefully. - PanicQ(Fmt("Failed to commit block at application: %v", err)) - } - */ bcR.store.SaveBlock(first, firstParts, second.LastCommit) + bcR.state.AppHash = res.Data bcR.state.Save() } } diff --git a/circle.yml b/circle.yml index 2f6141b2d..de49c473c 100644 --- a/circle.yml +++ b/circle.yml @@ -8,10 +8,6 @@ machine: hosts: circlehost: 127.0.0.1 localhost: 127.0.0.1 - pre: - - curl -sSL https://s3.amazonaws.com/circle-downloads/install-circleci-docker.sh | sudo bash -s -- $DOCKER_VERSION - services: - - docker checkout: post: @@ -23,7 +19,10 @@ checkout: dependencies: override: + - echo $MACH_PREFIX $GOPATH $REPO $DOCKER_VERSION $DOCKER_MACHINE_VERSION + - curl -sSL https://s3.amazonaws.com/circle-downloads/install-circleci-docker.sh | sudo bash -s -- $DOCKER_VERSION - sudo curl -sSL -o /usr/bin/docker-machine https://github.com/docker/machine/releases/download/v$DOCKER_MACHINE_VERSION/docker-machine-linux-x86_64; sudo chmod 0755 /usr/bin/docker-machine + - sudo start docker - go version - docker version - docker-machine version diff --git a/cmd/tendermint/reset_priv_validator.go b/cmd/tendermint/reset_priv_validator.go index 795823c39..2887c10d0 100644 --- a/cmd/tendermint/reset_priv_validator.go +++ b/cmd/tendermint/reset_priv_validator.go @@ -22,10 +22,7 @@ func reset_priv_validator() { privValidatorFile := config.GetString("priv_validator_file") if _, err := os.Stat(privValidatorFile); err == nil { privValidator = types.LoadPrivValidator(privValidatorFile) - privValidator.LastHeight = 0 - privValidator.LastRound = 0 - privValidator.LastStep = 0 - privValidator.Save() + privValidator.Reset() log.Notice("Reset PrivValidator", "file", privValidatorFile) } else { privValidator = types.GenPrivValidator() diff --git a/config/tendermint/config.go b/config/tendermint/config.go index 3731c53bb..4d2323ab3 100644 --- a/config/tendermint/config.go +++ b/config/tendermint/config.go @@ -55,7 +55,7 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("proxy_app", "tcp://127.0.0.1:46658") mapConfig.SetDefault("tmsp", "socket") mapConfig.SetDefault("moniker", "anonymous") - mapConfig.SetDefault("node_laddr", "0.0.0.0:46656") + mapConfig.SetDefault("node_laddr", "tcp://0.0.0.0:46656") mapConfig.SetDefault("seeds", "") // mapConfig.SetDefault("seeds", "goldenalchemist.chaintest.net:46656") mapConfig.SetDefault("fast_sync", true) @@ -65,11 +65,12 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("db_backend", "leveldb") mapConfig.SetDefault("db_dir", rootDir+"/data") mapConfig.SetDefault("log_level", "info") - mapConfig.SetDefault("rpc_laddr", "0.0.0.0:46657") + mapConfig.SetDefault("rpc_laddr", "tcp://0.0.0.0:46657") mapConfig.SetDefault("prof_laddr", "") mapConfig.SetDefault("revision_file", rootDir+"/revision") mapConfig.SetDefault("cswal", rootDir+"/data/cswal") mapConfig.SetDefault("cswal_light", false) + mapConfig.SetDefault("filter_peers", false) mapConfig.SetDefault("block_size", 10000) mapConfig.SetDefault("disable_data_hash", false) @@ -92,12 +93,12 @@ var defaultConfigTmpl = `# This is a TOML config file. proxy_app = "tcp://127.0.0.1:46658" moniker = "__MONIKER__" -node_laddr = "0.0.0.0:46656" +node_laddr = "tcp://0.0.0.0:46656" seeds = "" fast_sync = true db_backend = "leveldb" log_level = "notice" -rpc_laddr = "0.0.0.0:46657" +rpc_laddr = "tcp://0.0.0.0:46657" ` func defaultConfig(moniker string) (defaultConfig string) { diff --git a/config/tendermint_test/config.go b/config/tendermint_test/config.go index c53f8bfbb..120079858 100644 --- a/config/tendermint_test/config.go +++ b/config/tendermint_test/config.go @@ -70,7 +70,7 @@ func ResetConfig(localPath string) cfg.Config { mapConfig.SetDefault("proxy_app", "dummy") mapConfig.SetDefault("tmsp", "socket") mapConfig.SetDefault("moniker", "anonymous") - mapConfig.SetDefault("node_laddr", "0.0.0.0:36656") + mapConfig.SetDefault("node_laddr", "tcp://0.0.0.0:36656") mapConfig.SetDefault("fast_sync", false) mapConfig.SetDefault("skip_upnp", true) mapConfig.SetDefault("addrbook_file", rootDir+"/addrbook.json") @@ -78,21 +78,22 @@ func ResetConfig(localPath string) cfg.Config { mapConfig.SetDefault("db_backend", "memdb") mapConfig.SetDefault("db_dir", rootDir+"/data") mapConfig.SetDefault("log_level", "debug") - mapConfig.SetDefault("rpc_laddr", "0.0.0.0:36657") + mapConfig.SetDefault("rpc_laddr", "tcp://0.0.0.0:36657") mapConfig.SetDefault("prof_laddr", "") mapConfig.SetDefault("revision_file", rootDir+"/revision") mapConfig.SetDefault("cswal", rootDir+"/data/cswal") mapConfig.SetDefault("cswal_light", false) + mapConfig.SetDefault("filter_peers", false) mapConfig.SetDefault("block_size", 10000) mapConfig.SetDefault("disable_data_hash", false) - mapConfig.SetDefault("timeout_propose", 3000) - mapConfig.SetDefault("timeout_propose_delta", 1000) - mapConfig.SetDefault("timeout_prevote", 2000) - mapConfig.SetDefault("timeout_prevote_delta", 1000) - mapConfig.SetDefault("timeout_precommit", 2000) - mapConfig.SetDefault("timeout_precommit_delta", 1000) - mapConfig.SetDefault("timeout_commit", 1000) + mapConfig.SetDefault("timeout_propose", 2000) + mapConfig.SetDefault("timeout_propose_delta", 500) + mapConfig.SetDefault("timeout_prevote", 1000) + mapConfig.SetDefault("timeout_prevote_delta", 500) + mapConfig.SetDefault("timeout_precommit", 1000) + mapConfig.SetDefault("timeout_precommit_delta", 500) + mapConfig.SetDefault("timeout_commit", 100) mapConfig.SetDefault("mempool_recheck", true) mapConfig.SetDefault("mempool_recheck_empty", true) mapConfig.SetDefault("mempool_broadcast", true) @@ -105,12 +106,12 @@ var defaultConfigTmpl = `# This is a TOML config file. proxy_app = "dummy" moniker = "__MONIKER__" -node_laddr = "0.0.0.0:36656" +node_laddr = "tcp://0.0.0.0:36656" seeds = "" fast_sync = false db_backend = "memdb" log_level = "debug" -rpc_laddr = "0.0.0.0:36657" +rpc_laddr = "tcp://0.0.0.0:36657" ` func defaultConfig(moniker string) (defaultConfig string) { diff --git a/consensus/README.md b/consensus/README.md index 46d33032f..182e30bfa 100644 --- a/consensus/README.md +++ b/consensus/README.md @@ -1,4 +1,18 @@ -The core consensus algorithm. +# The core consensus algorithm. * state.go - The state machine as detailed in the whitepaper * reactor.go - A reactor that connects the state machine to the gossip network + +# Go-routine summary + +The reactor runs 2 go-routines for each added peer: gossipDataRoutine and gossipVotesRoutine. + +The consensus state runs two persistent go-routines: timeoutRoutine and receiveRoutine. +Go-routines are also started to trigger timeouts and to avoid blocking when the internalMsgQueue is really backed up. + +# Replay/WAL + +A write-ahead log is used to record all messages processed by the receiveRoutine, +which amounts to all inputs to the consensus state machine: +messages from peers, messages from ourselves, and timeouts. +They can be played back deterministically at startup or using the replay console. diff --git a/consensus/common_test.go b/consensus/common_test.go index 693edbca1..3e05d2564 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -316,8 +316,9 @@ func fixedConsensusState() *ConsensusState { state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) privValidatorFile := config.GetString("priv_validator_file") privValidator := types.LoadOrGenPrivValidator(privValidatorFile) - return newConsensusState(state, privValidator, counter.NewCounterApplication(true)) - + privValidator.Reset() + cs := newConsensusState(state, privValidator, counter.NewCounterApplication(true)) + return cs } func newConsensusState(state *sm.State, pv *types.PrivValidator, app tmsp.Application) *ConsensusState { diff --git a/consensus/reactor.go b/consensus/reactor.go index 98f6510dd..49dc73c2c 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -140,6 +140,7 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { // Messages affect either a peer state or the consensus state. // Peer state updates can happen in parallel, but processing of // proposals, block parts, and votes are ordered by the receiveRoutine +// NOTE: blocks on consensus state for proposals, block parts, and votes func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { if !conR.IsRunning() { log.Debug("Receive", "src", src, "chId", chID, "bytes", msgBytes) diff --git a/consensus/replay.go b/consensus/replay.go index ec85be095..9bb0fb055 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -19,6 +19,8 @@ import ( ) // unmarshal and apply a single message to the consensus state +// as if it were received in receiveRoutine +// NOTE: receiveRoutine should not be running func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan interface{}) error { var err error var msg ConsensusLogMessage @@ -31,7 +33,7 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte // for logging switch m := msg.Msg.(type) { case types.EventDataRoundState: - log.Notice("New Step", "height", m.Height, "round", m.Round, "step", m.Step) + log.Notice("Replay: New Step", "height", m.Height, "round", m.Round, "step", m.Step) // these are playback checks ticker := time.After(time.Second * 2) if newStepCh != nil { @@ -53,41 +55,37 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte switch msg := m.Msg.(type) { case *ProposalMessage: p := msg.Proposal - log.Notice("Proposal", "height", p.Height, "round", p.Round, "header", + log.Notice("Replay: Proposal", "height", p.Height, "round", p.Round, "header", p.BlockPartsHeader, "pol", p.POLRound, "peer", peerKey) case *BlockPartMessage: - log.Notice("BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerKey) + log.Notice("Replay: BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerKey) case *VoteMessage: v := msg.Vote - log.Notice("Vote", "height", v.Height, "round", v.Round, "type", v.Type, + log.Notice("Replay: Vote", "height", v.Height, "round", v.Round, "type", v.Type, "hash", v.BlockHash, "header", v.BlockPartsHeader, "peer", peerKey) } - // internal or from peer - if m.PeerKey == "" { - cs.internalMsgQueue <- m - } else { - cs.peerMsgQueue <- m - } + + cs.handleMsg(m, cs.RoundState) case timeoutInfo: - log.Notice("Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration) - cs.tockChan <- m + log.Notice("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration) + cs.handleTimeout(m, cs.RoundState) default: - return fmt.Errorf("Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg)) + return fmt.Errorf("Replay: Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg)) } return nil } -// replay only those messages since the last block +// replay only those messages since the last block. +// timeoutRoutine should run concurrently to read off tickChan func (cs *ConsensusState) catchupReplay(height int) error { - if cs.wal == nil { - log.Warn("consensus msg log is nil") - return nil - } - if !cs.wal.exists { - // new wal, nothing to catchup on + if !cs.wal.Exists() { return nil } + // set replayMode + cs.replayMode = true + defer func() { cs.replayMode = false }() + // starting from end of file, // read messages until a new height is found nLines, err := cs.wal.SeekFromEnd(func(lineBytes []byte) bool { @@ -142,7 +140,7 @@ func (cs *ConsensusState) catchupReplay(height int) error { return err } } - log.Info("Done catchup replay") + log.Notice("Done catchup replay") return nil } @@ -255,8 +253,9 @@ func (pb *playback) replayReset(count int, newStepCh chan interface{}) error { } func (cs *ConsensusState) startForReplay() { + // don't want to start full cs cs.BaseService.OnStart() - go cs.receiveRoutine(0) + // since we replay tocks we just ignore ticks go func() { for { diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 46e862ed2..654cec9c2 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -1,12 +1,16 @@ package consensus import ( + "fmt" "io/ioutil" "os" + "strings" "testing" "time" . "github.com/tendermint/go-common" + "github.com/tendermint/go-events" + "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/types" ) @@ -56,51 +60,144 @@ var testLog = `{"time":"2016-04-03T11:23:54.387Z","msg":[3,{"duration":972835254 {"time":"2016-04-03T11:23:54.392Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":2,"block_hash":"4291966B8A9DFBA00AEC7C700F2718E61DF4331D","block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"signature":"39147DA595F08B73CF8C899967C8403B5872FD9042FFA4E239159E0B6C5D9665C9CA81D766EACA2AE658872F94C2FCD1E34BF51859CD5B274DA8512BACE4B50D"}}],"peer_key":""}]} ` -func TestReplayCatchup(t *testing.T) { +// map lines in the above wal to privVal step +var mapPrivValStep = map[int]int8{ + 0: 0, + 1: 0, + 2: 1, + 3: 1, + 4: 1, + 5: 2, + 6: 2, + 7: 3, +} + +func writeWAL(log string) string { + fmt.Println("writing", log) // write the needed wal to file f, err := ioutil.TempFile(os.TempDir(), "replay_test_") if err != nil { panic(err) } - name := f.Name() - _, err = f.WriteString(testLog) + + _, err = f.WriteString(log) if err != nil { panic(err) } + name := f.Name() f.Close() + return name +} + +func waitForBlock(newBlockCh chan interface{}) { + after := time.After(time.Second * 10) + select { + case <-newBlockCh: + case <-after: + panic("Timed out waiting for new block") + } +} + +func runReplayTest(t *testing.T, cs *ConsensusState, fileName string, newBlockCh chan interface{}) { + cs.config.Set("cswal", fileName) + cs.Start() + // Wait to make a new block. + // This is just a signal that we haven't halted; its not something contained in the WAL itself. + // Assuming the consensus state is running, replay of any WAL, including the empty one, + // should eventually be followed by a new block, or else something is wrong + waitForBlock(newBlockCh) + cs.Stop() +} + +func setupReplayTest(nLines int, crashAfter bool) (*ConsensusState, chan interface{}, string, string) { + fmt.Println("-------------------------------------") + log.Notice(Fmt("Starting replay test of %d lines of WAL (crash before write)", nLines)) + + lineStep := nLines + if crashAfter { + lineStep -= 1 + } + + split := strings.Split(testLog, "\n") + lastMsg := split[nLines] + + // we write those lines up to (not including) one with the signature + fileName := writeWAL(strings.Join(split[:nLines], "\n") + "\n") cs := fixedConsensusState() - // we've already precommitted on the first block - // without replay catchup we would be halted here forever + // set the last step according to when we crashed vs the wal cs.privValidator.LastHeight = 1 // first block - cs.privValidator.LastStep = 3 // precommit + cs.privValidator.LastStep = mapPrivValStep[lineStep] - newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 0) + fmt.Println("LAST STEP", cs.privValidator.LastStep) - // start timeout and receive routines - cs.startRoutines(0) + newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1) - // open wal and run catchup messages - openWAL(t, cs, name) - if err := cs.catchupReplay(cs.Height); err != nil { - panic(Fmt("Error on catchup replay %v", err)) - } + return cs, newBlockCh, lastMsg, fileName +} - after := time.After(time.Second * 15) - select { - case <-newBlockCh: - case <-after: - panic("Timed out waiting for new block") +//----------------------------------------------- +// Test the log at every iteration, and set the privVal last step +// as if the log was written after signing, before the crash + +func TestReplayCrashAfterWrite(t *testing.T) { + split := strings.Split(testLog, "\n") + for i := 0; i < len(split)-1; i++ { + cs, newBlockCh, _, f := setupReplayTest(i+1, true) + runReplayTest(t, cs, f, newBlockCh) } } -func openWAL(t *testing.T, cs *ConsensusState, file string) { - // open the wal - wal, err := NewWAL(file, config.GetBool("cswal_light")) +//----------------------------------------------- +// Test the log as if we crashed after signing but before writing. +// This relies on privValidator.LastSignature being set + +func TestReplayCrashBeforeWritePropose(t *testing.T) { + cs, newBlockCh, proposalMsg, f := setupReplayTest(2, false) // propose + // Set LastSig + var err error + var msg ConsensusLogMessage + wire.ReadJSON(&msg, []byte(proposalMsg), &err) + proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage) if err != nil { - panic(err) + t.Fatalf("Error reading json data: %v", err) } - wal.exists = true - cs.wal = wal + cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, proposal.Proposal) + cs.privValidator.LastSignature = proposal.Proposal.Signature + runReplayTest(t, cs, f, newBlockCh) +} + +func TestReplayCrashBeforeWritePrevote(t *testing.T) { + cs, newBlockCh, voteMsg, f := setupReplayTest(5, false) // prevote + cs.evsw.AddListenerForEvent("tester", types.EventStringCompleteProposal(), func(data events.EventData) { + // Set LastSig + var err error + var msg ConsensusLogMessage + wire.ReadJSON(&msg, []byte(voteMsg), &err) + vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) + if err != nil { + t.Fatalf("Error reading json data: %v", err) + } + cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote) + cs.privValidator.LastSignature = vote.Vote.Signature + }) + runReplayTest(t, cs, f, newBlockCh) +} + +func TestReplayCrashBeforeWritePrecommit(t *testing.T) { + cs, newBlockCh, voteMsg, f := setupReplayTest(7, false) // precommit + cs.evsw.AddListenerForEvent("tester", types.EventStringPolka(), func(data events.EventData) { + // Set LastSig + var err error + var msg ConsensusLogMessage + wire.ReadJSON(&msg, []byte(voteMsg), &err) + vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) + if err != nil { + t.Fatalf("Error reading json data: %v", err) + } + cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote) + cs.privValidator.LastSignature = vote.Vote.Signature + }) + runReplayTest(t, cs, f, newBlockCh) } diff --git a/consensus/state.go b/consensus/state.go index f89de3921..51fd8864e 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -215,7 +215,7 @@ type ConsensusState struct { QuitService config cfg.Config - proxyAppConn proxy.AppConn + proxyAppConn proxy.AppConnConsensus blockStore *bc.BlockStore mempool *mempl.Mempool privValidator *types.PrivValidator @@ -233,12 +233,13 @@ type ConsensusState struct { evsw *events.EventSwitch - wal *WAL + wal *WAL + replayMode bool // so we don't log signing errors during replay nSteps int // used for testing to limit the number of transitions the state makes } -func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConn, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState { +func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState { cs := &ConsensusState{ config: config, proxyAppConn: proxyAppConn, @@ -298,8 +299,17 @@ func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) { func (cs *ConsensusState) OnStart() error { cs.QuitService.OnStart() - // start timeout and receive routines - cs.startRoutines(0) + err := cs.OpenWAL(cs.config.GetString("cswal")) + if err != nil { + return err + } + + // we need the timeoutRoutine for replay so + // we don't block on the tick chan. + // NOTE: we will get a build up of garbage go routines + // firing on the tockChan until the receiveRoutine is started + // to deal with them (by that point, at most one will be valid) + go cs.timeoutRoutine() // we may have lost some votes if the process crashed // reload from consensus log to catchup @@ -308,8 +318,12 @@ func (cs *ConsensusState) OnStart() error { // let's go for it anyways, maybe we're fine } + // now start the receiveRoutine + go cs.receiveRoutine(0) + // schedule the first round! - cs.scheduleRound0(cs.Height) + // use GetRoundState so we don't race the receiveRoutine for access + cs.scheduleRound0(cs.GetRoundState()) return nil } @@ -407,13 +421,13 @@ func (cs *ConsensusState) updateRoundStep(round int, step RoundStepType) { } // enterNewRound(height, 0) at cs.StartTime. -func (cs *ConsensusState) scheduleRound0(height int) { +func (cs *ConsensusState) scheduleRound0(rs *RoundState) { //log.Info("scheduleRound0", "now", time.Now(), "startTime", cs.StartTime) - sleepDuration := cs.StartTime.Sub(time.Now()) + sleepDuration := rs.StartTime.Sub(time.Now()) if sleepDuration < time.Duration(0) { sleepDuration = time.Duration(0) } - cs.scheduleTimeout(sleepDuration, height, 0, RoundStepNewHeight) + cs.scheduleTimeout(sleepDuration, rs.Height, 0, RoundStepNewHeight) } // Attempt to schedule a timeout by sending timeoutInfo on the tickChan. @@ -432,7 +446,7 @@ func (cs *ConsensusState) sendInternalMessage(mi msgInfo) { // be processed out of order. // TODO: use CList here for strict determinism and // attempt push to internalMsgQueue in receiveRoutine - log.Debug("Internal msg queue is full. Using a go-routine") + log.Warn("Internal msg queue is full. Using a go-routine") go func() { cs.internalMsgQueue <- mi }() } } @@ -843,7 +857,9 @@ func (cs *ConsensusState) decideProposal(height, round int) { log.Info("Signed proposal", "height", height, "round", round, "proposal", proposal) log.Debug(Fmt("Signed proposal block: %v", block)) } else { - log.Warn("enterPropose: Error signing proposal", "height", height, "round", round, "error", err) + if !cs.replayMode { + log.Warn("enterPropose: Error signing proposal", "height", height, "round", round, "error", err) + } } } @@ -1254,7 +1270,7 @@ func (cs *ConsensusState) finalizeCommit(height int) { // cs.StartTime is already set. // Schedule Round0 to start soon. - cs.scheduleRound0(height + 1) + cs.scheduleRound0(&cs.RoundState) // By here, // * cs.Height has been increment to height+1 @@ -1270,9 +1286,6 @@ func (cs *ConsensusState) commitStateUpdateMempool(s *sm.State, block *types.Blo cs.mempool.Lock() defer cs.mempool.Unlock() - // flush out any CheckTx that have already started - cs.proxyAppConn.FlushSync() - // Commit block, get hash back res := cs.proxyAppConn.CommitSync() if res.IsErr() { @@ -1502,8 +1515,9 @@ func (cs *ConsensusState) signVote(type_ byte, hash []byte, header types.PartSet return vote, err } -// signs the vote, publishes on internalMsgQueue +// sign the vote and publish on internalMsgQueue func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.PartSetHeader) *types.Vote { + if cs.privValidator == nil || !cs.Validators.HasAddress(cs.privValidator.Address) { return nil } @@ -1515,7 +1529,9 @@ func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.Part log.Info("Signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err) return vote } else { - log.Warn("Error signing vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err) + if !cs.replayMode { + log.Warn("Error signing vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err) + } return nil } } diff --git a/consensus/state_test.go b/consensus/state_test.go index 9a8d60905..d890c4e93 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -236,7 +236,7 @@ func TestFullRound1(t *testing.T) { cs, vss := randConsensusState(1) height, round := cs.Height, cs.Round - voteCh := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 1) + voteCh := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 0) propCh := subscribeToEvent(cs.evsw, "tester", types.EventStringCompleteProposal(), 1) newRoundCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewRound(), 1) @@ -249,6 +249,8 @@ func TestFullRound1(t *testing.T) { propBlockHash := re.(types.EventDataRoundState).RoundState.(*RoundState).ProposalBlock.Hash() <-voteCh // wait for prevote + // NOTE: voteChan cap of 0 ensures we can complete this + // before consensus can move to the next height (and cause a race condition) validatePrevote(t, cs, round, vss[0], propBlockHash) <-voteCh // wait for precommit diff --git a/consensus/wal.go b/consensus/wal.go index 5b4747a6f..fb5b80568 100644 --- a/consensus/wal.go +++ b/consensus/wal.go @@ -60,6 +60,14 @@ func NewWAL(file string, light bool) (*WAL, error) { }, nil } +func (wal *WAL) Exists() bool { + if wal == nil { + log.Warn("consensus msg log is nil") + return false + } + return wal.exists +} + // called in newStep and for each pass in receiveRoutine func (wal *WAL) Save(clm ConsensusLogMessageInterface) { if wal != nil { diff --git a/glide.lock b/glide.lock index ce3018940..722c82a71 100644 --- a/glide.lock +++ b/glide.lock @@ -1,64 +1,66 @@ hash: d87a1fe0061d41c1e6ec78d405d54ae321e75f4bff22b38d19d3255bbd17f21e -updated: 2016-06-11T18:38:47.019992204-07:00 +updated: 2016-09-10T18:02:24.023038691-04:00 imports: - name: github.com/btcsuite/btcd - version: ff4ada0b0e1ebffa3f9c15cadc96ab0d08a11034 + version: 2ef82e7db35dc8c499fa9091d768dc99bbaff893 subpackages: - btcec - name: github.com/btcsuite/fastsha256 - version: 302ad4db268b46f9ebda3078f6f7397f96047735 + version: 637e656429416087660c84436a2a035d69d54e2e - name: github.com/BurntSushi/toml - version: f0aeabca5a127c4078abb8c8d64298b147264b55 + version: 99064174e013895bbd9b025c31100bd1d9b590ca - name: github.com/go-stack/stack version: 100eb0c0a9c5b306ca2fb4f165df21d80ada4b82 - name: github.com/gogo/protobuf - version: 318371cbef6bab80e8d1c69b470fffa79eebfb54 + version: a11c89fbb0ad4acfa8abc4a4d5f7e27c477169b1 subpackages: - proto - name: github.com/golang/protobuf - version: 8616e8ee5e20a1704615e6c8d7afcdac06087a67 + version: 1f49d83d9aa00e6ce4fc8258c71cc7786aec968a subpackages: - proto - name: github.com/golang/snappy version: d9eb7a3d35ec988b8585d4a0068e462c27d28380 - name: github.com/gorilla/websocket - version: a68708917c6a4f06314ab4e52493cc61359c9d42 + version: a69d25be2fe2923a97c2af6849b2f52426f68fc0 - name: github.com/mattn/go-colorable - version: 9056b7a9f2d1f2d96498d6d146acd1f9d5ed3d59 + version: ed8eb9e318d7a84ce5915b495b7d35e0cfe7b5a8 - name: github.com/mattn/go-isatty - version: 56b76bdf51f7708750eac80fa38b952bb9f32639 + version: 66b8e73f3f5cda9f96b69efd03dd3d7fc4a5cdb8 - name: github.com/spf13/pflag - version: 367864438f1b1a3c7db4da06a2f55b144e6784e0 + version: 6fd2ff4ff8dfcdf5556fbdc0ac0284408274b1a7 - name: github.com/syndtr/goleveldb - version: fa5b5c78794bc5c18f330361059f871ae8c2b9d6 + version: 6ae1797c0b42b9323fc27ff7dcf568df88f2f33d subpackages: - leveldb - - leveldb/errors - - leveldb/opt - leveldb/cache - leveldb/comparer + - leveldb/errors - leveldb/filter - leveldb/iterator - leveldb/journal - leveldb/memdb + - leveldb/opt - leveldb/storage - leveldb/table - leveldb/util - name: github.com/tendermint/ed25519 version: 1f52c6f8b8a5c7908aff4497c186af344b428925 subpackages: - - extra25519 - edwards25519 + - extra25519 - name: github.com/tendermint/flowcontrol version: 84d9671090430e8ec80e35b339907e0579b999eb - name: github.com/tendermint/go-clist version: 3baa390bbaf7634251c42ad69a8682e7e3990552 - name: github.com/tendermint/go-common - version: dee6622bf7f811d3ba8638a3f5ffaf8d679aa9d9 + version: 47e06734f6ee488cc2e61550a38642025e1d4227 + subpackages: + - test - name: github.com/tendermint/go-config version: e64b424499acd0eb9856b88e10c0dff41628c0d6 - name: github.com/tendermint/go-crypto - version: 41cfb7b677f4e16cdfd22b6ce0946c89919fbc7b + version: 4b11d62bdb324027ea01554e5767b71174680ba0 - name: github.com/tendermint/go-db version: 31fdd21c7eaeed53e0ea7ca597fb1e960e2988a5 - name: github.com/tendermint/go-events @@ -68,11 +70,11 @@ imports: - name: github.com/tendermint/go-merkle version: 05042c6ab9cad51d12e4cecf717ae68e3b1409a8 - name: github.com/tendermint/go-p2p - version: 929cf433b9c8e987af5f7f3ca3ce717e1e3eda53 + version: f508f3f20b5bb36f03d3bc83647b7a92425139d1 subpackages: - upnp - name: github.com/tendermint/go-rpc - version: dea910cd3e71bbfaf1973fd7ba295f0ee515a25f + version: 479510be0e80dd9e5d6b1f941adad168df0af85f subpackages: - client - server @@ -84,38 +86,40 @@ imports: subpackages: - term - name: github.com/tendermint/tmsp - version: ba11348508939e9d273cdc1cc476c5c611e14e66 + version: ead192adbbbf85ac581cf775b18ae70d59f86457 subpackages: - client + - example/counter - example/dummy - example/nil + - server - types - name: golang.org/x/crypto - version: 77f4136a99ffb5ecdbdd0226bd5cb146cf56bc0e + version: aa2481cbfe81d911eb62b642b7a6b5ec58bbea71 subpackages: - - ripemd160 + - curve25519 - nacl/box - nacl/secretbox - openpgp/armor - - curve25519 - - salsa20/salsa - - poly1305 - openpgp/errors + - poly1305 + - ripemd160 + - salsa20/salsa - name: golang.org/x/net - version: 3f122ce3dbbe488b7e6a8bdb26f41edec852a40b + version: cfe3c2a7525b50c3d707256e371c90938cfef98a subpackages: - context - http2 - - trace - http2/hpack - - lex/httplex - internal/timeseries + - lex/httplex + - trace - name: golang.org/x/sys - version: 7f918dd405547ecb864d14a8ecbbfe205b5f930f + version: 30de6d19a3bd89a5f38ae4028e23aaa5582648af subpackages: - unix - name: google.golang.org/grpc - version: daeb9cc0f2607997cce611a1458e71b981ce5986 + version: 28707e14b1d2b2f5da81474dea2790d71e526987 subpackages: - codes - credentials @@ -123,6 +127,6 @@ imports: - internal - metadata - naming - - transport - peer -devImports: [] + - transport +testImports: [] diff --git a/mempool/mempool.go b/mempool/mempool.go index 6cd2227c2..34df870f8 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -49,7 +49,7 @@ type Mempool struct { config cfg.Config proxyMtx sync.Mutex - proxyAppConn proxy.AppConn + proxyAppConn proxy.AppConnMempool txs *clist.CList // concurrent linked-list of good txs counter int64 // simple incrementing counter height int // the last block Update()'d to @@ -63,7 +63,7 @@ type Mempool struct { cacheList *list.List // to remove oldest tx when cache gets too big } -func NewMempool(config cfg.Config, proxyAppConn proxy.AppConn) *Mempool { +func NewMempool(config cfg.Config, proxyAppConn proxy.AppConnMempool) *Mempool { mempool := &Mempool{ config: config, proxyAppConn: proxyAppConn, diff --git a/node/node.go b/node/node.go index 231b6b650..72e08b8b8 100644 --- a/node/node.go +++ b/node/node.go @@ -6,7 +6,6 @@ import ( "net" "net/http" "strings" - "sync" "time" . "github.com/tendermint/go-common" @@ -26,9 +25,6 @@ import ( sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/version" - tmspcli "github.com/tendermint/tmsp/client" - "github.com/tendermint/tmsp/example/dummy" - "github.com/tendermint/tmsp/example/nil" ) import _ "net/http/pprof" @@ -45,9 +41,17 @@ type Node struct { privValidator *types.PrivValidator genesisDoc *types.GenesisDoc privKey crypto.PrivKeyEd25519 + proxyApp proxy.AppConns } -func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp func(proxyAddr, transport string, appHash []byte) proxy.AppConn) *Node { +func NewNodeDefault(config cfg.Config) *Node { + // Get PrivValidator + privValidatorFile := config.GetString("priv_validator_file") + privValidator := types.LoadOrGenPrivValidator(privValidatorFile) + return NewNode(config, privValidator, proxy.DefaultClientCreator(config)) +} + +func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreator proxy.ClientCreator) *Node { EnsureDir(config.GetString("db_dir"), 0700) // incase we use memdb, cswal still gets written here @@ -61,12 +65,9 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp // Get State state := getState(config, stateDB) - // Create two proxyAppConn connections, - // one for the consensus and one for the mempool. - proxyAddr := config.GetString("proxy_app") - transport := config.GetString("tmsp") - proxyAppConnMempool := getProxyApp(proxyAddr, transport, state.AppHash) - proxyAppConnConsensus := getProxyApp(proxyAddr, transport, state.AppHash) + // Create the proxyApp, which houses three connections: + // query, consensus, and mempool + proxyApp := proxy.NewAppConns(config, clientCreator, state, blockStore) // add the chainid and number of validators to the global config config.Set("chain_id", state.ChainID) @@ -93,14 +94,14 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp } // Make BlockchainReactor - bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyAppConnConsensus, blockStore, fastSync) + bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyApp.Consensus(), blockStore, fastSync) // Make MempoolReactor - mempool := mempl.NewMempool(config, proxyAppConnMempool) + mempool := mempl.NewMempool(config, proxyApp.Mempool()) mempoolReactor := mempl.NewMempoolReactor(config, mempool) // Make ConsensusReactor - consensusState := consensus.NewConsensusState(config, state.Copy(), proxyAppConnConsensus, blockStore, mempool) + consensusState := consensus.NewConsensusState(config, state.Copy(), proxyApp.Consensus(), blockStore, mempool) consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore, fastSync) if privValidator != nil { consensusReactor.SetPrivValidator(privValidator) @@ -118,6 +119,27 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp sw.AddReactor("BLOCKCHAIN", bcReactor) sw.AddReactor("CONSENSUS", consensusReactor) + // filter peers by addr or pubkey with a tmsp query. + // if the query return code is OK, add peer + // XXX: query format subject to change + if config.GetBool("filter_peers") { + // NOTE: addr is ip:port + sw.SetAddrFilter(func(addr net.Addr) error { + res := proxyApp.Query().QuerySync([]byte(Fmt("p2p/filter/addr/%s", addr.String()))) + if res.IsOK() { + return nil + } + return res + }) + sw.SetPubKeyFilter(func(pubkey crypto.PubKeyEd25519) error { + res := proxyApp.Query().QuerySync([]byte(Fmt("p2p/filter/pubkey/%X", pubkey.Bytes()))) + if res.IsOK() { + return nil + } + return res + }) + } + // add the event switch to all services // they should all satisfy events.Eventable SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor) @@ -125,6 +147,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp // run the profile server profileHost := config.GetString("prof_laddr") if profileHost != "" { + go func() { log.Warn("Profile server", "error", http.ListenAndServe(profileHost, nil)) }() @@ -142,6 +165,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp privValidator: privValidator, genesisDoc: state.GenesisDoc, privKey: privKey, + proxyApp: proxyApp, } } @@ -185,6 +209,7 @@ func (n *Node) StartRPC() ([]net.Listener, error) { rpccore.SetSwitch(n.sw) rpccore.SetPrivValidator(n.privValidator) rpccore.SetGenesisDoc(n.genesisDoc) + rpccore.SetProxyAppQuery(n.proxyApp.Query()) listenAddrs := strings.Split(n.config.GetString("rpc_laddr"), ",") @@ -270,40 +295,6 @@ func makeNodeInfo(config cfg.Config, sw *p2p.Switch, privKey crypto.PrivKeyEd255 return nodeInfo } -// Get a connection to the proxyAppConn addr. -// Check the current hash, and panic if it doesn't match. -func GetProxyApp(addr, transport string, hash []byte) (proxyAppConn proxy.AppConn) { - // use local app (for testing) - switch addr { - case "nilapp": - app := nilapp.NewNilApplication() - mtx := new(sync.Mutex) - proxyAppConn = tmspcli.NewLocalClient(mtx, app) - case "dummy": - app := dummy.NewDummyApplication() - mtx := new(sync.Mutex) - proxyAppConn = tmspcli.NewLocalClient(mtx, app) - default: - // Run forever in a loop - remoteApp, err := proxy.NewRemoteAppConn(addr, transport) - if err != nil { - Exit(Fmt("Failed to connect to proxy for mempool: %v", err)) - } - proxyAppConn = remoteApp - } - - // Check the hash - res := proxyAppConn.CommitSync() - if res.IsErr() { - PanicCrisis(Fmt("Error in getting proxyAppConn hash: %v", res)) - } - if !bytes.Equal(hash, res.Data) { - log.Warn(Fmt("ProxyApp hash does not match. Expected %X, got %X", hash, res.Data)) - } - - return proxyAppConn -} - // Load the most recent state from "state" db, // or create a new one (and save) from genesis. func getState(config cfg.Config, stateDB dbm.DB) *sm.State { @@ -317,9 +308,12 @@ func getState(config cfg.Config, stateDB dbm.DB) *sm.State { //------------------------------------------------------------------------------ -// Users wishing to use an external signer for their validators +// Users wishing to: +// * use an external signer for their validators +// * supply an in-proc tmsp app // should fork tendermint/tendermint and implement RunNode to -// load their custom priv validator and call NewNode(privVal, getProxyFunc) +// call NewNode with their custom priv validator and/or custom +// proxy.ClientCreator interface func RunNode(config cfg.Config) { // Wait until the genesis doc becomes available genDocFile := config.GetString("genesis_file") @@ -342,13 +336,11 @@ func RunNode(config cfg.Config) { } } - // Get PrivValidator - privValidatorFile := config.GetString("priv_validator_file") - privValidator := types.LoadOrGenPrivValidator(privValidatorFile) - // Create & start node - n := NewNode(config, privValidator, GetProxyApp) - l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"), config.GetBool("skip_upnp")) + n := NewNodeDefault(config) + + protocol, address := ProtocolAndAddress(config.GetString("node_laddr")) + l := p2p.NewDefaultListener(protocol, address, config.GetBool("skip_upnp")) n.AddListener(l) err := n.Start() if err != nil { @@ -400,10 +392,7 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState { // Create two proxyAppConn connections, // one for the consensus and one for the mempool. - proxyAddr := config.GetString("proxy_app") - transport := config.GetString("tmsp") - proxyAppConnMempool := GetProxyApp(proxyAddr, transport, state.AppHash) - proxyAppConnConsensus := GetProxyApp(proxyAddr, transport, state.AppHash) + proxyApp := proxy.NewAppConns(config, proxy.DefaultClientCreator(config), state, blockStore) // add the chainid to the global config config.Set("chain_id", state.ChainID) @@ -415,9 +404,9 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState { Exit(Fmt("Failed to start event switch: %v", err)) } - mempool := mempl.NewMempool(config, proxyAppConnMempool) + mempool := mempl.NewMempool(config, proxyApp.Mempool()) - consensusState := consensus.NewConsensusState(config, state.Copy(), proxyAppConnConsensus, blockStore, mempool) + consensusState := consensus.NewConsensusState(config, state.Copy(), proxyApp.Consensus(), blockStore, mempool) consensusState.SetEventSwitch(eventSwitch) return consensusState } @@ -448,3 +437,13 @@ func RunReplay(config cfg.Config) { } log.Notice("Replay run successfully") } + +// Defaults to tcp +func ProtocolAndAddress(listenAddr string) (string, string) { + protocol, address := "tcp", listenAddr + parts := strings.SplitN(address, "://", 2) + if len(parts) == 2 { + protocol, address = parts[0], parts[1] + } + return protocol, address +} diff --git a/node/node_test.go b/node/node_test.go index 8cb81f159..880a2dab7 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -6,19 +6,15 @@ import ( "github.com/tendermint/go-p2p" "github.com/tendermint/tendermint/config/tendermint_test" - "github.com/tendermint/tendermint/types" ) func TestNodeStartStop(t *testing.T) { config := tendermint_test.ResetConfig("node_node_test") - // Get PrivValidator - privValidatorFile := config.GetString("priv_validator_file") - privValidator := types.LoadOrGenPrivValidator(privValidatorFile) - // Create & start node - n := NewNode(config, privValidator, GetProxyApp) - l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"), config.GetBool("skip_upnp")) + n := NewNodeDefault(config) + protocol, address := ProtocolAndAddress(config.GetString("node_laddr")) + l := p2p.NewDefaultListener(protocol, address, config.GetBool("skip_upnp")) n.AddListener(l) n.Start() log.Notice("Started node", "nodeInfo", n.sw.NodeInfo()) diff --git a/proxy/app_conn.go b/proxy/app_conn.go index f959d7b8c..5ba1bc158 100644 --- a/proxy/app_conn.go +++ b/proxy/app_conn.go @@ -2,8 +2,139 @@ package proxy import ( tmspcli "github.com/tendermint/tmsp/client" + "github.com/tendermint/tmsp/types" ) -type AppConn interface { - tmspcli.Client +//---------------------------------------------------------------------------------------- +// Enforce which tmsp msgs can be sent on a connection at the type level + +type AppConnConsensus interface { + SetResponseCallback(tmspcli.Callback) + Error() error + + InitChainSync(validators []*types.Validator) (err error) + + BeginBlockSync(height uint64) (err error) + AppendTxAsync(tx []byte) *tmspcli.ReqRes + EndBlockSync(height uint64) (changedValidators []*types.Validator, err error) + CommitSync() (res types.Result) +} + +type AppConnMempool interface { + SetResponseCallback(tmspcli.Callback) + Error() error + + CheckTxAsync(tx []byte) *tmspcli.ReqRes + + FlushAsync() *tmspcli.ReqRes + FlushSync() error +} + +type AppConnQuery interface { + Error() error + + EchoSync(string) (res types.Result) + InfoSync() (res types.Result) + QuerySync(tx []byte) (res types.Result) + + // SetOptionSync(key string, value string) (res types.Result) +} + +//----------------------------------------------------------------------------------------- +// Implements AppConnConsensus (subset of tmspcli.Client) + +type appConnConsensus struct { + appConn tmspcli.Client +} + +func NewAppConnConsensus(appConn tmspcli.Client) *appConnConsensus { + return &appConnConsensus{ + appConn: appConn, + } +} + +func (app *appConnConsensus) SetResponseCallback(cb tmspcli.Callback) { + app.appConn.SetResponseCallback(cb) +} +func (app *appConnConsensus) Error() error { + return app.appConn.Error() +} +func (app *appConnConsensus) InitChainSync(validators []*types.Validator) (err error) { + return app.appConn.InitChainSync(validators) +} +func (app *appConnConsensus) BeginBlockSync(height uint64) (err error) { + return app.appConn.BeginBlockSync(height) +} +func (app *appConnConsensus) AppendTxAsync(tx []byte) *tmspcli.ReqRes { + return app.appConn.AppendTxAsync(tx) +} + +func (app *appConnConsensus) EndBlockSync(height uint64) (changedValidators []*types.Validator, err error) { + return app.appConn.EndBlockSync(height) +} + +func (app *appConnConsensus) CommitSync() (res types.Result) { + return app.appConn.CommitSync() +} + +//------------------------------------------------ +// Implements AppConnMempool (subset of tmspcli.Client) + +type appConnMempool struct { + appConn tmspcli.Client +} + +func NewAppConnMempool(appConn tmspcli.Client) *appConnMempool { + return &appConnMempool{ + appConn: appConn, + } +} + +func (app *appConnMempool) SetResponseCallback(cb tmspcli.Callback) { + app.appConn.SetResponseCallback(cb) +} + +func (app *appConnMempool) Error() error { + return app.appConn.Error() +} + +func (app *appConnMempool) FlushAsync() *tmspcli.ReqRes { + return app.appConn.FlushAsync() +} + +func (app *appConnMempool) FlushSync() error { + return app.appConn.FlushSync() +} + +func (app *appConnMempool) CheckTxAsync(tx []byte) *tmspcli.ReqRes { + return app.appConn.CheckTxAsync(tx) +} + +//------------------------------------------------ +// Implements AppConnQuery (subset of tmspcli.Client) + +type appConnQuery struct { + appConn tmspcli.Client +} + +func NewAppConnQuery(appConn tmspcli.Client) *appConnQuery { + return &appConnQuery{ + appConn: appConn, + } +} + +func (app *appConnQuery) Error() error { + return app.appConn.Error() +} + +func (app *appConnQuery) EchoSync(msg string) (res types.Result) { + return app.appConn.EchoSync(msg) +} + +func (app *appConnQuery) InfoSync() (res types.Result) { + return app.appConn.InfoSync() +} + +func (app *appConnQuery) QuerySync(tx []byte) (res types.Result) { + return app.appConn.QuerySync(tx) } diff --git a/proxy/remote_app_conn_test.go b/proxy/app_conn_test.go similarity index 58% rename from proxy/remote_app_conn_test.go rename to proxy/app_conn_test.go index 58ad50f19..3b7a3413e 100644 --- a/proxy/remote_app_conn_test.go +++ b/proxy/app_conn_test.go @@ -5,14 +5,47 @@ import ( "testing" . "github.com/tendermint/go-common" + tmspcli "github.com/tendermint/tmsp/client" "github.com/tendermint/tmsp/example/dummy" "github.com/tendermint/tmsp/server" + "github.com/tendermint/tmsp/types" ) +//---------------------------------------- + +type AppConnTest interface { + EchoAsync(string) *tmspcli.ReqRes + FlushSync() error + InfoSync() (res types.Result) +} + +type appConnTest struct { + appConn tmspcli.Client +} + +func NewAppConnTest(appConn tmspcli.Client) AppConnTest { + return &appConnTest{appConn} +} + +func (app *appConnTest) EchoAsync(msg string) *tmspcli.ReqRes { + return app.appConn.EchoAsync(msg) +} + +func (app *appConnTest) FlushSync() error { + return app.appConn.FlushSync() +} + +func (app *appConnTest) InfoSync() types.Result { + return app.appConn.InfoSync() +} + +//---------------------------------------- + var SOCKET = "socket" func TestEcho(t *testing.T) { sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6)) + clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true) // Start server s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication()) @@ -21,12 +54,12 @@ func TestEcho(t *testing.T) { } defer s.Stop() // Start client - proxy, err := NewRemoteAppConn(sockPath, SOCKET) + cli, err := clientCreator.NewTMSPClient() if err != nil { Exit(err.Error()) - } else { - t.Log("Connected") } + proxy := NewAppConnTest(cli) + t.Log("Connected") for i := 0; i < 1000; i++ { proxy.EchoAsync(Fmt("echo-%v", i)) @@ -37,6 +70,7 @@ func TestEcho(t *testing.T) { func BenchmarkEcho(b *testing.B) { b.StopTimer() // Initialize sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6)) + clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true) // Start server s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication()) if err != nil { @@ -44,12 +78,12 @@ func BenchmarkEcho(b *testing.B) { } defer s.Stop() // Start client - proxy, err := NewRemoteAppConn(sockPath, SOCKET) + cli, err := clientCreator.NewTMSPClient() if err != nil { Exit(err.Error()) - } else { - b.Log("Connected") } + proxy := NewAppConnTest(cli) + b.Log("Connected") echoString := strings.Repeat(" ", 200) b.StartTimer() // Start benchmarking tests @@ -65,6 +99,7 @@ func BenchmarkEcho(b *testing.B) { func TestInfo(t *testing.T) { sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6)) + clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true) // Start server s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication()) if err != nil { @@ -72,12 +107,13 @@ func TestInfo(t *testing.T) { } defer s.Stop() // Start client - proxy, err := NewRemoteAppConn(sockPath, SOCKET) + cli, err := clientCreator.NewTMSPClient() if err != nil { Exit(err.Error()) - } else { - t.Log("Connected") } + proxy := NewAppConnTest(cli) + t.Log("Connected") + res := proxy.InfoSync() if res.IsErr() { t.Errorf("Unexpected error: %v", err) diff --git a/proxy/client.go b/proxy/client.go new file mode 100644 index 000000000..ea3218d32 --- /dev/null +++ b/proxy/client.go @@ -0,0 +1,81 @@ +package proxy + +import ( + "fmt" + "sync" + + cfg "github.com/tendermint/go-config" + tmspcli "github.com/tendermint/tmsp/client" + "github.com/tendermint/tmsp/example/dummy" + nilapp "github.com/tendermint/tmsp/example/nil" + "github.com/tendermint/tmsp/types" +) + +// NewTMSPClient returns newly connected client +type ClientCreator interface { + NewTMSPClient() (tmspcli.Client, error) +} + +//---------------------------------------------------- +// local proxy uses a mutex on an in-proc app + +type localClientCreator struct { + mtx *sync.Mutex + app types.Application +} + +func NewLocalClientCreator(app types.Application) ClientCreator { + return &localClientCreator{ + mtx: new(sync.Mutex), + app: app, + } +} + +func (l *localClientCreator) NewTMSPClient() (tmspcli.Client, error) { + return tmspcli.NewLocalClient(l.mtx, l.app), nil +} + +//--------------------------------------------------------------- +// remote proxy opens new connections to an external app process + +type remoteClientCreator struct { + addr string + transport string + mustConnect bool +} + +func NewRemoteClientCreator(addr, transport string, mustConnect bool) ClientCreator { + return &remoteClientCreator{ + addr: addr, + transport: transport, + mustConnect: mustConnect, + } +} + +func (r *remoteClientCreator) NewTMSPClient() (tmspcli.Client, error) { + // Run forever in a loop + fmt.Println("ADDR", r.addr, r.transport) + remoteApp, err := tmspcli.NewClient(r.addr, r.transport, r.mustConnect) + if err != nil { + return nil, fmt.Errorf("Failed to connect to proxy: %v", err) + } + return remoteApp, nil +} + +//----------------------------------------------------------------- +// default + +func DefaultClientCreator(config cfg.Config) ClientCreator { + addr := config.GetString("proxy_app") + transport := config.GetString("tmsp") + + switch addr { + case "dummy": + return NewLocalClientCreator(dummy.NewDummyApplication()) + case "nilapp": + return NewLocalClientCreator(nilapp.NewNilApplication()) + default: + mustConnect := true + return NewRemoteClientCreator(addr, transport, mustConnect) + } +} diff --git a/proxy/multi_app_conn.go b/proxy/multi_app_conn.go new file mode 100644 index 000000000..fe009e1d6 --- /dev/null +++ b/proxy/multi_app_conn.go @@ -0,0 +1,94 @@ +package proxy + +import ( + . "github.com/tendermint/go-common" + cfg "github.com/tendermint/go-config" +) + +// Tendermint's interface to the application consists of multiple connections +type AppConns interface { + Mempool() AppConnMempool + Consensus() AppConnConsensus + Query() AppConnQuery +} + +func NewAppConns(config cfg.Config, clientCreator ClientCreator, state State, blockStore BlockStore) AppConns { + return NewMultiAppConn(config, clientCreator, state, blockStore) +} + +// a multiAppConn is made of a few appConns (mempool, consensus, query) +// and manages their underlying tmsp clients, ensuring they reboot together +type multiAppConn struct { + QuitService + + config cfg.Config + + state State + blockStore BlockStore + + mempoolConn *appConnMempool + consensusConn *appConnConsensus + queryConn *appConnQuery + + clientCreator ClientCreator +} + +// Make all necessary tmsp connections to the application +func NewMultiAppConn(config cfg.Config, clientCreator ClientCreator, state State, blockStore BlockStore) *multiAppConn { + multiAppConn := &multiAppConn{ + config: config, + state: state, + blockStore: blockStore, + clientCreator: clientCreator, + } + multiAppConn.QuitService = *NewQuitService(log, "multiAppConn", multiAppConn) + multiAppConn.Start() + return multiAppConn +} + +// Returns the mempool connection +func (app *multiAppConn) Mempool() AppConnMempool { + return app.mempoolConn +} + +// Returns the consensus Connection +func (app *multiAppConn) Consensus() AppConnConsensus { + return app.consensusConn +} + +func (app *multiAppConn) Query() AppConnQuery { + return app.queryConn +} + +func (app *multiAppConn) OnStart() error { + app.QuitService.OnStart() + + // query connection + querycli, err := app.clientCreator.NewTMSPClient() + if err != nil { + return err + } + app.queryConn = NewAppConnQuery(querycli) + + // mempool connection + memcli, err := app.clientCreator.NewTMSPClient() + if err != nil { + return err + } + app.mempoolConn = NewAppConnMempool(memcli) + + // consensus connection + concli, err := app.clientCreator.NewTMSPClient() + if err != nil { + return err + } + app.consensusConn = NewAppConnConsensus(concli) + + // TODO: handshake + + // TODO: replay blocks + + // TODO: (on restart) replay mempool + + return nil +} diff --git a/proxy/remote_app_conn.go b/proxy/remote_app_conn.go deleted file mode 100644 index 9076bb450..000000000 --- a/proxy/remote_app_conn.go +++ /dev/null @@ -1,23 +0,0 @@ -package proxy - -import ( - tmspcli "github.com/tendermint/tmsp/client" -) - -// This is goroutine-safe, but users should beware that -// the application in general is not meant to be interfaced -// with concurrent callers. -type remoteAppConn struct { - tmspcli.Client -} - -func NewRemoteAppConn(addr, transport string) (*remoteAppConn, error) { - client, err := tmspcli.NewClient(addr, transport, false) - if err != nil { - return nil, err - } - appConn := &remoteAppConn{ - Client: client, - } - return appConn, nil -} diff --git a/proxy/state.go b/proxy/state.go new file mode 100644 index 000000000..a52dd0f8a --- /dev/null +++ b/proxy/state.go @@ -0,0 +1,9 @@ +package proxy + +type State interface { + // TODO +} + +type BlockStore interface { + // TODO +} diff --git a/rpc/core/consensus.go b/rpc/core/consensus.go index 92ea4edfc..2a0b43614 100644 --- a/rpc/core/consensus.go +++ b/rpc/core/consensus.go @@ -11,6 +11,8 @@ func Validators() (*ctypes.ResultValidators, error) { var blockHeight int var validators []*types.Validator + // XXX: this is racy. + // Either use state.LoadState(db) or make state atomic (see #165) state := consensusState.GetState() blockHeight = state.LastBlockHeight state.Validators.Iterate(func(index int, val *types.Validator) bool { diff --git a/rpc/core/events.go b/rpc/core/events.go index 0a3edd39b..ab6fd35ec 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -20,6 +20,6 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscri func Unsubscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultUnsubscribe, error) { log.Notice("Unsubscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event) - wsCtx.GetEventSwitch().RemoveListener(event) + wsCtx.GetEventSwitch().RemoveListenerForEvent(event, wsCtx.GetRemoteAddr()) return &ctypes.ResultUnsubscribe{}, nil } diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index 464f7fda9..90febf0b0 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -8,6 +8,7 @@ import ( bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/consensus" mempl "github.com/tendermint/tendermint/mempool" + "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" ) @@ -19,6 +20,7 @@ var mempoolReactor *mempl.MempoolReactor var p2pSwitch *p2p.Switch var privValidator *types.PrivValidator var genDoc *types.GenesisDoc // cache the genesis structure +var proxyAppQuery proxy.AppConnQuery var config cfg.Config = nil @@ -57,3 +59,7 @@ func SetPrivValidator(pv *types.PrivValidator) { func SetGenesisDoc(doc *types.GenesisDoc) { genDoc = doc } + +func SetProxyAppQuery(appConn proxy.AppConnQuery) { + proxyAppQuery = appConn +} diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 78a0b6187..97c013ab7 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -25,6 +25,9 @@ var Routes = map[string]*rpc.RPCFunc{ "unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxsResult, ""), "num_unconfirmed_txs": rpc.NewRPCFunc(NumUnconfirmedTxsResult, ""), + "tmsp_query": rpc.NewRPCFunc(TMSPQueryResult, "query"), + "tmsp_info": rpc.NewRPCFunc(TMSPInfoResult, ""), + "unsafe_flush_mempool": rpc.NewRPCFunc(UnsafeFlushMempool, ""), "unsafe_set_config": rpc.NewRPCFunc(UnsafeSetConfigResult, "type,key,value"), "unsafe_start_cpu_profiler": rpc.NewRPCFunc(UnsafeStartCPUProfilerResult, "filename"), @@ -152,6 +155,22 @@ func BroadcastTxAsyncResult(tx []byte) (ctypes.TMResult, error) { } } +func TMSPQueryResult(query []byte) (ctypes.TMResult, error) { + if r, err := TMSPQuery(query); err != nil { + return nil, err + } else { + return r, nil + } +} + +func TMSPInfoResult() (ctypes.TMResult, error) { + if r, err := TMSPInfo(); err != nil { + return nil, err + } else { + return r, nil + } +} + func UnsafeFlushMempoolResult() (ctypes.TMResult, error) { if r, err := UnsafeFlushMempool(); err != nil { return nil, err diff --git a/rpc/core/tmsp.go b/rpc/core/tmsp.go new file mode 100644 index 000000000..9a19e6eeb --- /dev/null +++ b/rpc/core/tmsp.go @@ -0,0 +1,17 @@ +package core + +import ( + ctypes "github.com/tendermint/tendermint/rpc/core/types" +) + +//----------------------------------------------------------------------------- + +func TMSPQuery(query []byte) (*ctypes.ResultTMSPQuery, error) { + res := proxyAppQuery.QuerySync(query) + return &ctypes.ResultTMSPQuery{res}, nil +} + +func TMSPInfo() (*ctypes.ResultTMSPInfo, error) { + res := proxyAppQuery.InfoSync() + return &ctypes.ResultTMSPInfo{res}, nil +} diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index c1eebb6e1..cd68addd5 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -68,6 +68,14 @@ type ResultUnconfirmedTxs struct { Txs []types.Tx `json:"txs"` } +type ResultTMSPInfo struct { + Result tmsp.Result `json:"result"` +} + +type ResultTMSPQuery struct { + Result tmsp.Result `json:"result"` +} + type ResultUnsafeFlushMempool struct{} type ResultUnsafeSetConfig struct{} @@ -107,6 +115,10 @@ const ( ResultTypeBroadcastTx = byte(0x60) ResultTypeUnconfirmedTxs = byte(0x61) + // 0x7 bytes are for querying the application + ResultTypeTMSPQuery = byte(0x70) + ResultTypeTMSPInfo = byte(0x71) + // 0x8 bytes are for events ResultTypeSubscribe = byte(0x80) ResultTypeUnsubscribe = byte(0x81) @@ -145,4 +157,6 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&ResultUnsafeProfile{}, ResultTypeUnsafeStopCPUProfiler}, wire.ConcreteType{&ResultUnsafeProfile{}, ResultTypeUnsafeWriteHeapProfile}, wire.ConcreteType{&ResultUnsafeFlushMempool{}, ResultTypeUnsafeFlushMempool}, + wire.ConcreteType{&ResultTMSPQuery{}, ResultTypeTMSPQuery}, + wire.ConcreteType{&ResultTMSPInfo{}, ResultTypeTMSPInfo}, ) diff --git a/rpc/test/client_test.go b/rpc/test/client_test.go index 47d2ead10..345049d5a 100644 --- a/rpc/test/client_test.go +++ b/rpc/test/client_test.go @@ -2,9 +2,12 @@ package rpctest import ( "bytes" - "crypto/rand" + crand "crypto/rand" "fmt" + "math/rand" + "strings" "testing" + "time" . "github.com/tendermint/go-common" ctypes "github.com/tendermint/tendermint/rpc/core/types" @@ -14,6 +17,7 @@ import ( //-------------------------------------------------------------------------------- // Test the HTTP client +// These tests assume the dummy app //-------------------------------------------------------------------------------- //-------------------------------------------------------------------------------- @@ -49,20 +53,22 @@ func testStatus(t *testing.T, statusI interface{}) { //-------------------------------------------------------------------------------- // broadcast tx sync -func testTx() []byte { - buf := make([]byte, 16) - _, err := rand.Read(buf) +// random bytes (excluding byte('=')) +func randBytes() []byte { + n := rand.Intn(10) + 2 + buf := make([]byte, n) + _, err := crand.Read(buf) if err != nil { panic(err) } - return buf + return bytes.Replace(buf, []byte("="), []byte{100}, -1) } func TestURIBroadcastTxSync(t *testing.T) { config.Set("block_size", 0) defer config.Set("block_size", -1) tmResult := new(ctypes.TMResult) - tx := testTx() + tx := randBytes() _, err := clientURI.Call("broadcast_tx_sync", map[string]interface{}{"tx": tx}, tmResult) if err != nil { panic(err) @@ -74,7 +80,7 @@ func TestJSONBroadcastTxSync(t *testing.T) { config.Set("block_size", 0) defer config.Set("block_size", -1) tmResult := new(ctypes.TMResult) - tx := testTx() + tx := randBytes() _, err := clientJSON.Call("broadcast_tx_sync", []interface{}{tx}, tmResult) if err != nil { panic(err) @@ -95,18 +101,73 @@ func testBroadcastTxSync(t *testing.T, resI interface{}, tx []byte) { txs := mem.Reap(1) if !bytes.Equal(txs[0], tx) { - panic(Fmt("Tx in mempool does not match test tx. Got %X, expected %X", txs[0], testTx)) + panic(Fmt("Tx in mempool does not match test tx. Got %X, expected %X", txs[0], tx)) } mem.Flush() } +//-------------------------------------------------------------------------------- +// query + +func testTxKV() ([]byte, []byte, []byte) { + k := randBytes() + v := randBytes() + return k, v, []byte(Fmt("%s=%s", k, v)) +} + +func sendTx() ([]byte, []byte) { + tmResult := new(ctypes.TMResult) + k, v, tx := testTxKV() + _, err := clientJSON.Call("broadcast_tx_commit", []interface{}{tx}, tmResult) + if err != nil { + panic(err) + } + fmt.Println("SENT TX", tx) + fmt.Printf("SENT TX %X\n", tx) + fmt.Printf("k %X; v %X", k, v) + return k, v +} + +func TestURITMSPQuery(t *testing.T) { + k, v := sendTx() + time.Sleep(time.Second) + tmResult := new(ctypes.TMResult) + _, err := clientURI.Call("tmsp_query", map[string]interface{}{"query": Fmt("%X", k)}, tmResult) + if err != nil { + panic(err) + } + testTMSPQuery(t, tmResult, v) +} + +func TestJSONTMSPQuery(t *testing.T) { + k, v := sendTx() + tmResult := new(ctypes.TMResult) + _, err := clientJSON.Call("tmsp_query", []interface{}{Fmt("%X", k)}, tmResult) + if err != nil { + panic(err) + } + testTMSPQuery(t, tmResult, v) +} + +func testTMSPQuery(t *testing.T, statusI interface{}, value []byte) { + tmRes := statusI.(*ctypes.TMResult) + query := (*tmRes).(*ctypes.ResultTMSPQuery) + if query.Result.IsErr() { + panic(Fmt("Query returned an err: %v", query)) + } + // XXX: specific to value returned by the dummy + if !strings.Contains(string(query.Result.Data), "exists=true") { + panic(Fmt("Query error. Expected to find 'exists=true'. Got: %s", query.Result.Data)) + } +} + //-------------------------------------------------------------------------------- // broadcast tx commit func TestURIBroadcastTxCommit(t *testing.T) { tmResult := new(ctypes.TMResult) - tx := testTx() + tx := randBytes() _, err := clientURI.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, tmResult) if err != nil { panic(err) @@ -116,7 +177,7 @@ func TestURIBroadcastTxCommit(t *testing.T) { func TestJSONBroadcastTxCommit(t *testing.T) { tmResult := new(ctypes.TMResult) - tx := testTx() + tx := randBytes() _, err := clientJSON.Call("broadcast_tx_commit", []interface{}{tx}, tmResult) if err != nil { panic(err) diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index 59709ada2..17acaf9be 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -13,7 +13,6 @@ import ( "github.com/tendermint/tendermint/config/tendermint_test" nm "github.com/tendermint/tendermint/node" ctypes "github.com/tendermint/tendermint/rpc/core/types" - "github.com/tendermint/tendermint/types" ) // global variables for use across all tests @@ -52,10 +51,9 @@ func init() { // create a new node and sleep forever func newNode(ready chan struct{}) { // Create & start node - privValidatorFile := config.GetString("priv_validator_file") - privValidator := types.LoadOrGenPrivValidator(privValidatorFile) - node = nm.NewNode(config, privValidator, nm.GetProxyApp) - l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"), true) + node = nm.NewNodeDefault(config) + protocol, address := nm.ProtocolAndAddress(config.GetString("node_laddr")) + l := p2p.NewDefaultListener(protocol, address, true) node.AddListener(l) node.Start() diff --git a/scripts/glide/parse.sh b/scripts/glide/parse.sh index a2f68d540..a92f70bc1 100644 --- a/scripts/glide/parse.sh +++ b/scripts/glide/parse.sh @@ -1,7 +1,8 @@ #! /bin/bash set -euo pipefail -GLIDE=$1 -LIB=$2 +LIB=$1 + +GLIDE=$GOPATH/src/github.com/tendermint/tendermint/glide.lock cat $GLIDE | grep -A1 $LIB | grep -v $LIB | awk '{print $2}' diff --git a/scripts/glide/status.sh b/scripts/glide/status.sh index 11e573ae7..374fe38f2 100644 --- a/scripts/glide/status.sh +++ b/scripts/glide/status.sh @@ -2,12 +2,13 @@ # for every github.com/tendermint dependency, warn is if its not synced with origin/master -GLIDE=$1 +GLIDE=$GOPATH/src/github.com/tendermint/tendermint/glide.lock # make list of libs LIBS=($(grep "github.com/tendermint" $GLIDE | awk '{print $3}')) +UPTODATE=true for lib in "${LIBS[@]}"; do # get vendored commit VENDORED=`grep -A1 $lib $GLIDE | grep -v $lib | awk '{print $2}'` @@ -18,6 +19,7 @@ for lib in "${LIBS[@]}"; do cd $PWD if [[ "$VENDORED" != "$MASTER" ]]; then + UPTODATE=false echo "" if [[ "$VENDORED" != "$HEAD" ]]; then echo "Vendored version of $lib differs from origin/master and HEAD" @@ -32,3 +34,7 @@ for lib in "${LIBS[@]}"; do fi done +if [[ "$UPTODATE" == "true" ]]; then + echo "All vendored versions up to date" +fi + diff --git a/scripts/glide/update.sh b/scripts/glide/update.sh index 630859d84..64dd8cda0 100644 --- a/scripts/glide/update.sh +++ b/scripts/glide/update.sh @@ -4,10 +4,11 @@ IFS=$'\n\t' # script to update the given dependency in the glide.lock file with the checked out branch on the local host -GLIDE=$1 -LIB=$2 +LIB=$1 -OLD_COMMIT=`bash scripts/glide/parse.sh $GLIDE $LIB` +GLIDE=$GOPATH/src/github.com/tendermint/tendermint/glide.lock + +OLD_COMMIT=`bash scripts/glide/parse.sh $LIB` PWD=`pwd` cd $GOPATH/src/github.com/tendermint/$LIB diff --git a/scripts/install_tmsp_apps.sh b/scripts/install_tmsp_apps.sh index 035bc8d3e..6abe4da09 100644 --- a/scripts/install_tmsp_apps.sh +++ b/scripts/install_tmsp_apps.sh @@ -2,6 +2,7 @@ go get github.com/tendermint/tmsp/... +# get the tmsp commit used by tendermint COMMIT=`bash scripts/glide/parse.sh $(pwd)/glide.lock tmsp` cd $GOPATH/src/github.com/tendermint/tmsp diff --git a/state/execution.go b/state/execution.go index 13e776c8c..14ea965c1 100644 --- a/state/execution.go +++ b/state/execution.go @@ -18,7 +18,7 @@ func (s *State) ValidateBlock(block *types.Block) error { // Execute the block to mutate State. // Validates block and then executes Data.Txs in the block. -func (s *State) ExecBlock(eventCache events.Fireable, proxyAppConn proxy.AppConn, block *types.Block, blockPartsHeader types.PartSetHeader) error { +func (s *State) ExecBlock(eventCache events.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, blockPartsHeader types.PartSetHeader) error { // Validate the block. err := s.validateBlock(block) @@ -55,7 +55,7 @@ func (s *State) ExecBlock(eventCache events.Fireable, proxyAppConn proxy.AppConn // Executes block's transactions on proxyAppConn. // TODO: Generate a bitmap or otherwise store tx validity in state. -func (s *State) execBlockOnProxyApp(eventCache events.Fireable, proxyAppConn proxy.AppConn, block *types.Block) error { +func (s *State) execBlockOnProxyApp(eventCache events.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) error { var validTxs, invalidTxs = 0, 0 diff --git a/test/Dockerfile b/test/Dockerfile deleted file mode 100644 index 26995de38..000000000 --- a/test/Dockerfile +++ /dev/null @@ -1,23 +0,0 @@ -# Pull base image. -FROM golang:1.6 - -# Grab deps (jq, hexdump) -RUN apt-get update && \ - apt-get install -y --no-install-recommends \ - jq bsdmainutils && \ - rm -rf /var/lib/apt/lists/* - -ENV TENDERMINT_ORG $GOPATH/src/github.com/tendermint/ -RUN mkdir -p $TENDERMINT_ORG -COPY . $TENDERMINT_ORG/tendermint -WORKDIR $TENDERMINT_ORG/tendermint - -RUN make get_vendor_deps - -RUN go install ./cmd/tendermint - -RUN bash scripts/install_tmsp_apps.sh - - -EXPOSE 46656 -EXPOSE 46657 diff --git a/test/app/dummy_test.sh b/test/app/dummy_test.sh index d294e56f8..bb17c6c31 100644 --- a/test/app/dummy_test.sh +++ b/test/app/dummy_test.sh @@ -13,10 +13,15 @@ TESTNAME=$1 # store key value pair KEY="abcd" VALUE="dcba" -curl 127.0.0.1:46657/broadcast_tx_commit?tx=\"$(toHex $KEY=$VALUE)\" +curl -s 127.0.0.1:46657/broadcast_tx_commit?tx=\"$(toHex $KEY=$VALUE)\" echo $? echo "" + +########################### +# test using the tmsp-cli +########################### + # we should be able to look up the key RESPONSE=`tmsp-cli query $KEY` @@ -40,4 +45,34 @@ if [[ $? == 0 ]]; then fi set -e +############################# +# test using the /tmsp_query +############################# + +# we should be able to look up the key +RESPONSE=`curl -s 127.0.0.1:46657/tmsp_query?query=\"$(toHex $KEY)\"` +RESPONSE=`echo $RESPONSE | jq .result[1].result.Data | xxd -r -p` + +set +e +A=`echo $RESPONSE | grep exists=true` +if [[ $? != 0 ]]; then + echo "Failed to find 'exists=true' for $KEY. Response:" + echo "$RESPONSE" + exit 1 +fi +set -e + +# we should not be able to look up the value +RESPONSE=`curl -s 127.0.0.1:46657/tmsp_query?query=\"$(toHex $VALUE)\"` +RESPONSE=`echo $RESPONSE | jq .result[1].result.Data | xxd -r -p` +set +e +A=`echo $RESPONSE | grep exists=true` +if [[ $? == 0 ]]; then + echo "Found 'exists=true' for $VALUE when we should not have. Response:" + echo "$RESPONSE" + exit 1 +fi +set -e + + echo "Passed Test: $TESTNAME" diff --git a/test/docker/Dockerfile b/test/docker/Dockerfile new file mode 100644 index 000000000..83435adcf --- /dev/null +++ b/test/docker/Dockerfile @@ -0,0 +1,22 @@ +# Pull base image. +FROM golang:1.6 + +# Grab deps (jq, hexdump, xxd, killall) +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + jq bsdmainutils vim-common psmisc + +ENV REPO $GOPATH/src/github.com/tendermint/tendermint +WORKDIR $REPO +ADD glide.yaml glide.yaml +ADD glide.lock glide.lock +ADD Makefile Makefile +RUN make get_vendor_deps + +COPY . $REPO + +RUN go install ./cmd/tendermint +RUN bash scripts/install_tmsp_apps.sh + +EXPOSE 46656 +EXPOSE 46657 diff --git a/test/docker/build.sh b/test/docker/build.sh new file mode 100644 index 000000000..39df08720 --- /dev/null +++ b/test/docker/build.sh @@ -0,0 +1,3 @@ +#! /bin/bash + +docker build -t tester -f ./test/docker/Dockerfile . diff --git a/test/docker/update.sh b/test/docker/update.sh new file mode 100644 index 000000000..b946bf395 --- /dev/null +++ b/test/docker/update.sh @@ -0,0 +1,9 @@ +#! /bin/bash + +# update the `tester` image by copying in the latest tendermint binary + +docker run --name builder tester true +docker cp $GOPATH/bin/tendermint builder:/go/bin/tendermint +docker commit builder tester +docker rm -vf builder + diff --git a/test/net/test.sh b/test/net/test.sh index 9ec03e9a2..e3029a0dd 100644 --- a/test/net/test.sh +++ b/test/net/test.sh @@ -1,6 +1,8 @@ #! /bin/bash set -eu +# start a testnet and benchmark throughput using mintnet+netmon via the network_testing repo + DATACENTER=single VALSETSIZE=4 BLOCKSIZE=8092 diff --git a/test/p2p/run_test.sh b/test/p2p/atomic_broadcast/test.sh similarity index 76% rename from test/p2p/run_test.sh rename to test/p2p/atomic_broadcast/test.sh index 87ab8e1f7..3b78166d6 100644 --- a/test/p2p/run_test.sh +++ b/test/p2p/atomic_broadcast/test.sh @@ -1,9 +1,20 @@ #! /bin/bash +################################################################### +# wait for all peers to come online +# for each peer: +# wait to have 3 peers +# wait to be at height > 1 +# send a tx, wait for commit +# assert app hash on every peer reflects the post tx state +################################################################### + +N=4 + # wait for everyone to come online echo "Waiting for nodes to come online" -for i in `seq 1 4`; do - addr="172.57.0.$((100+$i)):46657" +for i in `seq 1 $N`; do + addr=$(test/p2p/ip.sh $i):46657 curl -s $addr/status > /dev/null ERR=$? while [ "$ERR" != 0 ]; do @@ -16,8 +27,8 @@ done echo "" # run the test on each of them -for i in `seq 1 4`; do - addr="172.57.0.$((100+$i)):46657" +for i in `seq 1 $N`; do + addr=$(test/p2p/ip.sh $i):46657 # - assert everyone has 3 other peers N_PEERS=`curl -s $addr/net_info | jq '.result[1].peers | length'` @@ -61,9 +72,10 @@ for i in `seq 1 4`; do fi # check we get the same new hash on all other nodes - for j in `seq 1 4`; do + for j in `seq 1 $N`; do if [[ "$i" != "$j" ]]; then - HASH3=`curl -s 172.57.0.$((100+$j)):46657/status | jq .result[1].latest_app_hash` + addrJ=$(test/p2p/ip.sh $j):46657 + HASH3=`curl -s $addrJ/status | jq .result[1].latest_app_hash` if [[ "$HASH2" != "$HASH3" ]]; then echo "App hash for node $j doesn't match. Got $HASH3, expected $HASH2" @@ -77,3 +89,4 @@ done echo "" echo "PASS" +echo "" diff --git a/test/p2p/clean.sh b/test/p2p/clean.sh index e0fb36956..44a1276ef 100644 --- a/test/p2p/clean.sh +++ b/test/p2p/clean.sh @@ -1,4 +1,5 @@ #! /bin/bash +# clean everything docker rm -vf $(docker ps -aq) docker network rm local_testnet diff --git a/test/p2p/test_client.sh b/test/p2p/client.sh similarity index 69% rename from test/p2p/test_client.sh rename to test/p2p/client.sh index d9fe64c5c..efc5096ef 100644 --- a/test/p2p/test_client.sh +++ b/test/p2p/client.sh @@ -3,14 +3,16 @@ set -eu DOCKER_IMAGE=$1 NETWORK_NAME=$2 -CMD=$3 +ID=$3 +CMD=$4 +echo "starting test client container with CMD=$CMD" # run the test container on the local network docker run -t \ - --rm \ -v $GOPATH/src/github.com/tendermint/tendermint/test/p2p/:/go/src/github.com/tendermint/tendermint/test/p2p \ --net=$NETWORK_NAME \ - --ip=172.57.0.99 \ - --name test_container \ + --ip=$(test/p2p/ip.sh "-1") \ + --name test_container_$ID \ --entrypoint bash \ $DOCKER_IMAGE $CMD + diff --git a/test/p2p/fast_sync/test.sh b/test/p2p/fast_sync/test.sh new file mode 100644 index 000000000..7a5453f00 --- /dev/null +++ b/test/p2p/fast_sync/test.sh @@ -0,0 +1,44 @@ +#! /bin/bash +set -eu +set -o pipefail + +############################################################### +# for each peer: +# kill peer +# bring it back online via fast sync +# check app hash +############################################################### + +ID=$1 + +addr=$(test/p2p/ip.sh $ID):46657 +peerID=$(( $(($ID % 4)) + 1 )) # 1->2 ... 3->4 ... 4->1 +peer_addr=$(test/p2p/ip.sh $peerID):46657 + +# get another peer's height +h1=`curl -s $peer_addr/status | jq .result[1].latest_block_height` + +# get another peer's state +root1=`curl -s $peer_addr/status | jq .result[1].latest_app_hash` + +echo "Other peer is on height $h1 with state $root1" +echo "Waiting for peer $ID to catch up" + +# wait for it to sync to past its previous height +set +e +set +o pipefail +h2="0" +while [[ "$h2" -lt "$(($h1+3))" ]]; do + sleep 1 + h2=`curl -s $addr/status | jq .result[1].latest_block_height` + echo "... $h2" +done + +# check the app hash +root2=`curl -s $addr/status | jq .result[1].latest_app_hash` + +if [[ "$root1" != "$root2" ]]; then + echo "App hash after fast sync does not match. Got $root2; expected $root1" + exit 1 +fi +echo "... fast sync successful" diff --git a/test/p2p/ip.sh b/test/p2p/ip.sh new file mode 100755 index 000000000..33ea890db --- /dev/null +++ b/test/p2p/ip.sh @@ -0,0 +1,7 @@ +#! /bin/bash +set -eu + +ID=$1 +echo "172.57.0.$((100+$ID))" + + diff --git a/test/p2p/local_testnet.sh b/test/p2p/local_testnet.sh index 2b4183b95..9380adfd4 100644 --- a/test/p2p/local_testnet.sh +++ b/test/p2p/local_testnet.sh @@ -10,19 +10,12 @@ cd $GOPATH/src/github.com/tendermint/tendermint docker network create --driver bridge --subnet 172.57.0.0/16 $NETWORK_NAME N=4 -seeds="172.57.0.101:46656" +seeds="$(test/p2p/ip.sh 1):46656" for i in `seq 2 $N`; do - seeds="$seeds,172.57.0.$((100+$i)):46656" + seeds="$seeds,$(test/p2p/ip.sh $i):46656" done echo "Seeds: $seeds" for i in `seq 1 $N`; do - # start tendermint container - docker run -d \ - --net=$NETWORK_NAME \ - --ip=172.57.0.$((100+$i)) \ - --name local_testnet_$i \ - --entrypoint tendermint \ - -e TMROOT=/go/src/github.com/tendermint/tendermint/test/p2p/data/mach$i/core \ - $DOCKER_IMAGE node --seeds $seeds --proxy_app=dummy + bash test/p2p/peer.sh $DOCKER_IMAGE $NETWORK_NAME $i $seeds done diff --git a/test/p2p/peer.sh b/test/p2p/peer.sh new file mode 100644 index 000000000..d1405eff1 --- /dev/null +++ b/test/p2p/peer.sh @@ -0,0 +1,23 @@ +#! /bin/bash +set -eu + +DOCKER_IMAGE=$1 +NETWORK_NAME=$2 +ID=$3 + +set +u +SEEDS=$4 +set -u +if [[ "$SEEDS" != "" ]]; then + SEEDS=" --seeds $SEEDS " +fi + +echo "starting tendermint peer ID=$ID" +# start tendermint container on the network +docker run -d \ + --net=$NETWORK_NAME \ + --ip=$(test/p2p/ip.sh $ID) \ + --name local_testnet_$ID \ + --entrypoint tendermint \ + -e TMROOT=/go/src/github.com/tendermint/tendermint/test/p2p/data/mach$ID/core \ + $DOCKER_IMAGE node $SEEDS --proxy_app=dummy diff --git a/test/p2p/test.sh b/test/p2p/test.sh index f139fd675..4d021a4ee 100644 --- a/test/p2p/test.sh +++ b/test/p2p/test.sh @@ -1,10 +1,38 @@ #! /bin/bash +set -eu DOCKER_IMAGE=$1 NETWORK_NAME=local_testnet +cd $GOPATH/src/github.com/tendermint/tendermint + # start the testnet on a local network bash test/p2p/local_testnet.sh $DOCKER_IMAGE $NETWORK_NAME -# run the test -bash test/p2p/test_client.sh $DOCKER_IMAGE $NETWORK_NAME test/p2p/run_test.sh +# test atomic broadcast +bash test/p2p/client.sh $DOCKER_IMAGE $NETWORK_NAME ab test/p2p/atomic_broadcast/test.sh + +# test fast sync (from current state of network) +# run it on each of them +N=4 +for i in `seq 1 $N`; do + echo "Testing fasysync on node $i" + + # kill peer + set +e # circle sigh :( + docker rm -vf local_testnet_$i + set -e + + # restart peer - should have an empty blockchain + SEEDS="$(test/p2p/ip.sh 1):46656" + for j in `seq 2 $N`; do + SEEDS="$SEEDS,$(test/p2p/ip.sh $j):46656" + done + bash test/p2p/peer.sh $DOCKER_IMAGE $NETWORK_NAME $i $SEEDS + + bash test/p2p/client.sh $DOCKER_IMAGE $NETWORK_NAME fs_$i "test/p2p/fast_sync/test.sh $i" +done +echo "" +echo "PASS" +echo "" + diff --git a/test/test.sh b/test/test.sh index 7d69728d5..6e2925902 100644 --- a/test/test.sh +++ b/test/test.sh @@ -1,20 +1,21 @@ #! /bin/bash +set -eu # Top Level Testing Script # See the github.com/tendermint/tendermint/test/README.md echo "" -echo "* building docker file" -docker build -t tester -f ./test/Dockerfile . +echo "* building docker image" +bash ./test/docker/build.sh echo "" -echo "* running go tests and app tests" +echo "* running go tests and app tests in docker container" docker run -t tester bash test/run_test.sh # test basic network connectivity # by starting a local testnet and checking peers connect and make blocks echo "" -echo "* running basic peer tests" +echo "* running p2p tests on a local docker network" bash test/p2p/test.sh tester # only run the cloud benchmark for releases diff --git a/types/priv_validator.go b/types/priv_validator.go index 30fdfff76..5700fdf59 100644 --- a/types/priv_validator.go +++ b/types/priv_validator.go @@ -35,11 +35,13 @@ func voteToStep(vote *Vote) int8 { } type PrivValidator struct { - Address []byte `json:"address"` - PubKey crypto.PubKey `json:"pub_key"` - LastHeight int `json:"last_height"` - LastRound int `json:"last_round"` - LastStep int8 `json:"last_step"` + Address []byte `json:"address"` + PubKey crypto.PubKey `json:"pub_key"` + LastHeight int `json:"last_height"` + LastRound int `json:"last_round"` + LastStep int8 `json:"last_step"` + LastSignature crypto.Signature `json:"last_signature"` // so we dont lose signatures + LastSignBytes []byte `json:"last_signbytes"` // so we dont lose signatures // PrivKey should be empty if a Signer other than the default is being used. PrivKey crypto.PrivKey `json:"priv_key"` @@ -85,14 +87,16 @@ func GenPrivValidator() *PrivValidator { pubKey := crypto.PubKeyEd25519(*pubKeyBytes) privKey := crypto.PrivKeyEd25519(*privKeyBytes) return &PrivValidator{ - Address: pubKey.Address(), - PubKey: pubKey, - PrivKey: privKey, - LastHeight: 0, - LastRound: 0, - LastStep: stepNone, - filePath: "", - Signer: NewDefaultSigner(privKey), + Address: pubKey.Address(), + PubKey: pubKey, + PrivKey: privKey, + LastHeight: 0, + LastRound: 0, + LastStep: stepNone, + LastSignature: nil, + LastSignBytes: nil, + filePath: "", + Signer: NewDefaultSigner(privKey), } } @@ -149,56 +153,84 @@ func (privVal *PrivValidator) save() { } } +// NOTE: Unsafe! +func (privVal *PrivValidator) Reset() { + privVal.LastHeight = 0 + privVal.LastRound = 0 + privVal.LastStep = 0 + privVal.LastSignature = nil + privVal.LastSignBytes = nil + privVal.Save() +} + func (privVal *PrivValidator) SignVote(chainID string, vote *Vote) error { privVal.mtx.Lock() defer privVal.mtx.Unlock() + signature, err := privVal.signBytesHRS(vote.Height, vote.Round, voteToStep(vote), SignBytes(chainID, vote)) + if err != nil { + return errors.New(Fmt("Error signing vote: %v", err)) + } + vote.Signature = signature.(crypto.SignatureEd25519) + return nil +} - // If height regression, panic - if privVal.LastHeight > vote.Height { - return errors.New("Height regression in SignVote") +func (privVal *PrivValidator) SignProposal(chainID string, proposal *Proposal) error { + privVal.mtx.Lock() + defer privVal.mtx.Unlock() + signature, err := privVal.signBytesHRS(proposal.Height, proposal.Round, stepPropose, SignBytes(chainID, proposal)) + if err != nil { + return errors.New(Fmt("Error signing proposal: %v", err)) + } + proposal.Signature = signature.(crypto.SignatureEd25519) + return nil +} + +// check if there's a regression. Else sign and write the hrs+signature to disk +func (privVal *PrivValidator) signBytesHRS(height, round int, step int8, signBytes []byte) (crypto.Signature, error) { + // If height regression, err + if privVal.LastHeight > height { + return nil, errors.New("Height regression") } // More cases for when the height matches - if privVal.LastHeight == vote.Height { - // If round regression, panic - if privVal.LastRound > vote.Round { - return errors.New("Round regression in SignVote") + if privVal.LastHeight == height { + // If round regression, err + if privVal.LastRound > round { + return nil, errors.New("Round regression") } - // If step regression, panic - if privVal.LastRound == vote.Round && privVal.LastStep > voteToStep(vote) { - return errors.New("Step regression in SignVote") + // If step regression, err + if privVal.LastRound == round { + if privVal.LastStep > step { + return nil, errors.New("Step regression") + } else if privVal.LastStep == step { + if privVal.LastSignBytes != nil { + if privVal.LastSignature == nil { + PanicSanity("privVal: LastSignature is nil but LastSignBytes is not!") + } + // so we dont sign a conflicting vote or proposal + // NOTE: proposals are non-deterministic (include time), + // so we can actually lose them, but will still never sign conflicting ones + if bytes.Equal(privVal.LastSignBytes, signBytes) { + log.Notice("Using privVal.LastSignature", "sig", privVal.LastSignature) + return privVal.LastSignature, nil + } + } + return nil, errors.New("Step regression") + } } } + // Sign + signature := privVal.Sign(signBytes) + // Persist height/round/step - privVal.LastHeight = vote.Height - privVal.LastRound = vote.Round - privVal.LastStep = voteToStep(vote) + privVal.LastHeight = height + privVal.LastRound = round + privVal.LastStep = step + privVal.LastSignature = signature + privVal.LastSignBytes = signBytes privVal.save() - // Sign - vote.Signature = privVal.Sign(SignBytes(chainID, vote)).(crypto.SignatureEd25519) - return nil -} - -func (privVal *PrivValidator) SignProposal(chainID string, proposal *Proposal) error { - privVal.mtx.Lock() - defer privVal.mtx.Unlock() - if privVal.LastHeight < proposal.Height || - privVal.LastHeight == proposal.Height && privVal.LastRound < proposal.Round || - privVal.LastHeight == 0 && privVal.LastRound == 0 && privVal.LastStep == stepNone { - - // Persist height/round/step - privVal.LastHeight = proposal.Height - privVal.LastRound = proposal.Round - privVal.LastStep = stepPropose - privVal.save() - - // Sign - proposal.Signature = privVal.Sign(SignBytes(chainID, proposal)).(crypto.SignatureEd25519) - return nil - } else { - return errors.New(fmt.Sprintf("Attempt of duplicate signing of proposal: Height %v, Round %v", proposal.Height, proposal.Round)) - } + return signature, nil } func (privVal *PrivValidator) String() string { diff --git a/version/version.go b/version/version.go index e275ce5b0..5dc83a23d 100644 --- a/version/version.go +++ b/version/version.go @@ -2,6 +2,6 @@ package version const Maj = "0" const Min = "7" // tmsp useability (protobuf, unix); optimizations; broadcast_tx_commit -const Fix = "0" +const Fix = "1" // query conn, peer filter, fast sync fix const Version = Maj + "." + Min + "." + Fix