diff --git a/alert/alert.go b/alert/alert.go deleted file mode 100644 index c47633091..000000000 --- a/alert/alert.go +++ /dev/null @@ -1,64 +0,0 @@ -package alert - -import ( - "fmt" - "time" - - "github.com/sfreiberg/gotwilio" -) - -var lastAlertUnix int64 = 0 -var alertCountSince int = 0 - -// Sends a critical alert message to administrators. -func Alert(message string) { - log.Error(" ALERT \n" + message) - now := time.Now().Unix() - if now-lastAlertUnix > int64(config.GetInt("alert_min_interval")) { - message = fmt.Sprintf("%v:%v", config.GetString("chain_id"), message) - if alertCountSince > 0 { - message = fmt.Sprintf("%v (+%v more since)", message, alertCountSince) - alertCountSince = 0 - } - if len(config.GetString("alert_twilio_sid")) > 0 { - go sendTwilio(message) - } - if len(config.GetString("alert_email_recipients")) > 0 { - go sendEmail(message) - } - } else { - alertCountSince++ - } -} - -func sendTwilio(message string) { - defer func() { - if err := recover(); err != nil { - log.Error("sendTwilio error", "error", err) - } - }() - if len(message) > 50 { - message = message[:50] - } - twilio := gotwilio.NewTwilioClient(config.GetString("alert_twilio_sid"), config.GetString("alert_twilio_token")) - res, exp, err := twilio.SendSMS(config.GetString("alert_twilio_from"), config.GetString("alert_twilio_to"), message, "", "") - if exp != nil || err != nil { - log.Error("sendTwilio error", "res", res, "exp", exp, "error", err) - } -} - -func sendEmail(message string) { - defer func() { - if err := recover(); err != nil { - log.Error("sendEmail error", "error", err) - } - }() - subject := message - if len(subject) > 80 { - subject = subject[:80] - } - err := SendEmail(subject, message, config.GetStringSlice("alert_email_recipients")) - if err != nil { - log.Error("sendEmail error", "error", err, "message", message) - } -} diff --git a/alert/config.go b/alert/config.go deleted file mode 100644 index 4a7bf7714..000000000 --- a/alert/config.go +++ /dev/null @@ -1,14 +0,0 @@ - -package alert - -import ( - cfg "github.com/tendermint/go-config" -) - -var config cfg.Config = nil - -func init() { - cfg.OnConfig(func(newConfig cfg.Config) { - config = newConfig - }) -} diff --git a/alert/email.go b/alert/email.go deleted file mode 100644 index c183f1d58..000000000 --- a/alert/email.go +++ /dev/null @@ -1,176 +0,0 @@ -// Forked from github.com/SlyMarbo/gmail -package alert - -import ( - "bytes" - "crypto/tls" - "encoding/base64" - "errors" - "fmt" - "io/ioutil" - "net/smtp" - "path/filepath" - "regexp" - "strings" -) - -// Convenience function -func SendEmail(subject, body string, tos []string) error { - email := Compose(subject, body) - email.From = config.GetString("smtp_user") - email.ContentType = "text/html; charset=utf-8" - email.AddRecipients(tos...) - err := email.Send() - return err -} - -// Email represents a single message, which may contain -// attachments. -type Email struct { - From string - To []string - Subject string - ContentType string - Body string - Attachments map[string][]byte -} - -// Compose begins a new email, filling the subject and body, -// and allocating memory for the list of recipients and the -// attachments. -func Compose(Subject, Body string) *Email { - out := new(Email) - out.To = make([]string, 0, 1) - out.Subject = Subject - out.Body = Body - out.Attachments = make(map[string][]byte) - return out -} - -// Attach takes a filename and adds this to the message. -// Note that since only the filename is stored (and not -// its path, for privacy reasons), multiple files in -// different directories but with the same filename and -// extension cannot be sent. -func (e *Email) Attach(Filename string) error { - b, err := ioutil.ReadFile(Filename) - if err != nil { - return err - } - - _, fname := filepath.Split(Filename) - e.Attachments[fname] = b - return nil -} - -// AddRecipient adds a single recipient. -func (e *Email) AddRecipient(Recipient string) { - e.To = append(e.To, Recipient) -} - -// AddRecipients adds one or more recipients. -func (e *Email) AddRecipients(Recipients ...string) { - e.To = append(e.To, Recipients...) -} - -// Send sends the email, returning any error encountered. -func (e *Email) Send() error { - if e.From == "" { - return errors.New("Error: No sender specified. Please set the Email.From field.") - } - if e.To == nil || len(e.To) == 0 { - return errors.New("Error: No recipient specified. Please set the Email.To field.") - } - - auth := smtp.PlainAuth( - "", - config.GetString("smtp_user"), - config.GetString("smtp_password"), - config.GetString("smtp_host"), - ) - - conn, err := smtp.Dial(fmt.Sprintf("%v:%v", config.GetString("smtp_host"), config.GetString("smtp_port"))) - if err != nil { - return err - } - - err = conn.StartTLS(&tls.Config{}) - if err != nil { - return err - } - - err = conn.Auth(auth) - if err != nil { - return err - } - - err = conn.Mail(e.From) - if err != nil { - if strings.Contains(err.Error(), "530 5.5.1") { - return errors.New("Error: Authentication failure. Your username or password is incorrect.") - } - return err - } - - for _, recipient := range e.To { - err = conn.Rcpt(recipient) - if err != nil { - return err - } - } - - wc, err := conn.Data() - if err != nil { - return err - } - defer wc.Close() - _, err = wc.Write(e.Bytes()) - if err != nil { - return err - } - - return nil -} - -func (e *Email) Bytes() []byte { - buf := bytes.NewBuffer(nil) - - var subject = e.Subject - subject = regexp.MustCompile("\n+").ReplaceAllString(subject, " ") - subject = regexp.MustCompile(" +").ReplaceAllString(subject, " ") - buf.WriteString("Subject: " + subject + "\n") - buf.WriteString("To: <" + strings.Join(e.To, ">,<") + ">\n") - buf.WriteString("MIME-Version: 1.0\n") - - // Boundary is used by MIME to separate files. - boundary := "f46d043c813270fc6b04c2d223da" - - if len(e.Attachments) > 0 { - buf.WriteString("Content-Type: multipart/mixed; boundary=" + boundary + "\n") - buf.WriteString("--" + boundary + "\n") - } - - if e.ContentType == "" { - e.ContentType = "text/plain; charset=utf-8" - } - buf.WriteString(fmt.Sprintf("Content-Type: %s\n\n", e.ContentType)) - buf.WriteString(e.Body) - - if len(e.Attachments) > 0 { - for k, v := range e.Attachments { - buf.WriteString("\n\n--" + boundary + "\n") - buf.WriteString("Content-Type: application/octet-stream\n") - buf.WriteString("Content-Transfer-Encoding: base64\n") - buf.WriteString("Content-Disposition: attachment; filename=\"" + k + "\"\n\n") - - b := make([]byte, base64.StdEncoding.EncodedLen(len(v))) - base64.StdEncoding.Encode(b, v) - buf.Write(b) - buf.WriteString("\n--" + boundary) - } - - buf.WriteString("--") - } - - return buf.Bytes() -} diff --git a/alert/log.go b/alert/log.go deleted file mode 100644 index 58055cf23..000000000 --- a/alert/log.go +++ /dev/null @@ -1,7 +0,0 @@ -package alert - -import ( - "github.com/tendermint/go-logger" -) - -var log = logger.New("module", "alert") diff --git a/benchmarks/simu/counter.go b/benchmarks/simu/counter.go index 307f29d06..44e00a0c9 100644 --- a/benchmarks/simu/counter.go +++ b/benchmarks/simu/counter.go @@ -8,10 +8,10 @@ import ( "github.com/gorilla/websocket" . "github.com/tendermint/go-common" + "github.com/tendermint/go-rpc/client" + "github.com/tendermint/go-rpc/types" "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/rpc/client" _ "github.com/tendermint/tendermint/rpc/core/types" // Register RPCResponse > Result types - "github.com/tendermint/tendermint/rpc/types" ) func main() { diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 7bcd1cb88..06168903f 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -10,7 +10,7 @@ import ( . "github.com/tendermint/go-common" "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/events" + "github.com/tendermint/go-events" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" diff --git a/config/tendermint_test/config.go b/config/tendermint_test/config.go index 024a5c970..9820ba24a 100644 --- a/config/tendermint_test/config.go +++ b/config/tendermint_test/config.go @@ -30,6 +30,7 @@ func initTMRoot(rootDir string) { configFilePath := path.Join(rootDir, "config.toml") genesisFilePath := path.Join(rootDir, "genesis.json") + privFilePath := path.Join(rootDir, "priv_validator.json") // Write default config file if missing. if !FileExists(configFilePath) { @@ -40,6 +41,9 @@ func initTMRoot(rootDir string) { if !FileExists(genesisFilePath) { MustWriteFile(genesisFilePath, []byte(defaultGenesis), 0644) } + if !FileExists(privFilePath) { + MustWriteFile(privFilePath, []byte(defaultPrivValidator), 0644) + } } func GetConfig(rootDir string) cfg.Config { @@ -58,7 +62,7 @@ func GetConfig(rootDir string) cfg.Config { } mapConfig.SetDefault("chain_id", "tendermint_test") mapConfig.SetDefault("genesis_file", rootDir+"/genesis.json") - mapConfig.SetDefault("proxy_app", "tcp://127.0.0.1:36658") + mapConfig.SetDefault("proxy_app", "local") mapConfig.SetDefault("moniker", "anonymous") mapConfig.SetDefault("node_laddr", "0.0.0.0:36656") mapConfig.SetDefault("fast_sync", false) @@ -78,7 +82,7 @@ func GetConfig(rootDir string) cfg.Config { var defaultConfigTmpl = `# This is a TOML config file. # For more information, see https://github.com/toml-lang/toml -proxy_app = "tcp://127.0.0.1:36658" +proxy_app = "local" moniker = "__MONIKER__" node_laddr = "0.0.0.0:36656" seeds = "" @@ -93,50 +97,33 @@ func defaultConfig(moniker string) (defaultConfig string) { return } -// priv keys generated deterministically eg rpc/tests/helpers.go var defaultGenesis = `{ - "chain_id" : "tendermint_test", - "accounts": [ - { - "address": "E9B5D87313356465FAE33C406CE2C2979DE60BCB", - "amount": 200000000 - }, - { - "address": "DFE4AFFA4CEE17CD01CB9E061D77C3ECED29BD88", - "amount": 200000000 - }, - { - "address": "F60D30722E7B497FA532FB3207C3FB29C31B1992", - "amount": 200000000 - }, - { - "address": "336CB40A5EB92E496E19B74FDFF2BA017C877FD6", - "amount": 200000000 - }, - { - "address": "D218F0F439BF0384F6F5EF8D0F8B398D941BD1DC", - "amount": 200000000 - } - ], + "genesis_time": "0001-01-01T00:00:00.000Z", + "chain_id": "tendermint_test", "validators": [ { - "pub_key": [1, "583779C3BFA3F6C7E23C7D830A9C3D023A216B55079AD38BFED1207B94A19548"], - "amount": 1000000, - "unbond_to": [ - { - "address": "E9B5D87313356465FAE33C406CE2C2979DE60BCB", - "amount": 100000 - } - ] + "pub_key": [ + 1, + "3B3069C422E19688B45CBFAE7BB009FC0FA1B1EA86593519318B7214853803C8" + ], + "amount": 10, + "name": "" } - ] + ], + "app_hash": "" }` var defaultPrivValidator = `{ - "address": "1D7A91CB32F758A02EBB9BE1FB6F8DEE56F90D42", - "pub_key": [1,"06FBAC4E285285D1D91FCBC7E91C780ADA11516F67462340B3980CE2B94940E8"], - "priv_key": [1,"C453604BD6480D5538B4C6FD2E3E314B5BCE518D75ADE4DA3DA85AB8ADFD819606FBAC4E285285D1D91FCBC7E91C780ADA11516F67462340B3980CE2B94940E8"], - "last_height":0, - "last_round":0, - "last_step":0 + "address": "D028C9981F7A87F3093672BF0D5B0E2A1B3ED456", + "pub_key": [ + 1, + "3B3069C422E19688B45CBFAE7BB009FC0FA1B1EA86593519318B7214853803C8" + ], + "priv_key": [ + 1, + "27F82582AEFAE7AB151CFB01C48BB6C1A0DA78F9BDDA979A9F70A84D074EB07D3B3069C422E19688B45CBFAE7BB009FC0FA1B1EA86593519318B7214853803C8" + ], + "last_height": 0, + "last_round": 0, + "last_step": 0 }` diff --git a/consensus/common_test.go b/consensus/common_test.go index 9f7bc6695..7effed689 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -11,7 +11,7 @@ import ( dbm "github.com/tendermint/go-db" bc "github.com/tendermint/tendermint/blockchain" _ "github.com/tendermint/tendermint/config/tendermint_test" - "github.com/tendermint/tendermint/events" + "github.com/tendermint/go-events" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" diff --git a/consensus/reactor.go b/consensus/reactor.go index f0e34cc48..41ff77ccb 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -9,10 +9,10 @@ import ( "time" . "github.com/tendermint/go-common" + "github.com/tendermint/go-events" "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" bc "github.com/tendermint/tendermint/blockchain" - "github.com/tendermint/tendermint/events" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) @@ -234,12 +234,12 @@ func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) { // broadcasting the result to peers func (conR *ConsensusReactor) registerEventCallbacks() { - conR.evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data types.EventData) { - rs := data.(*types.EventDataRoundState).RoundState().(*RoundState) + conR.evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data events.EventData) { + rs := data.(*types.EventDataRoundState).RoundState.(*RoundState) conR.broadcastNewRoundStep(rs) }) - conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data types.EventData) { + conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data events.EventData) { edv := data.(*types.EventDataVote) conR.broadcastHasVoteMessage(edv.Vote, edv.Index) }) diff --git a/consensus/state.go b/consensus/state.go index dd7ec5550..721f71574 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -9,9 +9,9 @@ import ( "time" . "github.com/tendermint/go-common" + "github.com/tendermint/go-events" "github.com/tendermint/go-wire" bc "github.com/tendermint/tendermint/blockchain" - "github.com/tendermint/tendermint/events" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" @@ -99,11 +99,11 @@ type RoundState struct { func (rs *RoundState) RoundStateEvent() *types.EventDataRoundState { edrs := &types.EventDataRoundState{ - Height: rs.Height, - Round: rs.Round, - Step: rs.Step.String(), + Height: rs.Height, + Round: rs.Round, + Step: rs.Step.String(), + RoundState: rs, } - edrs.SetRoundState(rs) return edrs } diff --git a/consensus/state_test.go b/consensus/state_test.go index fbe6a3db5..f4d8f5482 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -7,7 +7,7 @@ import ( "time" _ "github.com/tendermint/tendermint/config/tendermint_test" - //"github.com/tendermint/tendermint/events" + //"github.com/tendermint/go-events" "github.com/tendermint/tendermint/types" ) @@ -243,7 +243,7 @@ func TestFullRound1(t *testing.T) { // grab proposal re := <-propCh - propBlockHash := re.(*types.EventDataRoundState).RoundState().(*RoundState).ProposalBlock.Hash() + propBlockHash := re.(*types.EventDataRoundState).RoundState.(*RoundState).ProposalBlock.Hash() <-voteCh // wait for prevote validatePrevote(t, cs, round, vss[0], propBlockHash) @@ -336,7 +336,7 @@ func TestLockNoPOL(t *testing.T) { cs1.startRoutines(0) re := <-proposalCh - rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) + rs := re.(*types.EventDataRoundState).RoundState.(*RoundState) theBlockHash := rs.ProposalBlock.Hash() <-voteCh // prevote @@ -376,7 +376,7 @@ func TestLockNoPOL(t *testing.T) { // now we're on a new round and not the proposer, so wait for timeout re = <-timeoutProposeCh - rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) + rs = re.(*types.EventDataRoundState).RoundState.(*RoundState) if rs.ProposalBlock != nil { t.Fatal("Expected proposal block to be nil") @@ -420,7 +420,7 @@ func TestLockNoPOL(t *testing.T) { incrementRound(cs2) re = <-proposalCh - rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) + rs = re.(*types.EventDataRoundState).RoundState.(*RoundState) // now we're on a new round and are the proposer if !bytes.Equal(rs.ProposalBlock.Hash(), rs.LockedBlock.Hash()) { @@ -505,7 +505,7 @@ func TestLockPOLRelock(t *testing.T) { <-newRoundCh re := <-proposalCh - rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) + rs := re.(*types.EventDataRoundState).RoundState.(*RoundState) theBlockHash := rs.ProposalBlock.Hash() <-voteCh // prevote @@ -576,7 +576,7 @@ func TestLockPOLRelock(t *testing.T) { be := <-newBlockCh b := be.(types.EventDataNewBlock) re = <-newRoundCh - rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) + rs = re.(*types.EventDataRoundState).RoundState.(*RoundState) if rs.Height != 2 { t.Fatal("Expected height to increment") } @@ -610,7 +610,7 @@ func TestLockPOLUnlock(t *testing.T) { startTestRound(cs1, cs1.Height, 0) <-newRoundCh re := <-proposalCh - rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) + rs := re.(*types.EventDataRoundState).RoundState.(*RoundState) theBlockHash := rs.ProposalBlock.Hash() <-voteCh // prevote @@ -634,7 +634,7 @@ func TestLockPOLUnlock(t *testing.T) { // timeout to new round re = <-timeoutWaitCh - rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) + rs = re.(*types.EventDataRoundState).RoundState.(*RoundState) lockedBlockHash := rs.LockedBlock.Hash() //XXX: this isnt gauranteed to get there before the timeoutPropose ... @@ -692,7 +692,7 @@ func TestLockPOLSafety1(t *testing.T) { startTestRound(cs1, cs1.Height, 0) <-newRoundCh re := <-proposalCh - rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) + rs := re.(*types.EventDataRoundState).RoundState.(*RoundState) propBlock := rs.ProposalBlock <-voteCh // prevote @@ -740,7 +740,7 @@ func TestLockPOLSafety1(t *testing.T) { re = <-proposalCh } - rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) + rs = re.(*types.EventDataRoundState).RoundState.(*RoundState) if rs.LockedBlock != nil { t.Fatal("we should not be locked!") @@ -903,7 +903,7 @@ func TestSlashingPrevotes(t *testing.T) { re := <-proposalCh <-voteCh // prevote - rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) + rs := re.(*types.EventDataRoundState).RoundState.(*RoundState) // we should now be stuck in limbo forever, waiting for more prevotes // add one for a different block should cause us to go into prevote wait @@ -981,7 +981,7 @@ func TestHalt1(t *testing.T) { startTestRound(cs1, cs1.Height, 0) <-newRoundCh re := <-proposalCh - rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) + rs := re.(*types.EventDataRoundState).RoundState.(*RoundState) propBlock := rs.ProposalBlock propBlockParts := propBlock.MakePartSet() @@ -1004,7 +1004,7 @@ func TestHalt1(t *testing.T) { // timeout to new round <-timeoutWaitCh re = <-newRoundCh - rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) + rs = re.(*types.EventDataRoundState).RoundState.(*RoundState) log.Notice("### ONTO ROUND 1") /*Round2 @@ -1022,7 +1022,7 @@ func TestHalt1(t *testing.T) { // receiving that precommit should take us straight to commit <-newBlockCh re = <-newRoundCh - rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) + rs = re.(*types.EventDataRoundState).RoundState.(*RoundState) if rs.Height != 2 { t.Fatal("expected height to increment") diff --git a/events/event_cache.go b/events/event_cache.go deleted file mode 100644 index d0109ae11..000000000 --- a/events/event_cache.go +++ /dev/null @@ -1,45 +0,0 @@ -package events - -import ( - "github.com/tendermint/tendermint/types" -) - -const ( - eventsBufferSize = 1000 -) - -// An EventCache buffers events for a Fireable -// All events are cached. Filtering happens on Flush -type EventCache struct { - evsw Fireable - events []eventInfo -} - -// Create a new EventCache with an EventSwitch as backend -func NewEventCache(evsw Fireable) *EventCache { - return &EventCache{ - evsw: evsw, - events: make([]eventInfo, eventsBufferSize), - } -} - -// a cached event -type eventInfo struct { - event string - data types.EventData -} - -// Cache an event to be fired upon finality. -func (evc *EventCache) FireEvent(event string, data types.EventData) { - // append to list - evc.events = append(evc.events, eventInfo{event, data}) -} - -// Fire events by running evsw.FireEvent on all cached events. Blocks. -// Clears cached events -func (evc *EventCache) Flush() { - for _, ei := range evc.events { - evc.evsw.FireEvent(ei.event, ei.data) - } - evc.events = make([]eventInfo, eventsBufferSize) -} diff --git a/events/events.go b/events/events.go deleted file mode 100644 index 647522d0a..000000000 --- a/events/events.go +++ /dev/null @@ -1,215 +0,0 @@ -package events - -import ( - "sync" - - . "github.com/tendermint/go-common" - "github.com/tendermint/tendermint/types" -) - -// reactors and other modules should export -// this interface to become eventable -type Eventable interface { - SetEventSwitch(evsw *EventSwitch) -} - -// an event switch or cache implements fireable -type Fireable interface { - FireEvent(event string, data types.EventData) -} - -type EventSwitch struct { - BaseService - - mtx sync.RWMutex - eventCells map[string]*eventCell - listeners map[string]*eventListener -} - -func NewEventSwitch() *EventSwitch { - evsw := &EventSwitch{} - evsw.BaseService = *NewBaseService(log, "EventSwitch", evsw) - return evsw -} - -func (evsw *EventSwitch) OnStart() error { - evsw.BaseService.OnStart() - evsw.eventCells = make(map[string]*eventCell) - evsw.listeners = make(map[string]*eventListener) - return nil -} - -func (evsw *EventSwitch) OnStop() { - evsw.BaseService.OnStop() - evsw.eventCells = nil - evsw.listeners = nil -} - -func (evsw *EventSwitch) AddListenerForEvent(listenerID, event string, cb eventCallback) { - // Get/Create eventCell and listener - evsw.mtx.Lock() - eventCell := evsw.eventCells[event] - if eventCell == nil { - eventCell = newEventCell() - evsw.eventCells[event] = eventCell - } - listener := evsw.listeners[listenerID] - if listener == nil { - listener = newEventListener(listenerID) - evsw.listeners[listenerID] = listener - } - evsw.mtx.Unlock() - - // Add event and listener - eventCell.AddListener(listenerID, cb) - listener.AddEvent(event) -} - -func (evsw *EventSwitch) RemoveListener(listenerID string) { - // Get and remove listener - evsw.mtx.RLock() - listener := evsw.listeners[listenerID] - delete(evsw.listeners, listenerID) - evsw.mtx.RUnlock() - - if listener == nil { - return - } - - // Remove callback for each event. - listener.SetRemoved() - for _, event := range listener.GetEvents() { - evsw.RemoveListenerForEvent(event, listenerID) - } -} - -func (evsw *EventSwitch) RemoveListenerForEvent(event string, listenerID string) { - // Get eventCell - evsw.mtx.Lock() - eventCell := evsw.eventCells[event] - evsw.mtx.Unlock() - - if eventCell == nil { - return - } - - // Remove listenerID from eventCell - numListeners := eventCell.RemoveListener(listenerID) - - // Maybe garbage collect eventCell. - if numListeners == 0 { - // Lock again and double check. - evsw.mtx.Lock() // OUTER LOCK - eventCell.mtx.Lock() // INNER LOCK - if len(eventCell.listeners) == 0 { - delete(evsw.eventCells, event) - } - eventCell.mtx.Unlock() // INNER LOCK - evsw.mtx.Unlock() // OUTER LOCK - } -} - -func (evsw *EventSwitch) FireEvent(event string, data types.EventData) { - // Get the eventCell - evsw.mtx.RLock() - eventCell := evsw.eventCells[event] - evsw.mtx.RUnlock() - - if eventCell == nil { - return - } - - // Fire event for all listeners in eventCell - eventCell.FireEvent(data) -} - -func (evsw *EventSwitch) SubscribeToEvent(receiver, eventID string, chanCap int) chan interface{} { - // listen for new round - ch := make(chan interface{}, chanCap) - evsw.AddListenerForEvent(receiver, eventID, func(data types.EventData) { - // NOTE: in production, evsw callbacks should be nonblocking. - ch <- data - }) - return ch -} - -//----------------------------------------------------------------------------- - -// eventCell handles keeping track of listener callbacks for a given event. -type eventCell struct { - mtx sync.RWMutex - listeners map[string]eventCallback -} - -func newEventCell() *eventCell { - return &eventCell{ - listeners: make(map[string]eventCallback), - } -} - -func (cell *eventCell) AddListener(listenerID string, cb eventCallback) { - cell.mtx.Lock() - cell.listeners[listenerID] = cb - cell.mtx.Unlock() -} - -func (cell *eventCell) RemoveListener(listenerID string) int { - cell.mtx.Lock() - delete(cell.listeners, listenerID) - numListeners := len(cell.listeners) - cell.mtx.Unlock() - return numListeners -} - -func (cell *eventCell) FireEvent(data types.EventData) { - cell.mtx.RLock() - for _, listener := range cell.listeners { - listener(data) - } - cell.mtx.RUnlock() -} - -//----------------------------------------------------------------------------- - -type eventCallback func(data types.EventData) - -type eventListener struct { - id string - - mtx sync.RWMutex - removed bool - events []string -} - -func newEventListener(id string) *eventListener { - return &eventListener{ - id: id, - removed: false, - events: nil, - } -} - -func (evl *eventListener) AddEvent(event string) { - evl.mtx.Lock() - defer evl.mtx.Unlock() - - if evl.removed { - return - } - evl.events = append(evl.events, event) -} - -func (evl *eventListener) GetEvents() []string { - evl.mtx.RLock() - defer evl.mtx.RUnlock() - - events := make([]string, len(evl.events)) - copy(events, evl.events) - return events -} - -func (evl *eventListener) SetRemoved() { - evl.mtx.Lock() - defer evl.mtx.Unlock() - evl.removed = true -} diff --git a/events/log.go b/events/log.go deleted file mode 100644 index 525462294..000000000 --- a/events/log.go +++ /dev/null @@ -1,7 +0,0 @@ -package events - -import ( - "github.com/tendermint/go-logger" -) - -var log = logger.New("module", "events") diff --git a/mempool/reactor.go b/mempool/reactor.go index a9de97266..51be9beab 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -10,7 +10,7 @@ import ( . "github.com/tendermint/go-common" "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/events" + "github.com/tendermint/go-events" "github.com/tendermint/tendermint/types" ) diff --git a/node/node.go b/node/node.go index 077e11d3e..ab06d48c9 100644 --- a/node/node.go +++ b/node/node.go @@ -7,23 +7,25 @@ import ( "net" "net/http" "strings" + "sync" "time" . "github.com/tendermint/go-common" "github.com/tendermint/go-crypto" dbm "github.com/tendermint/go-db" + "github.com/tendermint/go-events" "github.com/tendermint/go-p2p" + "github.com/tendermint/go-rpc" + "github.com/tendermint/go-rpc/server" "github.com/tendermint/go-wire" bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/consensus" - "github.com/tendermint/tendermint/events" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" - "github.com/tendermint/tendermint/rpc" "github.com/tendermint/tendermint/rpc/core" - "github.com/tendermint/tendermint/rpc/server" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" + "github.com/tendermint/tmsp/example/golang" ) import _ "net/http/pprof" @@ -320,14 +322,22 @@ func getState() *sm.State { // Get a connection to the proxyAppConn addr. // Check the current hash, and panic if it doesn't match. -func getProxyApp(addr string, hash []byte) proxy.AppConn { - proxyConn, err := Connect(addr) - if err != nil { - Exit(Fmt("Failed to connect to proxy for mempool: %v", err)) - } - proxyAppConn := proxy.NewRemoteAppConn(proxyConn, 1024) +func getProxyApp(addr string, hash []byte) (proxyAppConn proxy.AppConn) { + // use local app (for testing) + if addr == "local" { + app := example.NewCounterApplication(true) + mtx := new(sync.Mutex) + proxyAppConn = proxy.NewLocalAppConn(mtx, app) + } else { + proxyConn, err := Connect(addr) + if err != nil { + Exit(Fmt("Failed to connect to proxy for mempool: %v", err)) + } + remoteApp := proxy.NewRemoteAppConn(proxyConn, 1024) + remoteApp.Start() - proxyAppConn.Start() + proxyAppConn = remoteApp + } // Check the hash currentHash, err := proxyAppConn.GetHashSync() diff --git a/node/node_test.go b/node/node_test.go index d171bdc17..4619de5f7 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -4,26 +4,13 @@ import ( "testing" "time" - . "github.com/tendermint/go-common" "github.com/tendermint/go-p2p" _ "github.com/tendermint/tendermint/config/tendermint_test" "github.com/tendermint/tendermint/types" - "github.com/tendermint/tmsp/example/golang" - "github.com/tendermint/tmsp/server" ) func TestNodeStartStop(t *testing.T) { - // Start a dummy app - go func() { - _, err := server.StartListener(config.GetString("proxy_app"), example.NewDummyApplication()) - if err != nil { - Exit(err.Error()) - } - }() - // wait for the server - time.Sleep(time.Second * 2) - // Get PrivValidator privValidatorFile := config.GetString("priv_validator_file") privValidator := types.LoadOrGenPrivValidator(privValidatorFile) diff --git a/rpc/client/http_client.go b/rpc/client/http_client.go deleted file mode 100644 index 6cc275d02..000000000 --- a/rpc/client/http_client.go +++ /dev/null @@ -1,51 +0,0 @@ -package rpcclient - -import ( - "bytes" - "encoding/json" - "errors" - "io/ioutil" - "net/http" - - . "github.com/tendermint/go-common" - "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/rpc/types" -) - -func CallHTTP(remote string, method string, params []interface{}, dest interface{}) (interface{}, error) { - // Make request and get responseBytes - request := rpctypes.RPCRequest{ - JSONRPC: "2.0", - Method: method, - Params: params, - ID: "", - } - requestBytes := wire.JSONBytes(request) - requestBuf := bytes.NewBuffer(requestBytes) - log.Info(Fmt("RPC request to %v: %v", remote, string(requestBytes))) - httpResponse, err := http.Post(remote, "text/json", requestBuf) - if err != nil { - return dest, err - } - defer httpResponse.Body.Close() - responseBytes, err := ioutil.ReadAll(httpResponse.Body) - if err != nil { - return dest, err - } - log.Info(Fmt("RPC response: %v", string(responseBytes))) - - // Parse response into JSONResponse - response := rpctypes.RPCResponse{} - err = json.Unmarshal(responseBytes, &response) - if err != nil { - return dest, err - } - // Parse response into dest - resultJSONObject := response.Result - errorStr := response.Error - if errorStr != "" { - return dest, errors.New(errorStr) - } - dest = wire.ReadJSONObject(dest, resultJSONObject, &err) - return dest, err -} diff --git a/rpc/client/log.go b/rpc/client/log.go deleted file mode 100644 index 8b33e2f10..000000000 --- a/rpc/client/log.go +++ /dev/null @@ -1,7 +0,0 @@ -package rpcclient - -import ( - "github.com/tendermint/log15" -) - -var log = log15.New("module", "rpcclient") diff --git a/rpc/client/ws_client.go b/rpc/client/ws_client.go deleted file mode 100644 index 8124b63bc..000000000 --- a/rpc/client/ws_client.go +++ /dev/null @@ -1,116 +0,0 @@ -package rpcclient - -import ( - "net/http" - "time" - - "github.com/gorilla/websocket" - . "github.com/tendermint/go-common" - "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/rpc/types" -) - -const ( - wsResultsChannelCapacity = 10 - wsWriteTimeoutSeconds = 10 -) - -type WSClient struct { - QuitService - Address string - *websocket.Conn - ResultsCh chan rpctypes.Result // closes upon WSClient.Stop() -} - -// create a new connection -func NewWSClient(addr string) *WSClient { - wsClient := &WSClient{ - Address: addr, - Conn: nil, - ResultsCh: make(chan rpctypes.Result, wsResultsChannelCapacity), - } - wsClient.QuitService = *NewQuitService(log, "WSClient", wsClient) - return wsClient -} - -func (wsc *WSClient) OnStart() error { - wsc.QuitService.OnStart() - err := wsc.dial() - if err != nil { - return err - } - go wsc.receiveEventsRoutine() - return nil -} - -func (wsc *WSClient) dial() error { - // Dial - dialer := websocket.DefaultDialer - rHeader := http.Header{} - con, _, err := dialer.Dial(wsc.Address, rHeader) - if err != nil { - return err - } - // Set the ping/pong handlers - con.SetPingHandler(func(m string) error { - // NOTE: https://github.com/gorilla/websocket/issues/97 - go con.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds)) - return nil - }) - con.SetPongHandler(func(m string) error { - // NOTE: https://github.com/gorilla/websocket/issues/97 - return nil - }) - wsc.Conn = con - return nil -} - -func (wsc *WSClient) OnStop() { - wsc.QuitService.OnStop() - // ResultsCh is closed in receiveEventsRoutine. -} - -func (wsc *WSClient) receiveEventsRoutine() { - for { - _, data, err := wsc.ReadMessage() - if err != nil { - log.Info("WSClient failed to read message", "error", err, "data", string(data)) - wsc.Stop() - break - } else { - var response rpctypes.RPCResponse - wire.ReadJSON(&response, data, &err) - if err != nil { - log.Info("WSClient failed to parse message", "error", err) - wsc.Stop() - break - } - wsc.ResultsCh <- response.Result - } - } - - // Cleanup - close(wsc.ResultsCh) -} - -// subscribe to an event -func (wsc *WSClient) Subscribe(eventid string) error { - err := wsc.WriteJSON(rpctypes.RPCRequest{ - JSONRPC: "2.0", - ID: "", - Method: "subscribe", - Params: []interface{}{eventid}, - }) - return err -} - -// unsubscribe from an event -func (wsc *WSClient) Unsubscribe(eventid string) error { - err := wsc.WriteJSON(rpctypes.RPCRequest{ - JSONRPC: "2.0", - ID: "", - Method: "unsubscribe", - Params: []interface{}{eventid}, - }) - return err -} diff --git a/rpc/core/events.go b/rpc/core/events.go index 8907e0bb1..0a3edd39b 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -1,27 +1,25 @@ package core import ( + "github.com/tendermint/go-events" + "github.com/tendermint/go-rpc/types" ctypes "github.com/tendermint/tendermint/rpc/core/types" - "github.com/tendermint/tendermint/rpc/types" "github.com/tendermint/tendermint/types" ) func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscribe, error) { log.Notice("Subscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event) - wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg types.EventData) { + wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg events.EventData) { // NOTE: EventSwitch callbacks must be nonblocking // NOTE: RPCResponses of subscribed events have id suffix "#event" - wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", ctypes.ResultEvent{event, msg}, "")) + tmResult := ctypes.TMResult(&ctypes.ResultEvent{event, types.TMEventData(msg)}) + wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &tmResult, "")) }) return &ctypes.ResultSubscribe{}, nil } func Unsubscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultUnsubscribe, error) { log.Notice("Unsubscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event) - wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg types.EventData) { - // NOTE: EventSwitch callbacks must be nonblocking - // NOTE: RPCResponses of subscribed events have id suffix "#event" - wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", ctypes.ResultEvent{event, msg}, "")) - }) + wsCtx.GetEventSwitch().RemoveListener(event) return &ctypes.ResultUnsubscribe{}, nil } diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 4b7433369..8e9c252d0 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -1,21 +1,111 @@ package core import ( - rpc "github.com/tendermint/tendermint/rpc/server" + rpc "github.com/tendermint/go-rpc/server" + "github.com/tendermint/go-rpc/types" + ctypes "github.com/tendermint/tendermint/rpc/core/types" ) // TODO: eliminate redundancy between here and reading code from core/ var Routes = map[string]*rpc.RPCFunc{ - "subscribe": rpc.NewWSRPCFunc(Subscribe, []string{"event"}), - "unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, []string{"event"}), - "status": rpc.NewRPCFunc(Status, []string{}), - "net_info": rpc.NewRPCFunc(NetInfo, []string{}), - "blockchain": rpc.NewRPCFunc(BlockchainInfo, []string{"minHeight", "maxHeight"}), - "genesis": rpc.NewRPCFunc(Genesis, []string{}), - "get_block": rpc.NewRPCFunc(GetBlock, []string{"height"}), - "list_validators": rpc.NewRPCFunc(ListValidators, []string{}), - "dump_consensus_state": rpc.NewRPCFunc(DumpConsensusState, []string{}), - "broadcast_tx": rpc.NewRPCFunc(BroadcastTx, []string{"tx"}), - "list_unconfirmed_txs": rpc.NewRPCFunc(ListUnconfirmedTxs, []string{}), + "subscribe": rpc.NewWSRPCFunc(SubscribeResult, "event"), + "unsubscribe": rpc.NewWSRPCFunc(UnsubscribeResult, "event"), + "status": rpc.NewRPCFunc(StatusResult, ""), + "net_info": rpc.NewRPCFunc(NetInfoResult, ""), + "blockchain": rpc.NewRPCFunc(BlockchainInfoResult, "minHeight,maxHeight"), + "genesis": rpc.NewRPCFunc(GenesisResult, ""), + "get_block": rpc.NewRPCFunc(GetBlockResult, "height"), + "list_validators": rpc.NewRPCFunc(ListValidatorsResult, ""), + "dump_consensus_state": rpc.NewRPCFunc(DumpConsensusStateResult, ""), + "broadcast_tx": rpc.NewRPCFunc(BroadcastTxResult, "tx"), + "list_unconfirmed_txs": rpc.NewRPCFunc(ListUnconfirmedTxsResult, ""), // subscribe/unsubscribe are reserved for websocket events. } + +func SubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) { + if r, err := Subscribe(wsCtx, event); err != nil { + return nil, err + } else { + return r, nil + } +} + +func UnsubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) { + if r, err := Unsubscribe(wsCtx, event); err != nil { + return nil, err + } else { + return r, nil + } +} + +func StatusResult() (ctypes.TMResult, error) { + if r, err := Status(); err != nil { + return nil, err + } else { + return r, nil + } +} + +func NetInfoResult() (ctypes.TMResult, error) { + if r, err := NetInfo(); err != nil { + return nil, err + } else { + return r, nil + } +} + +func BlockchainInfoResult(min, max int) (ctypes.TMResult, error) { + if r, err := BlockchainInfo(min, max); err != nil { + return nil, err + } else { + return r, nil + } +} + +func GenesisResult() (ctypes.TMResult, error) { + if r, err := Genesis(); err != nil { + return nil, err + } else { + return r, nil + } +} + +func GetBlockResult(height int) (ctypes.TMResult, error) { + if r, err := GetBlock(height); err != nil { + return nil, err + } else { + return r, nil + } +} + +func ListValidatorsResult() (ctypes.TMResult, error) { + if r, err := ListValidators(); err != nil { + return nil, err + } else { + return r, nil + } +} + +func DumpConsensusStateResult() (ctypes.TMResult, error) { + if r, err := DumpConsensusState(); err != nil { + return nil, err + } else { + return r, nil + } +} + +func ListUnconfirmedTxsResult() (ctypes.TMResult, error) { + if r, err := ListUnconfirmedTxs(); err != nil { + return nil, err + } else { + return r, nil + } +} + +func BroadcastTxResult(tx []byte) (ctypes.TMResult, error) { + if r, err := BroadcastTx(tx); err != nil { + return nil, err + } else { + return r, nil + } +} diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index 7ca47240d..09b7a7029 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -3,8 +3,8 @@ package core_types import ( "github.com/tendermint/go-crypto" "github.com/tendermint/go-p2p" + "github.com/tendermint/go-rpc/types" "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/rpc/types" "github.com/tendermint/tendermint/types" ) @@ -68,8 +68,8 @@ type ResultUnsubscribe struct { } type ResultEvent struct { - Event string `json:"event"` - Data types.EventData `json:"data"` + Name string `json:"name"` + Data types.TMEventData `json:"data"` } //---------------------------------------- @@ -90,9 +90,13 @@ const ( ResultTypeEvent = byte(0x0C) ) +type TMResult interface { + rpctypes.Result +} + // for wire.readReflect var _ = wire.RegisterInterface( - struct{ rpctypes.Result }{}, + struct{ TMResult }{}, wire.ConcreteType{&ResultGenesis{}, ResultTypeGenesis}, wire.ConcreteType{&ResultBlockchainInfo{}, ResultTypeBlockchainInfo}, wire.ConcreteType{&ResultGetBlock{}, ResultTypeGetBlock}, diff --git a/rpc/server/handlers.go b/rpc/server/handlers.go deleted file mode 100644 index a28277f78..000000000 --- a/rpc/server/handlers.go +++ /dev/null @@ -1,553 +0,0 @@ -package rpcserver - -import ( - "bytes" - "encoding/json" - "errors" - "fmt" - "io/ioutil" - "net/http" - "reflect" - "sort" - "time" - - "github.com/gorilla/websocket" - . "github.com/tendermint/go-common" - "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/events" - . "github.com/tendermint/tendermint/rpc/types" -) - -func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) { - // HTTP endpoints - for funcName, rpcFunc := range funcMap { - mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc)) - } - - // JSONRPC endpoints - mux.HandleFunc("/", makeJSONRPCHandler(funcMap)) -} - -//------------------------------------- -// function introspection - -// holds all type information for each function -type RPCFunc struct { - f reflect.Value // underlying rpc function - args []reflect.Type // type of each function arg - returns []reflect.Type // type of each return arg - argNames []string // name of each argument - ws bool // websocket only -} - -// wraps a function for quicker introspection -func NewRPCFunc(f interface{}, args []string) *RPCFunc { - return &RPCFunc{ - f: reflect.ValueOf(f), - args: funcArgTypes(f), - returns: funcReturnTypes(f), - argNames: args, - ws: false, - } -} - -func NewWSRPCFunc(f interface{}, args []string) *RPCFunc { - return &RPCFunc{ - f: reflect.ValueOf(f), - args: funcArgTypes(f), - returns: funcReturnTypes(f), - argNames: args, - ws: true, - } -} - -// return a function's argument types -func funcArgTypes(f interface{}) []reflect.Type { - t := reflect.TypeOf(f) - n := t.NumIn() - typez := make([]reflect.Type, n) - for i := 0; i < n; i++ { - typez[i] = t.In(i) - } - return typez -} - -// return a function's return types -func funcReturnTypes(f interface{}) []reflect.Type { - t := reflect.TypeOf(f) - n := t.NumOut() - typez := make([]reflect.Type, n) - for i := 0; i < n; i++ { - typez[i] = t.Out(i) - } - return typez -} - -// function introspection -//----------------------------------------------------------------------------- -// rpc.json - -// jsonrpc calls grab the given method's function info and runs reflect.Call -func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - b, _ := ioutil.ReadAll(r.Body) - // if its an empty request (like from a browser), - // just display a list of functions - if len(b) == 0 { - writeListOfEndpoints(w, r, funcMap) - return - } - - var request RPCRequest - err := json.Unmarshal(b, &request) - if err != nil { - WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error())) - return - } - if len(r.URL.Path) > 1 { - WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path))) - return - } - rpcFunc := funcMap[request.Method] - if rpcFunc == nil { - WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method)) - return - } - if rpcFunc.ws { - WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method is only for websockets: "+request.Method)) - return - } - args, err := jsonParamsToArgs(rpcFunc, request.Params) - if err != nil { - WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, err.Error())) - return - } - returns := rpcFunc.f.Call(args) - log.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns) - result, err := unreflectResult(returns) - if err != nil { - WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, err.Error())) - return - } - WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, result, "")) - } -} - -// Convert a list of interfaces to properly typed values -func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, error) { - if len(rpcFunc.argNames) != len(params) { - return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)", - len(rpcFunc.argNames), rpcFunc.argNames, len(params), params)) - } - values := make([]reflect.Value, len(params)) - for i, p := range params { - ty := rpcFunc.args[i] - v, err := _jsonObjectToArg(ty, p) - if err != nil { - return nil, err - } - values[i] = v - } - return values, nil -} - -// Same as above, but with the first param the websocket connection -func jsonParamsToArgsWS(rpcFunc *RPCFunc, params []interface{}, wsCtx WSRPCContext) ([]reflect.Value, error) { - if len(rpcFunc.argNames)-1 != len(params) { - return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)", - len(rpcFunc.argNames)-1, rpcFunc.argNames[1:], len(params), params)) - } - values := make([]reflect.Value, len(params)+1) - values[0] = reflect.ValueOf(wsCtx) - for i, p := range params { - ty := rpcFunc.args[i+1] - v, err := _jsonObjectToArg(ty, p) - if err != nil { - return nil, err - } - values[i+1] = v - } - return values, nil -} - -func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) { - var err error - v := reflect.New(ty) - wire.ReadJSONObjectPtr(v.Interface(), object, &err) - if err != nil { - return v, err - } - v = v.Elem() - return v, nil -} - -// rpc.json -//----------------------------------------------------------------------------- -// rpc.http - -// convert from a function name to the http handler -func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) { - // Exception for websocket endpoints - if rpcFunc.ws { - return func(w http.ResponseWriter, r *http.Request) { - WriteRPCResponseHTTP(w, NewRPCResponse("", nil, "This RPC method is only for websockets")) - } - } - // All other endpoints - return func(w http.ResponseWriter, r *http.Request) { - args, err := httpParamsToArgs(rpcFunc, r) - if err != nil { - WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error())) - return - } - returns := rpcFunc.f.Call(args) - log.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns) - result, err := unreflectResult(returns) - if err != nil { - WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error())) - return - } - WriteRPCResponseHTTP(w, NewRPCResponse("", result, "")) - } -} - -// Covert an http query to a list of properly typed values. -// To be properly decoded the arg must be a concrete type from tendermint (if its an interface). -func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) { - argTypes := rpcFunc.args - argNames := rpcFunc.argNames - - var err error - values := make([]reflect.Value, len(argNames)) - for i, name := range argNames { - ty := argTypes[i] - arg := GetParam(r, name) - values[i], err = _jsonStringToArg(ty, arg) - if err != nil { - return nil, err - } - } - return values, nil -} - -func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) { - var err error - v := reflect.New(ty) - wire.ReadJSONPtr(v.Interface(), []byte(arg), &err) - if err != nil { - return v, err - } - v = v.Elem() - return v, nil -} - -// rpc.http -//----------------------------------------------------------------------------- -// rpc.websocket - -const ( - writeChanCapacity = 1000 - wsWriteTimeoutSeconds = 30 // each write times out after this - wsReadTimeoutSeconds = 30 // connection times out if we haven't received *anything* in this long, not even pings. - wsPingTickerSeconds = 10 // send a ping every PingTickerSeconds. -) - -// a single websocket connection -// contains listener id, underlying ws connection, -// and the event switch for subscribing to events -type wsConnection struct { - QuitService - - remoteAddr string - baseConn *websocket.Conn - writeChan chan RPCResponse - readTimeout *time.Timer - pingTicker *time.Ticker - - funcMap map[string]*RPCFunc - evsw *events.EventSwitch -} - -// new websocket connection wrapper -func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *wsConnection { - wsc := &wsConnection{ - remoteAddr: baseConn.RemoteAddr().String(), - baseConn: baseConn, - writeChan: make(chan RPCResponse, writeChanCapacity), // error when full. - funcMap: funcMap, - evsw: evsw, - } - wsc.QuitService = *NewQuitService(log, "wsConnection", wsc) - return wsc -} - -// wsc.Start() blocks until the connection closes. -func (wsc *wsConnection) OnStart() error { - wsc.QuitService.OnStart() - - // Read subscriptions/unsubscriptions to events - go wsc.readRoutine() - - // Custom Ping handler to touch readTimeout - wsc.readTimeout = time.NewTimer(time.Second * wsReadTimeoutSeconds) - wsc.pingTicker = time.NewTicker(time.Second * wsPingTickerSeconds) - wsc.baseConn.SetPingHandler(func(m string) error { - // NOTE: https://github.com/gorilla/websocket/issues/97 - go wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds)) - wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds) - return nil - }) - wsc.baseConn.SetPongHandler(func(m string) error { - // NOTE: https://github.com/gorilla/websocket/issues/97 - wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds) - return nil - }) - go wsc.readTimeoutRoutine() - - // Write responses, BLOCKING. - wsc.writeRoutine() - return nil -} - -func (wsc *wsConnection) OnStop() { - wsc.QuitService.OnStop() - wsc.evsw.RemoveListener(wsc.remoteAddr) - wsc.readTimeout.Stop() - wsc.pingTicker.Stop() - // The write loop closes the websocket connection - // when it exits its loop, and the read loop - // closes the writeChan -} - -func (wsc *wsConnection) readTimeoutRoutine() { - select { - case <-wsc.readTimeout.C: - log.Notice("Stopping connection due to read timeout") - wsc.Stop() - case <-wsc.Quit: - return - } -} - -// Implements WSRPCConnection -func (wsc *wsConnection) GetRemoteAddr() string { - return wsc.remoteAddr -} - -// Implements WSRPCConnection -func (wsc *wsConnection) GetEventSwitch() *events.EventSwitch { - return wsc.evsw -} - -// Implements WSRPCConnection -// Blocking write to writeChan until service stops. -func (wsc *wsConnection) WriteRPCResponse(resp RPCResponse) { - select { - case <-wsc.Quit: - return - case wsc.writeChan <- resp: - } -} - -// Implements WSRPCConnection -// Nonblocking write. -func (wsc *wsConnection) TryWriteRPCResponse(resp RPCResponse) bool { - select { - case <-wsc.Quit: - return false - case wsc.writeChan <- resp: - return true - default: - return false - } -} - -// Read from the socket and subscribe to or unsubscribe from events -func (wsc *wsConnection) readRoutine() { - // Do not close writeChan, to allow WriteRPCResponse() to fail. - // defer close(wsc.writeChan) - - for { - select { - case <-wsc.Quit: - return - default: - var in []byte - // Do not set a deadline here like below: - // wsc.baseConn.SetReadDeadline(time.Now().Add(time.Second * wsReadTimeoutSeconds)) - // The client may not send anything for a while. - // We use `readTimeout` to handle read timeouts. - _, in, err := wsc.baseConn.ReadMessage() - if err != nil { - log.Notice("Failed to read from connection", "remote", wsc.remoteAddr) - // an error reading the connection, - // kill the connection - wsc.Stop() - return - } - var request RPCRequest - err = json.Unmarshal(in, &request) - if err != nil { - errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error()) - wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, errStr)) - continue - } - - // Now, fetch the RPCFunc and execute it. - - rpcFunc := wsc.funcMap[request.Method] - if rpcFunc == nil { - wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method)) - continue - } - var args []reflect.Value - if rpcFunc.ws { - wsCtx := WSRPCContext{Request: request, WSRPCConnection: wsc} - args, err = jsonParamsToArgsWS(rpcFunc, request.Params, wsCtx) - } else { - args, err = jsonParamsToArgs(rpcFunc, request.Params) - } - if err != nil { - wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error())) - continue - } - returns := rpcFunc.f.Call(args) - log.Info("WSJSONRPC", "method", request.Method, "args", args, "returns", returns) - result, err := unreflectResult(returns) - if err != nil { - wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error())) - continue - } else { - wsc.WriteRPCResponse(NewRPCResponse(request.ID, result, "")) - continue - } - - } - } -} - -// receives on a write channel and writes out on the socket -func (wsc *wsConnection) writeRoutine() { - defer wsc.baseConn.Close() - var n, err = int(0), error(nil) - for { - select { - case <-wsc.Quit: - return - case <-wsc.pingTicker.C: - err := wsc.baseConn.WriteMessage(websocket.PingMessage, []byte{}) - if err != nil { - log.Error("Failed to write ping message on websocket", "error", err) - wsc.Stop() - return - } - case msg := <-wsc.writeChan: - buf := new(bytes.Buffer) - wire.WriteJSON(msg, buf, &n, &err) - if err != nil { - log.Error("Failed to marshal RPCResponse to JSON", "error", err) - } else { - wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds)) - bufBytes := buf.Bytes() - if err = wsc.baseConn.WriteMessage(websocket.TextMessage, bufBytes); err != nil { - log.Warn("Failed to write response on websocket", "error", err) - wsc.Stop() - return - } - } - } - } -} - -//---------------------------------------- - -// Main manager for all websocket connections -// Holds the event switch -// NOTE: The websocket path is defined externally, e.g. in node/node.go -type WebsocketManager struct { - websocket.Upgrader - funcMap map[string]*RPCFunc - evsw *events.EventSwitch -} - -func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *WebsocketManager { - return &WebsocketManager{ - funcMap: funcMap, - evsw: evsw, - Upgrader: websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { - // TODO - return true - }, - }, - } -} - -// Upgrade the request/response (via http.Hijack) and starts the wsConnection. -func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) { - wsConn, err := wm.Upgrade(w, r, nil) - if err != nil { - // TODO - return http error - log.Error("Failed to upgrade to websocket connection", "error", err) - return - } - - // register connection - con := NewWSConnection(wsConn, wm.funcMap, wm.evsw) - log.Notice("New websocket connection", "remote", con.remoteAddr) - con.Start() // Blocking -} - -// rpc.websocket -//----------------------------------------------------------------------------- - -// returns is result struct and error. If error is not nil, return it -func unreflectResult(returns []reflect.Value) (interface{}, error) { - errV := returns[1] - if errV.Interface() != nil { - return nil, fmt.Errorf("%v", errV.Interface()) - } - return returns[0].Interface(), nil -} - -// writes a list of available rpc endpoints as an html page -func writeListOfEndpoints(w http.ResponseWriter, r *http.Request, funcMap map[string]*RPCFunc) { - noArgNames := []string{} - argNames := []string{} - for name, funcData := range funcMap { - if len(funcData.args) == 0 { - noArgNames = append(noArgNames, name) - } else { - argNames = append(argNames, name) - } - } - sort.Strings(noArgNames) - sort.Strings(argNames) - buf := new(bytes.Buffer) - buf.WriteString("") - buf.WriteString("
Available endpoints:
") - - for _, name := range noArgNames { - link := fmt.Sprintf("http://%s/%s", r.Host, name) - buf.WriteString(fmt.Sprintf("%s
", link, link)) - } - - buf.WriteString("
Endpoints that require arguments:
") - for _, name := range argNames { - link := fmt.Sprintf("http://%s/%s?", r.Host, name) - funcData := funcMap[name] - for i, argName := range funcData.argNames { - link += argName + "=_" - if i < len(funcData.argNames)-1 { - link += "&" - } - } - buf.WriteString(fmt.Sprintf("%s
", link, link)) - } - buf.WriteString("") - w.Header().Set("Content-Type", "text/html") - w.WriteHeader(200) - w.Write(buf.Bytes()) -} diff --git a/rpc/server/http_params.go b/rpc/server/http_params.go deleted file mode 100644 index acf5b4c8c..000000000 --- a/rpc/server/http_params.go +++ /dev/null @@ -1,89 +0,0 @@ -package rpcserver - -import ( - "encoding/hex" - "fmt" - "net/http" - "regexp" - "strconv" -) - -var ( - // Parts of regular expressions - atom = "[A-Z0-9!#$%&'*+\\-/=?^_`{|}~]+" - dotAtom = atom + `(?:\.` + atom + `)*` - domain = `[A-Z0-9.-]+\.[A-Z]{2,4}` - - RE_HEX = regexp.MustCompile(`^(?i)[a-f0-9]+$`) - RE_EMAIL = regexp.MustCompile(`^(?i)(` + dotAtom + `)@(` + dotAtom + `)$`) - RE_ADDRESS = regexp.MustCompile(`^(?i)[a-z0-9]{25,34}$`) - RE_HOST = regexp.MustCompile(`^(?i)(` + domain + `)$`) - - //RE_ID12 = regexp.MustCompile(`^[a-zA-Z0-9]{12}$`) -) - -func GetParam(r *http.Request, param string) string { - s := r.URL.Query().Get(param) - if s == "" { - s = r.FormValue(param) - } - return s -} - -func GetParamByteSlice(r *http.Request, param string) ([]byte, error) { - s := GetParam(r, param) - return hex.DecodeString(s) -} - -func GetParamInt64(r *http.Request, param string) (int64, error) { - s := GetParam(r, param) - i, err := strconv.ParseInt(s, 10, 64) - if err != nil { - return 0, fmt.Errorf(param, err.Error()) - } - return i, nil -} - -func GetParamInt32(r *http.Request, param string) (int32, error) { - s := GetParam(r, param) - i, err := strconv.ParseInt(s, 10, 32) - if err != nil { - return 0, fmt.Errorf(param, err.Error()) - } - return int32(i), nil -} - -func GetParamUint64(r *http.Request, param string) (uint64, error) { - s := GetParam(r, param) - i, err := strconv.ParseUint(s, 10, 64) - if err != nil { - return 0, fmt.Errorf(param, err.Error()) - } - return i, nil -} - -func GetParamUint(r *http.Request, param string) (uint, error) { - s := GetParam(r, param) - i, err := strconv.ParseUint(s, 10, 64) - if err != nil { - return 0, fmt.Errorf(param, err.Error()) - } - return uint(i), nil -} - -func GetParamRegexp(r *http.Request, param string, re *regexp.Regexp) (string, error) { - s := GetParam(r, param) - if !re.MatchString(s) { - return "", fmt.Errorf(param, "Did not match regular expression %v", re.String()) - } - return s, nil -} - -func GetParamFloat64(r *http.Request, param string) (float64, error) { - s := GetParam(r, param) - f, err := strconv.ParseFloat(s, 64) - if err != nil { - return 0, fmt.Errorf(param, err.Error()) - } - return f, nil -} diff --git a/rpc/server/http_server.go b/rpc/server/http_server.go deleted file mode 100644 index fd35d0df6..000000000 --- a/rpc/server/http_server.go +++ /dev/null @@ -1,115 +0,0 @@ -// Commons for HTTP handling -package rpcserver - -import ( - "bufio" - "fmt" - "net" - "net/http" - "runtime/debug" - "time" - - . "github.com/tendermint/go-common" - "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/alert" - . "github.com/tendermint/tendermint/rpc/types" -) - -func StartHTTPServer(listenAddr string, handler http.Handler) (net.Listener, error) { - log.Notice(Fmt("Starting RPC HTTP server on %v", listenAddr)) - listener, err := net.Listen("tcp", listenAddr) - if err != nil { - return nil, fmt.Errorf("Failed to listen to %v", listenAddr) - } - go func() { - res := http.Serve( - listener, - RecoverAndLogHandler(handler), - ) - log.Crit("RPC HTTP server stopped", "result", res) - }() - return listener, nil -} - -func WriteRPCResponseHTTP(w http.ResponseWriter, res RPCResponse) { - jsonBytes := wire.JSONBytesPretty(res) - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(200) - w.Write(jsonBytes) -} - -//----------------------------------------------------------------------------- - -// Wraps an HTTP handler, adding error logging. -// If the inner function panics, the outer function recovers, logs, sends an -// HTTP 500 error response. -func RecoverAndLogHandler(handler http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Wrap the ResponseWriter to remember the status - rww := &ResponseWriterWrapper{-1, w} - begin := time.Now() - - // Common headers - origin := r.Header.Get("Origin") - rww.Header().Set("Access-Control-Allow-Origin", origin) - rww.Header().Set("Access-Control-Allow-Credentials", "true") - rww.Header().Set("Access-Control-Expose-Headers", "X-Server-Time") - rww.Header().Set("X-Server-Time", fmt.Sprintf("%v", begin.Unix())) - - defer func() { - // Send a 500 error if a panic happens during a handler. - // Without this, Chrome & Firefox were retrying aborted ajax requests, - // at least to my localhost. - if e := recover(); e != nil { - - // If RPCResponse - if res, ok := e.(RPCResponse); ok { - WriteRPCResponseHTTP(rww, res) - } else { - // For the rest, - log.Error("Panic in RPC HTTP handler", "error", e, "stack", string(debug.Stack())) - rww.WriteHeader(http.StatusInternalServerError) - WriteRPCResponseHTTP(rww, NewRPCResponse("", nil, Fmt("Internal Server Error: %v", e))) - } - } - - // Finally, log. - durationMS := time.Since(begin).Nanoseconds() / 1000000 - if rww.Status == -1 { - rww.Status = 200 - } - log.Info("Served RPC HTTP response", - "method", r.Method, "url", r.URL, - "status", rww.Status, "duration", durationMS, - "remoteAddr", r.RemoteAddr, - ) - }() - - handler.ServeHTTP(rww, r) - }) -} - -// Remember the status for logging -type ResponseWriterWrapper struct { - Status int - http.ResponseWriter -} - -func (w *ResponseWriterWrapper) WriteHeader(status int) { - w.Status = status - w.ResponseWriter.WriteHeader(status) -} - -// implements http.Hijacker -func (w *ResponseWriterWrapper) Hijack() (net.Conn, *bufio.ReadWriter, error) { - return w.ResponseWriter.(http.Hijacker).Hijack() -} - -// Stick it as a deferred statement in gouroutines to prevent the program from crashing. -func Recover(daemonName string) { - if e := recover(); e != nil { - stack := string(debug.Stack()) - errorString := fmt.Sprintf("[%s] %s\n%s", daemonName, e, stack) - alert.Alert(errorString) - } -} diff --git a/rpc/server/log.go b/rpc/server/log.go deleted file mode 100644 index 704e22e30..000000000 --- a/rpc/server/log.go +++ /dev/null @@ -1,7 +0,0 @@ -package rpcserver - -import ( - "github.com/tendermint/log15" -) - -var log = log15.New("module", "rpcserver") diff --git a/rpc/test/client_test.go b/rpc/test/client_test.go new file mode 100644 index 000000000..64f557586 --- /dev/null +++ b/rpc/test/client_test.go @@ -0,0 +1,162 @@ +package rpctest + +import ( + "fmt" + "testing" + + _ "github.com/tendermint/tendermint/config/tendermint_test" + ctypes "github.com/tendermint/tendermint/rpc/core/types" + "github.com/tendermint/tendermint/types" +) + +//-------------------------------------------------------------------------------- +// Test the HTTP client + +func TestURIStatus(t *testing.T) { + tmResult := new(ctypes.TMResult) + _, err := clientURI.Call("status", map[string]interface{}{}, tmResult) + if err != nil { + t.Fatal(err) + } + testStatus(t, tmResult) +} + +func TestJSONStatus(t *testing.T) { + tmResult := new(ctypes.TMResult) + _, err := clientJSON.Call("status", []interface{}{}, tmResult) + if err != nil { + t.Fatal(err) + } + testStatus(t, tmResult) +} + +func testStatus(t *testing.T, statusI interface{}) { + tmRes := statusI.(*ctypes.TMResult) + status := (*tmRes).(*ctypes.ResultStatus) + if status.NodeInfo.Network != chainID { + t.Fatal(fmt.Errorf("ChainID mismatch: got %s expected %s", + status.NodeInfo.Network, chainID)) + } +} + +/*func TestURIBroadcastTx(t *testing.T) { + testBroadcastTx(t, "HTTP") +}*/ + +/*func TestJSONBroadcastTx(t *testing.T) { + testBroadcastTx(t, "JSONRPC") +}*/ + +// TODO +/* +func testBroadcastTx(t *testing.T, typ string) { + amt := int64(100) + toAddr := user[1].Address + tx := makeDefaultSendTxSigned(t, typ, toAddr, amt) + receipt := broadcastTx(t, typ, tx) + if receipt.CreatesContract > 0 { + t.Fatal("This tx does not create a contract") + } + if len(receipt.TxHash) == 0 { + t.Fatal("Failed to compute tx hash") + } + pool := node.MempoolReactor().Mempool + txs := pool.GetProposalTxs() + if len(txs) != mempoolCount { + t.Fatalf("The mem pool has %d txs. Expected %d", len(txs), mempoolCount) + } + tx2 := txs[mempoolCount-1].(*types.SendTx) + n, err := new(int64), new(error) + buf1, buf2 := new(bytes.Buffer), new(bytes.Buffer) + tx.WriteSignBytes(chainID, buf1, n, err) + tx2.WriteSignBytes(chainID, buf2, n, err) + if bytes.Compare(buf1.Bytes(), buf2.Bytes()) != 0 { + t.Fatal("inconsistent hashes for mempool tx and sent tx") + } +}*/ + +//-------------------------------------------------------------------------------- +// Test the websocket service + +var wsTyp = "JSONRPC" + +// make a simple connection to the server +func TestWSConnect(t *testing.T) { + wsc := newWSClient(t) + wsc.Stop() +} + +// receive a new block message +func TestWSNewBlock(t *testing.T) { + wsc := newWSClient(t) + eid := types.EventStringNewBlock() + subscribe(t, wsc, eid) + defer func() { + unsubscribe(t, wsc, eid) + wsc.Stop() + }() + waitForEvent(t, wsc, eid, true, func() {}, func(eid string, b interface{}) error { + fmt.Println("Check:", b) + return nil + }) +} + +// receive a few new block messages in a row, with increasing height +func TestWSBlockchainGrowth(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + wsc := newWSClient(t) + eid := types.EventStringNewBlock() + subscribe(t, wsc, eid) + defer func() { + unsubscribe(t, wsc, eid) + wsc.Stop() + }() + + // listen for NewBlock, ensure height increases by 1 + + var initBlockN int + for i := 0; i < 3; i++ { + waitForEvent(t, wsc, eid, true, func() {}, func(eid string, eventData interface{}) error { + block := eventData.(types.EventDataNewBlock).Block + if i == 0 { + initBlockN = block.Header.Height + } else { + if block.Header.Height != initBlockN+i { + return fmt.Errorf("Expected block %d, got block %d", initBlockN+i, block.Header.Height) + } + } + + return nil + }) + } +} + +/* TODO: this with dummy app.. +func TestWSDoubleFire(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + con := newWSCon(t) + eid := types.EventStringAccInput(user[0].Address) + subscribe(t, con, eid) + defer func() { + unsubscribe(t, con, eid) + con.Close() + }() + amt := int64(100) + toAddr := user[1].Address + // broadcast the transaction, wait to hear about it + waitForEvent(t, con, eid, true, func() { + tx := makeDefaultSendTxSigned(t, wsTyp, toAddr, amt) + broadcastTx(t, wsTyp, tx) + }, func(eid string, b []byte) error { + return nil + }) + // but make sure we don't hear about it twice + waitForEvent(t, con, eid, false, func() { + }, func(eid string, b []byte) error { + return nil + }) +}*/ diff --git a/rpc/test/config.go b/rpc/test/config.go new file mode 100644 index 000000000..787690974 --- /dev/null +++ b/rpc/test/config.go @@ -0,0 +1,18 @@ +package rpctest + +import ( + cfg "github.com/tendermint/tendermint/config" + tmcfg "github.com/tendermint/tendermint/config/tendermint_test" +) + +var config cfg.Config = nil + +func initConfig() { + + cfg.OnConfig(func(newConfig cfg.Config) { + config = newConfig + }) + + c := tmcfg.GetConfig("") + cfg.ApplyConfig(c) // Notify modules of new config +} diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go new file mode 100644 index 000000000..21de52fa9 --- /dev/null +++ b/rpc/test/helpers.go @@ -0,0 +1,162 @@ +package rpctest + +import ( + "testing" + "time" + + . "github.com/tendermint/go-common" + "github.com/tendermint/go-p2p" + "github.com/tendermint/go-wire" + + client "github.com/tendermint/go-rpc/client" + _ "github.com/tendermint/tendermint/config/tendermint_test" + nm "github.com/tendermint/tendermint/node" + ctypes "github.com/tendermint/tendermint/rpc/core/types" + "github.com/tendermint/tendermint/types" +) + +// global variables for use across all tests +var ( + node *nm.Node + + mempoolCount = 0 + + chainID string + + rpcAddr, requestAddr, websocketAddr string + + clientURI *client.ClientURI + clientJSON *client.ClientJSONRPC +) + +// initialize config and create new node +func init() { + initConfig() + + chainID = config.GetString("chain_id") + rpcAddr = config.GetString("rpc_laddr") + requestAddr = "http://" + rpcAddr + websocketAddr = "ws://" + rpcAddr + "/websocket" + + clientURI = client.NewClientURI(requestAddr) + clientJSON = client.NewClientJSONRPC(requestAddr) + + // TODO: change consensus/state.go timeouts to be shorter + + // start a node + ready := make(chan struct{}) + go newNode(ready) + <-ready +} + +// create a new node and sleep forever +func newNode(ready chan struct{}) { + // Create & start node + privValidatorFile := config.GetString("priv_validator_file") + privValidator := types.LoadOrGenPrivValidator(privValidatorFile) + node = nm.NewNode(privValidator) + l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"), true) + node.AddListener(l) + node.Start() + + // Run the RPC server. + node.StartRPC() + ready <- struct{}{} + + // Sleep forever + ch := make(chan struct{}) + <-ch +} + +//-------------------------------------------------------------------------------- +// Utilities for testing the websocket service + +// create a new connection +func newWSClient(t *testing.T) *client.WSClient { + wsc := client.NewWSClient(websocketAddr) + if _, err := wsc.Start(); err != nil { + t.Fatal(err) + } + return wsc +} + +// subscribe to an event +func subscribe(t *testing.T, wsc *client.WSClient, eventid string) { + if err := wsc.Subscribe(eventid); err != nil { + t.Fatal(err) + } +} + +// unsubscribe from an event +func unsubscribe(t *testing.T, wsc *client.WSClient, eventid string) { + if err := wsc.Unsubscribe(eventid); err != nil { + t.Fatal(err) + } +} + +// wait for an event; do things that might trigger events, and check them when they are received +// the check function takes an event id and the byte slice read off the ws +func waitForEvent(t *testing.T, wsc *client.WSClient, eventid string, dieOnTimeout bool, f func(), check func(string, interface{}) error) { + // go routine to wait for webscoket msg + goodCh := make(chan interface{}) + errCh := make(chan error) + + // Read message + go func() { + var err error + LOOP: + for { + select { + case r := <-wsc.ResultsCh: + result := new(ctypes.TMResult) + wire.ReadJSONPtr(result, r, &err) + if err != nil { + errCh <- err + break LOOP + } + event, ok := (*result).(*ctypes.ResultEvent) + if ok && event.Name == eventid { + goodCh <- event.Data + break LOOP + } + case err := <-wsc.ErrorsCh: + errCh <- err + break LOOP + case <-wsc.Quit: + break LOOP + } + } + }() + + // do stuff (transactions) + f() + + // wait for an event or timeout + timeout := time.NewTimer(10 * time.Second) + select { + case <-timeout.C: + if dieOnTimeout { + wsc.Stop() + t.Fatalf("%s event was not received in time", eventid) + } + // else that's great, we didn't hear the event + // and we shouldn't have + case eventData := <-goodCh: + if dieOnTimeout { + // message was received and expected + // run the check + if err := check(eventid, eventData); err != nil { + t.Fatal(err) // Show the stack trace. + } + } else { + wsc.Stop() + t.Fatalf("%s event was not expected", eventid) + } + case err := <-errCh: + t.Fatal(err) + panic(err) // Show the stack trace. + + } +} + +//-------------------------------------------------------------------------------- diff --git a/rpc/types/types.go b/rpc/types/types.go deleted file mode 100644 index f7bc98d50..000000000 --- a/rpc/types/types.go +++ /dev/null @@ -1,71 +0,0 @@ -package rpctypes - -import ( - "github.com/tendermint/tendermint/events" -) - -type RPCRequest struct { - JSONRPC string `json:"jsonrpc"` - ID string `json:"id"` - Method string `json:"method"` - Params []interface{} `json:"params"` -} - -func NewRPCRequest(id string, method string, params []interface{}) RPCRequest { - return RPCRequest{ - JSONRPC: "2.0", - ID: id, - Method: method, - Params: params, - } -} - -//---------------------------------------- - -/* -Result is a generic interface. -Applications should register type-bytes like so: - -var _ = wire.RegisterInterface( - struct{ Result }{}, - wire.ConcreteType{&ResultGenesis{}, ResultTypeGenesis}, - wire.ConcreteType{&ResultBlockchainInfo{}, ResultTypeBlockchainInfo}, - ... -) -*/ -type Result interface { -} - -//---------------------------------------- - -type RPCResponse struct { - JSONRPC string `json:"jsonrpc"` - ID string `json:"id"` - Result Result `json:"result"` - Error string `json:"error"` -} - -func NewRPCResponse(id string, res Result, err string) RPCResponse { - return RPCResponse{ - JSONRPC: "2.0", - ID: id, - Result: res, - Error: err, - } -} - -//---------------------------------------- - -// *wsConnection implements this interface. -type WSRPCConnection interface { - GetRemoteAddr() string - GetEventSwitch() *events.EventSwitch - WriteRPCResponse(resp RPCResponse) - TryWriteRPCResponse(resp RPCResponse) bool -} - -// websocket-only RPCFuncs take this as the first parameter. -type WSRPCContext struct { - Request RPCRequest - WSRPCConnection -} diff --git a/rpc/version.go b/rpc/version.go deleted file mode 100644 index 2982824dd..000000000 --- a/rpc/version.go +++ /dev/null @@ -1,3 +0,0 @@ -package rpc - -const Version = "0.4.0" diff --git a/state/execution.go b/state/execution.go index 1578af4b3..37e3426b0 100644 --- a/state/execution.go +++ b/state/execution.go @@ -5,7 +5,7 @@ import ( "fmt" . "github.com/tendermint/go-common" - "github.com/tendermint/tendermint/events" + "github.com/tendermint/go-events" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" tmsp "github.com/tendermint/tmsp/types" @@ -90,7 +90,7 @@ func (s *State) execBlockOnProxyApp(evsw *events.EventSwitch, proxyAppConn proxy log.Warn("Error computing proxyAppConn hash", "error", err) return err } - log.Info("ExecBlock got %v valid txs and %v invalid txs", validTxs, invalidTxs) + log.Info(Fmt("ExecBlock got %v valid txs and %v invalid txs", validTxs, invalidTxs)) // Set the state's new AppHash s.AppHash = hash diff --git a/types/events.go b/types/events.go index 436e02de6..a316cfc54 100644 --- a/types/events.go +++ b/types/events.go @@ -1,6 +1,8 @@ package types import ( + // for registering TMEventData as events.EventData + "github.com/tendermint/go-events" "github.com/tendermint/go-wire" ) @@ -28,6 +30,12 @@ func EventStringApp() string { return "App" } //---------------------------------------- +// implements events.EventData +type TMEventData interface { + events.EventData + // AssertIsTMEventData() +} + const ( EventDataTypeNewBlock = byte(0x01) EventDataTypeFork = byte(0x02) @@ -38,12 +46,8 @@ const ( EventDataTypeVote = byte(0x12) ) -type EventData interface { - AssertIsEventData() -} - var _ = wire.RegisterInterface( - struct{ EventData }{}, + struct{ TMEventData }{}, wire.ConcreteType{EventDataNewBlock{}, EventDataTypeNewBlock}, // wire.ConcreteType{EventDataFork{}, EventDataTypeFork }, wire.ConcreteType{EventDataTx{}, EventDataTypeTx}, @@ -77,15 +81,7 @@ type EventDataRoundState struct { Step string `json:"step"` // private, not exposed to websockets - rs interface{} -} - -func (edrs *EventDataRoundState) RoundState() interface{} { - return edrs.rs -} - -func (edrs *EventDataRoundState) SetRoundState(rs interface{}) { - edrs.rs = rs + RoundState interface{} `json:"-"` } type EventDataVote struct { @@ -94,8 +90,8 @@ type EventDataVote struct { Vote *Vote } -func (_ EventDataNewBlock) AssertIsEventData() {} -func (_ EventDataTx) AssertIsEventData() {} -func (_ EventDataApp) AssertIsEventData() {} -func (_ EventDataRoundState) AssertIsEventData() {} -func (_ EventDataVote) AssertIsEventData() {} +func (_ EventDataNewBlock) AssertIsTMEventData() {} +func (_ EventDataTx) AssertIsTMEventData() {} +func (_ EventDataApp) AssertIsTMEventData() {} +func (_ EventDataRoundState) AssertIsTMEventData() {} +func (_ EventDataVote) AssertIsTMEventData() {}