From 0be13d1d2774eacdfe469746f22889cfa6b68317 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 12 Jan 2016 16:54:27 -0500 Subject: [PATCH] move alert, events, rpc into own repos --- alert/alert.go | 64 ----- alert/config.go | 14 - alert/email.go | 176 ------------ alert/log.go | 7 - benchmarks/simu/counter.go | 4 +- blockchain/reactor.go | 2 +- consensus/common_test.go | 2 +- consensus/reactor.go | 6 +- consensus/state.go | 2 +- consensus/state_test.go | 2 +- events/event_cache.go | 45 --- events/events.go | 215 -------------- events/log.go | 7 - mempool/reactor.go | 2 +- node/node.go | 12 +- node/node_test.go | 18 -- rpc/client/http_client.go | 133 --------- rpc/client/log.go | 7 - rpc/client/ws_client.go | 116 -------- rpc/core/events.go | 8 +- rpc/core/routes.go | 2 +- rpc/core/types/responses.go | 8 +- rpc/server/handlers.go | 553 ------------------------------------ rpc/server/http_params.go | 89 ------ rpc/server/http_server.go | 115 -------- rpc/server/log.go | 7 - rpc/test/helpers.go | 49 ++-- rpc/types/types.go | 71 ----- rpc/version.go | 3 - state/state.go | 1 + types/events.go | 8 +- 31 files changed, 57 insertions(+), 1691 deletions(-) delete mode 100644 alert/alert.go delete mode 100644 alert/config.go delete mode 100644 alert/email.go delete mode 100644 alert/log.go delete mode 100644 events/event_cache.go delete mode 100644 events/events.go delete mode 100644 events/log.go delete mode 100644 rpc/client/http_client.go delete mode 100644 rpc/client/log.go delete mode 100644 rpc/client/ws_client.go delete mode 100644 rpc/server/handlers.go delete mode 100644 rpc/server/http_params.go delete mode 100644 rpc/server/http_server.go delete mode 100644 rpc/server/log.go delete mode 100644 rpc/types/types.go delete mode 100644 rpc/version.go diff --git a/alert/alert.go b/alert/alert.go deleted file mode 100644 index c47633091..000000000 --- a/alert/alert.go +++ /dev/null @@ -1,64 +0,0 @@ -package alert - -import ( - "fmt" - "time" - - "github.com/sfreiberg/gotwilio" -) - -var lastAlertUnix int64 = 0 -var alertCountSince int = 0 - -// Sends a critical alert message to administrators. -func Alert(message string) { - log.Error(" ALERT \n" + message) - now := time.Now().Unix() - if now-lastAlertUnix > int64(config.GetInt("alert_min_interval")) { - message = fmt.Sprintf("%v:%v", config.GetString("chain_id"), message) - if alertCountSince > 0 { - message = fmt.Sprintf("%v (+%v more since)", message, alertCountSince) - alertCountSince = 0 - } - if len(config.GetString("alert_twilio_sid")) > 0 { - go sendTwilio(message) - } - if len(config.GetString("alert_email_recipients")) > 0 { - go sendEmail(message) - } - } else { - alertCountSince++ - } -} - -func sendTwilio(message string) { - defer func() { - if err := recover(); err != nil { - log.Error("sendTwilio error", "error", err) - } - }() - if len(message) > 50 { - message = message[:50] - } - twilio := gotwilio.NewTwilioClient(config.GetString("alert_twilio_sid"), config.GetString("alert_twilio_token")) - res, exp, err := twilio.SendSMS(config.GetString("alert_twilio_from"), config.GetString("alert_twilio_to"), message, "", "") - if exp != nil || err != nil { - log.Error("sendTwilio error", "res", res, "exp", exp, "error", err) - } -} - -func sendEmail(message string) { - defer func() { - if err := recover(); err != nil { - log.Error("sendEmail error", "error", err) - } - }() - subject := message - if len(subject) > 80 { - subject = subject[:80] - } - err := SendEmail(subject, message, config.GetStringSlice("alert_email_recipients")) - if err != nil { - log.Error("sendEmail error", "error", err, "message", message) - } -} diff --git a/alert/config.go b/alert/config.go deleted file mode 100644 index 4a7bf7714..000000000 --- a/alert/config.go +++ /dev/null @@ -1,14 +0,0 @@ - -package alert - -import ( - cfg "github.com/tendermint/go-config" -) - -var config cfg.Config = nil - -func init() { - cfg.OnConfig(func(newConfig cfg.Config) { - config = newConfig - }) -} diff --git a/alert/email.go b/alert/email.go deleted file mode 100644 index c183f1d58..000000000 --- a/alert/email.go +++ /dev/null @@ -1,176 +0,0 @@ -// Forked from github.com/SlyMarbo/gmail -package alert - -import ( - "bytes" - "crypto/tls" - "encoding/base64" - "errors" - "fmt" - "io/ioutil" - "net/smtp" - "path/filepath" - "regexp" - "strings" -) - -// Convenience function -func SendEmail(subject, body string, tos []string) error { - email := Compose(subject, body) - email.From = config.GetString("smtp_user") - email.ContentType = "text/html; charset=utf-8" - email.AddRecipients(tos...) - err := email.Send() - return err -} - -// Email represents a single message, which may contain -// attachments. -type Email struct { - From string - To []string - Subject string - ContentType string - Body string - Attachments map[string][]byte -} - -// Compose begins a new email, filling the subject and body, -// and allocating memory for the list of recipients and the -// attachments. -func Compose(Subject, Body string) *Email { - out := new(Email) - out.To = make([]string, 0, 1) - out.Subject = Subject - out.Body = Body - out.Attachments = make(map[string][]byte) - return out -} - -// Attach takes a filename and adds this to the message. -// Note that since only the filename is stored (and not -// its path, for privacy reasons), multiple files in -// different directories but with the same filename and -// extension cannot be sent. -func (e *Email) Attach(Filename string) error { - b, err := ioutil.ReadFile(Filename) - if err != nil { - return err - } - - _, fname := filepath.Split(Filename) - e.Attachments[fname] = b - return nil -} - -// AddRecipient adds a single recipient. -func (e *Email) AddRecipient(Recipient string) { - e.To = append(e.To, Recipient) -} - -// AddRecipients adds one or more recipients. -func (e *Email) AddRecipients(Recipients ...string) { - e.To = append(e.To, Recipients...) -} - -// Send sends the email, returning any error encountered. -func (e *Email) Send() error { - if e.From == "" { - return errors.New("Error: No sender specified. Please set the Email.From field.") - } - if e.To == nil || len(e.To) == 0 { - return errors.New("Error: No recipient specified. Please set the Email.To field.") - } - - auth := smtp.PlainAuth( - "", - config.GetString("smtp_user"), - config.GetString("smtp_password"), - config.GetString("smtp_host"), - ) - - conn, err := smtp.Dial(fmt.Sprintf("%v:%v", config.GetString("smtp_host"), config.GetString("smtp_port"))) - if err != nil { - return err - } - - err = conn.StartTLS(&tls.Config{}) - if err != nil { - return err - } - - err = conn.Auth(auth) - if err != nil { - return err - } - - err = conn.Mail(e.From) - if err != nil { - if strings.Contains(err.Error(), "530 5.5.1") { - return errors.New("Error: Authentication failure. Your username or password is incorrect.") - } - return err - } - - for _, recipient := range e.To { - err = conn.Rcpt(recipient) - if err != nil { - return err - } - } - - wc, err := conn.Data() - if err != nil { - return err - } - defer wc.Close() - _, err = wc.Write(e.Bytes()) - if err != nil { - return err - } - - return nil -} - -func (e *Email) Bytes() []byte { - buf := bytes.NewBuffer(nil) - - var subject = e.Subject - subject = regexp.MustCompile("\n+").ReplaceAllString(subject, " ") - subject = regexp.MustCompile(" +").ReplaceAllString(subject, " ") - buf.WriteString("Subject: " + subject + "\n") - buf.WriteString("To: <" + strings.Join(e.To, ">,<") + ">\n") - buf.WriteString("MIME-Version: 1.0\n") - - // Boundary is used by MIME to separate files. - boundary := "f46d043c813270fc6b04c2d223da" - - if len(e.Attachments) > 0 { - buf.WriteString("Content-Type: multipart/mixed; boundary=" + boundary + "\n") - buf.WriteString("--" + boundary + "\n") - } - - if e.ContentType == "" { - e.ContentType = "text/plain; charset=utf-8" - } - buf.WriteString(fmt.Sprintf("Content-Type: %s\n\n", e.ContentType)) - buf.WriteString(e.Body) - - if len(e.Attachments) > 0 { - for k, v := range e.Attachments { - buf.WriteString("\n\n--" + boundary + "\n") - buf.WriteString("Content-Type: application/octet-stream\n") - buf.WriteString("Content-Transfer-Encoding: base64\n") - buf.WriteString("Content-Disposition: attachment; filename=\"" + k + "\"\n\n") - - b := make([]byte, base64.StdEncoding.EncodedLen(len(v))) - base64.StdEncoding.Encode(b, v) - buf.Write(b) - buf.WriteString("\n--" + boundary) - } - - buf.WriteString("--") - } - - return buf.Bytes() -} diff --git a/alert/log.go b/alert/log.go deleted file mode 100644 index 58055cf23..000000000 --- a/alert/log.go +++ /dev/null @@ -1,7 +0,0 @@ -package alert - -import ( - "github.com/tendermint/go-logger" -) - -var log = logger.New("module", "alert") diff --git a/benchmarks/simu/counter.go b/benchmarks/simu/counter.go index 307f29d06..44e00a0c9 100644 --- a/benchmarks/simu/counter.go +++ b/benchmarks/simu/counter.go @@ -8,10 +8,10 @@ import ( "github.com/gorilla/websocket" . "github.com/tendermint/go-common" + "github.com/tendermint/go-rpc/client" + "github.com/tendermint/go-rpc/types" "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/rpc/client" _ "github.com/tendermint/tendermint/rpc/core/types" // Register RPCResponse > Result types - "github.com/tendermint/tendermint/rpc/types" ) func main() { diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 7bcd1cb88..06168903f 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -10,7 +10,7 @@ import ( . "github.com/tendermint/go-common" "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/events" + "github.com/tendermint/go-events" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" diff --git a/consensus/common_test.go b/consensus/common_test.go index 9f7bc6695..7effed689 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -11,7 +11,7 @@ import ( dbm "github.com/tendermint/go-db" bc "github.com/tendermint/tendermint/blockchain" _ "github.com/tendermint/tendermint/config/tendermint_test" - "github.com/tendermint/tendermint/events" + "github.com/tendermint/go-events" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" diff --git a/consensus/reactor.go b/consensus/reactor.go index f0e34cc48..d9f36d929 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -9,10 +9,10 @@ import ( "time" . "github.com/tendermint/go-common" + "github.com/tendermint/go-events" "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" bc "github.com/tendermint/tendermint/blockchain" - "github.com/tendermint/tendermint/events" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) @@ -234,12 +234,12 @@ func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) { // broadcasting the result to peers func (conR *ConsensusReactor) registerEventCallbacks() { - conR.evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data types.EventData) { + conR.evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data events.EventData) { rs := data.(*types.EventDataRoundState).RoundState().(*RoundState) conR.broadcastNewRoundStep(rs) }) - conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data types.EventData) { + conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data events.EventData) { edv := data.(*types.EventDataVote) conR.broadcastHasVoteMessage(edv.Vote, edv.Index) }) diff --git a/consensus/state.go b/consensus/state.go index dd7ec5550..8cee3dbd6 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -9,9 +9,9 @@ import ( "time" . "github.com/tendermint/go-common" + "github.com/tendermint/go-events" "github.com/tendermint/go-wire" bc "github.com/tendermint/tendermint/blockchain" - "github.com/tendermint/tendermint/events" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" diff --git a/consensus/state_test.go b/consensus/state_test.go index fbe6a3db5..da8d4c903 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -7,7 +7,7 @@ import ( "time" _ "github.com/tendermint/tendermint/config/tendermint_test" - //"github.com/tendermint/tendermint/events" + //"github.com/tendermint/go-events" "github.com/tendermint/tendermint/types" ) diff --git a/events/event_cache.go b/events/event_cache.go deleted file mode 100644 index d0109ae11..000000000 --- a/events/event_cache.go +++ /dev/null @@ -1,45 +0,0 @@ -package events - -import ( - "github.com/tendermint/tendermint/types" -) - -const ( - eventsBufferSize = 1000 -) - -// An EventCache buffers events for a Fireable -// All events are cached. Filtering happens on Flush -type EventCache struct { - evsw Fireable - events []eventInfo -} - -// Create a new EventCache with an EventSwitch as backend -func NewEventCache(evsw Fireable) *EventCache { - return &EventCache{ - evsw: evsw, - events: make([]eventInfo, eventsBufferSize), - } -} - -// a cached event -type eventInfo struct { - event string - data types.EventData -} - -// Cache an event to be fired upon finality. -func (evc *EventCache) FireEvent(event string, data types.EventData) { - // append to list - evc.events = append(evc.events, eventInfo{event, data}) -} - -// Fire events by running evsw.FireEvent on all cached events. Blocks. -// Clears cached events -func (evc *EventCache) Flush() { - for _, ei := range evc.events { - evc.evsw.FireEvent(ei.event, ei.data) - } - evc.events = make([]eventInfo, eventsBufferSize) -} diff --git a/events/events.go b/events/events.go deleted file mode 100644 index 647522d0a..000000000 --- a/events/events.go +++ /dev/null @@ -1,215 +0,0 @@ -package events - -import ( - "sync" - - . "github.com/tendermint/go-common" - "github.com/tendermint/tendermint/types" -) - -// reactors and other modules should export -// this interface to become eventable -type Eventable interface { - SetEventSwitch(evsw *EventSwitch) -} - -// an event switch or cache implements fireable -type Fireable interface { - FireEvent(event string, data types.EventData) -} - -type EventSwitch struct { - BaseService - - mtx sync.RWMutex - eventCells map[string]*eventCell - listeners map[string]*eventListener -} - -func NewEventSwitch() *EventSwitch { - evsw := &EventSwitch{} - evsw.BaseService = *NewBaseService(log, "EventSwitch", evsw) - return evsw -} - -func (evsw *EventSwitch) OnStart() error { - evsw.BaseService.OnStart() - evsw.eventCells = make(map[string]*eventCell) - evsw.listeners = make(map[string]*eventListener) - return nil -} - -func (evsw *EventSwitch) OnStop() { - evsw.BaseService.OnStop() - evsw.eventCells = nil - evsw.listeners = nil -} - -func (evsw *EventSwitch) AddListenerForEvent(listenerID, event string, cb eventCallback) { - // Get/Create eventCell and listener - evsw.mtx.Lock() - eventCell := evsw.eventCells[event] - if eventCell == nil { - eventCell = newEventCell() - evsw.eventCells[event] = eventCell - } - listener := evsw.listeners[listenerID] - if listener == nil { - listener = newEventListener(listenerID) - evsw.listeners[listenerID] = listener - } - evsw.mtx.Unlock() - - // Add event and listener - eventCell.AddListener(listenerID, cb) - listener.AddEvent(event) -} - -func (evsw *EventSwitch) RemoveListener(listenerID string) { - // Get and remove listener - evsw.mtx.RLock() - listener := evsw.listeners[listenerID] - delete(evsw.listeners, listenerID) - evsw.mtx.RUnlock() - - if listener == nil { - return - } - - // Remove callback for each event. - listener.SetRemoved() - for _, event := range listener.GetEvents() { - evsw.RemoveListenerForEvent(event, listenerID) - } -} - -func (evsw *EventSwitch) RemoveListenerForEvent(event string, listenerID string) { - // Get eventCell - evsw.mtx.Lock() - eventCell := evsw.eventCells[event] - evsw.mtx.Unlock() - - if eventCell == nil { - return - } - - // Remove listenerID from eventCell - numListeners := eventCell.RemoveListener(listenerID) - - // Maybe garbage collect eventCell. - if numListeners == 0 { - // Lock again and double check. - evsw.mtx.Lock() // OUTER LOCK - eventCell.mtx.Lock() // INNER LOCK - if len(eventCell.listeners) == 0 { - delete(evsw.eventCells, event) - } - eventCell.mtx.Unlock() // INNER LOCK - evsw.mtx.Unlock() // OUTER LOCK - } -} - -func (evsw *EventSwitch) FireEvent(event string, data types.EventData) { - // Get the eventCell - evsw.mtx.RLock() - eventCell := evsw.eventCells[event] - evsw.mtx.RUnlock() - - if eventCell == nil { - return - } - - // Fire event for all listeners in eventCell - eventCell.FireEvent(data) -} - -func (evsw *EventSwitch) SubscribeToEvent(receiver, eventID string, chanCap int) chan interface{} { - // listen for new round - ch := make(chan interface{}, chanCap) - evsw.AddListenerForEvent(receiver, eventID, func(data types.EventData) { - // NOTE: in production, evsw callbacks should be nonblocking. - ch <- data - }) - return ch -} - -//----------------------------------------------------------------------------- - -// eventCell handles keeping track of listener callbacks for a given event. -type eventCell struct { - mtx sync.RWMutex - listeners map[string]eventCallback -} - -func newEventCell() *eventCell { - return &eventCell{ - listeners: make(map[string]eventCallback), - } -} - -func (cell *eventCell) AddListener(listenerID string, cb eventCallback) { - cell.mtx.Lock() - cell.listeners[listenerID] = cb - cell.mtx.Unlock() -} - -func (cell *eventCell) RemoveListener(listenerID string) int { - cell.mtx.Lock() - delete(cell.listeners, listenerID) - numListeners := len(cell.listeners) - cell.mtx.Unlock() - return numListeners -} - -func (cell *eventCell) FireEvent(data types.EventData) { - cell.mtx.RLock() - for _, listener := range cell.listeners { - listener(data) - } - cell.mtx.RUnlock() -} - -//----------------------------------------------------------------------------- - -type eventCallback func(data types.EventData) - -type eventListener struct { - id string - - mtx sync.RWMutex - removed bool - events []string -} - -func newEventListener(id string) *eventListener { - return &eventListener{ - id: id, - removed: false, - events: nil, - } -} - -func (evl *eventListener) AddEvent(event string) { - evl.mtx.Lock() - defer evl.mtx.Unlock() - - if evl.removed { - return - } - evl.events = append(evl.events, event) -} - -func (evl *eventListener) GetEvents() []string { - evl.mtx.RLock() - defer evl.mtx.RUnlock() - - events := make([]string, len(evl.events)) - copy(events, evl.events) - return events -} - -func (evl *eventListener) SetRemoved() { - evl.mtx.Lock() - defer evl.mtx.Unlock() - evl.removed = true -} diff --git a/events/log.go b/events/log.go deleted file mode 100644 index 525462294..000000000 --- a/events/log.go +++ /dev/null @@ -1,7 +0,0 @@ -package events - -import ( - "github.com/tendermint/go-logger" -) - -var log = logger.New("module", "events") diff --git a/mempool/reactor.go b/mempool/reactor.go index a9de97266..51be9beab 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -10,7 +10,7 @@ import ( . "github.com/tendermint/go-common" "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/events" + "github.com/tendermint/go-events" "github.com/tendermint/tendermint/types" ) diff --git a/node/node.go b/node/node.go index 09dc5886d..124a87732 100644 --- a/node/node.go +++ b/node/node.go @@ -12,16 +12,16 @@ import ( . "github.com/tendermint/go-common" "github.com/tendermint/go-crypto" dbm "github.com/tendermint/go-db" + "github.com/tendermint/go-events" "github.com/tendermint/go-p2p" + "github.com/tendermint/go-rpc" + "github.com/tendermint/go-rpc/server" "github.com/tendermint/go-wire" bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/consensus" - "github.com/tendermint/tendermint/events" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" - "github.com/tendermint/tendermint/rpc" "github.com/tendermint/tendermint/rpc/core" - "github.com/tendermint/tendermint/rpc/server" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" "github.com/tendermint/tmsp/example/golang" @@ -332,10 +332,10 @@ func getProxyApp(addr string, hash []byte) (proxyAppCtx proxy.AppContext) { if err != nil { Exit(Fmt("Failed to connect to proxy for mempool: %v", err)) } - proxyAppCtx := proxy.NewRemoteAppContext(proxyConn, 1024) - - proxyAppCtx.Start() + remoteApp := proxy.NewRemoteAppContext(proxyConn, 1024) + remoteApp.Start() + proxyAppCtx = remoteApp } // Check the hash diff --git a/node/node_test.go b/node/node_test.go index d171bdc17..4374e5142 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -4,30 +4,12 @@ import ( "testing" "time" - . "github.com/tendermint/go-common" "github.com/tendermint/go-p2p" _ "github.com/tendermint/tendermint/config/tendermint_test" - "github.com/tendermint/tendermint/types" - "github.com/tendermint/tmsp/example/golang" - "github.com/tendermint/tmsp/server" ) func TestNodeStartStop(t *testing.T) { - // Start a dummy app - go func() { - _, err := server.StartListener(config.GetString("proxy_app"), example.NewDummyApplication()) - if err != nil { - Exit(err.Error()) - } - }() - // wait for the server - time.Sleep(time.Second * 2) - - // Get PrivValidator - privValidatorFile := config.GetString("priv_validator_file") - privValidator := types.LoadOrGenPrivValidator(privValidatorFile) - // Create & start node n := NewNode(privValidator) l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"), config.GetBool("skip_upnp")) diff --git a/rpc/client/http_client.go b/rpc/client/http_client.go deleted file mode 100644 index 9ff726294..000000000 --- a/rpc/client/http_client.go +++ /dev/null @@ -1,133 +0,0 @@ -package rpcclient - -import ( - "bytes" - "errors" - "io/ioutil" - "net/http" - "net/url" - "strings" - - . "github.com/tendermint/go-common" - "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/rpc/types" -) - -// JSON rpc takes params as a slice -type ClientJSONRPC struct { - remote string -} - -func NewClientJSONRPC(remote string) *ClientJSONRPC { - return &ClientJSONRPC{remote} -} - -func (c *ClientJSONRPC) Call(method string, params []interface{}) (interface{}, error) { - return CallHTTP_JSONRPC(c.remote, method, params) -} - -// URI takes params as a map -type ClientURI struct { - remote string -} - -func NewClientURI(remote string) *ClientURI { - if !strings.HasSuffix(remote, "/") { - remote = remote + "/" - } - return &ClientURI{remote} -} - -func (c *ClientURI) Call(method string, params map[string]interface{}) (interface{}, error) { - return CallHTTP_URI(c.remote, method, params) -} - -func CallHTTP_JSONRPC(remote string, method string, params []interface{}) (interface{}, error) { - // Make request and get responseBytes - request := rpctypes.RPCRequest{ - JSONRPC: "2.0", - Method: method, - Params: params, - ID: "", - } - requestBytes := wire.JSONBytes(request) - requestBuf := bytes.NewBuffer(requestBytes) - log.Info(Fmt("RPC request to %v: %v", remote, string(requestBytes))) - httpResponse, err := http.Post(remote, "text/json", requestBuf) - if err != nil { - return nil, err - } - defer httpResponse.Body.Close() - responseBytes, err := ioutil.ReadAll(httpResponse.Body) - if err != nil { - return nil, err - } - log.Info(Fmt("RPC response: %v", string(responseBytes))) - return unmarshalResponseBytes(responseBytes) -} - -func CallHTTP_URI(remote string, method string, params map[string]interface{}) (interface{}, error) { - values, err := argsToURLValues(params) - if err != nil { - return nil, err - } - log.Info(Fmt("URI request to %v: %v", remote, values)) - resp, err := http.PostForm(remote+method, values) - if err != nil { - return nil, err - } - defer resp.Body.Close() - responseBytes, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, err - } - return unmarshalResponseBytes(responseBytes) -} - -//------------------------------------------------ - -func unmarshalResponseBytes(responseBytes []byte) (interface{}, error) { - // read response - // if rpc/core/types is imported, the result will unmarshal - // into the correct type - var err error - response := &rpctypes.RPCResponse{} - wire.ReadJSON(response, responseBytes, &err) - if err != nil { - return nil, err - } - errorStr := response.Error - if errorStr != "" { - return nil, errors.New(errorStr) - } - return response.Result, err -} - -func argsToURLValues(args map[string]interface{}) (url.Values, error) { - values := make(url.Values) - if len(args) == 0 { - return values, nil - } - err := argsToJson(args) - if err != nil { - return nil, err - } - for key, val := range args { - values.Set(key, val.(string)) - } - return values, nil -} - -func argsToJson(args map[string]interface{}) error { - var n int - var err error - for k, v := range args { - buf := new(bytes.Buffer) - wire.WriteJSON(v, buf, &n, &err) - if err != nil { - return err - } - args[k] = buf.String() - } - return nil -} diff --git a/rpc/client/log.go b/rpc/client/log.go deleted file mode 100644 index 8b33e2f10..000000000 --- a/rpc/client/log.go +++ /dev/null @@ -1,7 +0,0 @@ -package rpcclient - -import ( - "github.com/tendermint/log15" -) - -var log = log15.New("module", "rpcclient") diff --git a/rpc/client/ws_client.go b/rpc/client/ws_client.go deleted file mode 100644 index 8124b63bc..000000000 --- a/rpc/client/ws_client.go +++ /dev/null @@ -1,116 +0,0 @@ -package rpcclient - -import ( - "net/http" - "time" - - "github.com/gorilla/websocket" - . "github.com/tendermint/go-common" - "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/rpc/types" -) - -const ( - wsResultsChannelCapacity = 10 - wsWriteTimeoutSeconds = 10 -) - -type WSClient struct { - QuitService - Address string - *websocket.Conn - ResultsCh chan rpctypes.Result // closes upon WSClient.Stop() -} - -// create a new connection -func NewWSClient(addr string) *WSClient { - wsClient := &WSClient{ - Address: addr, - Conn: nil, - ResultsCh: make(chan rpctypes.Result, wsResultsChannelCapacity), - } - wsClient.QuitService = *NewQuitService(log, "WSClient", wsClient) - return wsClient -} - -func (wsc *WSClient) OnStart() error { - wsc.QuitService.OnStart() - err := wsc.dial() - if err != nil { - return err - } - go wsc.receiveEventsRoutine() - return nil -} - -func (wsc *WSClient) dial() error { - // Dial - dialer := websocket.DefaultDialer - rHeader := http.Header{} - con, _, err := dialer.Dial(wsc.Address, rHeader) - if err != nil { - return err - } - // Set the ping/pong handlers - con.SetPingHandler(func(m string) error { - // NOTE: https://github.com/gorilla/websocket/issues/97 - go con.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds)) - return nil - }) - con.SetPongHandler(func(m string) error { - // NOTE: https://github.com/gorilla/websocket/issues/97 - return nil - }) - wsc.Conn = con - return nil -} - -func (wsc *WSClient) OnStop() { - wsc.QuitService.OnStop() - // ResultsCh is closed in receiveEventsRoutine. -} - -func (wsc *WSClient) receiveEventsRoutine() { - for { - _, data, err := wsc.ReadMessage() - if err != nil { - log.Info("WSClient failed to read message", "error", err, "data", string(data)) - wsc.Stop() - break - } else { - var response rpctypes.RPCResponse - wire.ReadJSON(&response, data, &err) - if err != nil { - log.Info("WSClient failed to parse message", "error", err) - wsc.Stop() - break - } - wsc.ResultsCh <- response.Result - } - } - - // Cleanup - close(wsc.ResultsCh) -} - -// subscribe to an event -func (wsc *WSClient) Subscribe(eventid string) error { - err := wsc.WriteJSON(rpctypes.RPCRequest{ - JSONRPC: "2.0", - ID: "", - Method: "subscribe", - Params: []interface{}{eventid}, - }) - return err -} - -// unsubscribe from an event -func (wsc *WSClient) Unsubscribe(eventid string) error { - err := wsc.WriteJSON(rpctypes.RPCRequest{ - JSONRPC: "2.0", - ID: "", - Method: "unsubscribe", - Params: []interface{}{eventid}, - }) - return err -} diff --git a/rpc/core/events.go b/rpc/core/events.go index 83092fd93..7365377d2 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -1,14 +1,14 @@ package core import ( + "github.com/tendermint/go-events" + "github.com/tendermint/go-rpc/types" ctypes "github.com/tendermint/tendermint/rpc/core/types" - "github.com/tendermint/tendermint/rpc/types" - "github.com/tendermint/tendermint/types" ) func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscribe, error) { log.Notice("Subscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event) - wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg types.EventData) { + wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg events.EventData) { // NOTE: EventSwitch callbacks must be nonblocking // NOTE: RPCResponses of subscribed events have id suffix "#event" wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &ctypes.ResultEvent{event, msg}, "")) @@ -18,7 +18,7 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscri func Unsubscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultUnsubscribe, error) { log.Notice("Unsubscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event) - wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg types.EventData) { + wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg events.EventData) { // NOTE: EventSwitch callbacks must be nonblocking // NOTE: RPCResponses of subscribed events have id suffix "#event" wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &ctypes.ResultEvent{event, msg}, "")) diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 4b7433369..765c03b27 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -1,7 +1,7 @@ package core import ( - rpc "github.com/tendermint/tendermint/rpc/server" + rpc "github.com/tendermint/go-rpc/server" ) // TODO: eliminate redundancy between here and reading code from core/ diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index 7ca47240d..ac6737cfa 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -2,9 +2,10 @@ package core_types import ( "github.com/tendermint/go-crypto" + "github.com/tendermint/go-events" "github.com/tendermint/go-p2p" + "github.com/tendermint/go-rpc/types" "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/rpc/types" "github.com/tendermint/tendermint/types" ) @@ -67,9 +68,10 @@ type ResultSubscribe struct { type ResultUnsubscribe struct { } +// TODO: something about this type ResultEvent struct { - Event string `json:"event"` - Data types.EventData `json:"data"` + Event string `json:"event"` + Data events.EventData `json:"data"` } //---------------------------------------- diff --git a/rpc/server/handlers.go b/rpc/server/handlers.go deleted file mode 100644 index 923544be1..000000000 --- a/rpc/server/handlers.go +++ /dev/null @@ -1,553 +0,0 @@ -package rpcserver - -import ( - "bytes" - "encoding/json" - "errors" - "fmt" - "io/ioutil" - "net/http" - "reflect" - "sort" - "time" - - "github.com/gorilla/websocket" - . "github.com/tendermint/go-common" - "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/events" - . "github.com/tendermint/tendermint/rpc/types" -) - -func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) { - // HTTP endpoints - for funcName, rpcFunc := range funcMap { - mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc)) - } - - // JSONRPC endpoints - mux.HandleFunc("/", makeJSONRPCHandler(funcMap)) -} - -//------------------------------------- -// function introspection - -// holds all type information for each function -type RPCFunc struct { - f reflect.Value // underlying rpc function - args []reflect.Type // type of each function arg - returns []reflect.Type // type of each return arg - argNames []string // name of each argument - ws bool // websocket only -} - -// wraps a function for quicker introspection -func NewRPCFunc(f interface{}, args []string) *RPCFunc { - return &RPCFunc{ - f: reflect.ValueOf(f), - args: funcArgTypes(f), - returns: funcReturnTypes(f), - argNames: args, - ws: false, - } -} - -func NewWSRPCFunc(f interface{}, args []string) *RPCFunc { - return &RPCFunc{ - f: reflect.ValueOf(f), - args: funcArgTypes(f), - returns: funcReturnTypes(f), - argNames: args, - ws: true, - } -} - -// return a function's argument types -func funcArgTypes(f interface{}) []reflect.Type { - t := reflect.TypeOf(f) - n := t.NumIn() - typez := make([]reflect.Type, n) - for i := 0; i < n; i++ { - typez[i] = t.In(i) - } - return typez -} - -// return a function's return types -func funcReturnTypes(f interface{}) []reflect.Type { - t := reflect.TypeOf(f) - n := t.NumOut() - typez := make([]reflect.Type, n) - for i := 0; i < n; i++ { - typez[i] = t.Out(i) - } - return typez -} - -// function introspection -//----------------------------------------------------------------------------- -// rpc.json - -// jsonrpc calls grab the given method's function info and runs reflect.Call -func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - b, _ := ioutil.ReadAll(r.Body) - // if its an empty request (like from a browser), - // just display a list of functions - if len(b) == 0 { - writeListOfEndpoints(w, r, funcMap) - return - } - - var request RPCRequest - err := json.Unmarshal(b, &request) - if err != nil { - WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error())) - return - } - if len(r.URL.Path) > 1 { - WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path))) - return - } - rpcFunc := funcMap[request.Method] - if rpcFunc == nil { - WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method)) - return - } - if rpcFunc.ws { - WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method is only for websockets: "+request.Method)) - return - } - args, err := jsonParamsToArgs(rpcFunc, request.Params) - if err != nil { - WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, err.Error())) - return - } - returns := rpcFunc.f.Call(args) - log.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns) - result, err := unreflectResult(returns) - if err != nil { - WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, err.Error())) - return - } - WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, result, "")) - } -} - -// Convert a list of interfaces to properly typed values -func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, error) { - if len(rpcFunc.argNames) != len(params) { - return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)", - len(rpcFunc.argNames), rpcFunc.argNames, len(params), params)) - } - values := make([]reflect.Value, len(params)) - for i, p := range params { - ty := rpcFunc.args[i] - v, err := _jsonObjectToArg(ty, p) - if err != nil { - return nil, err - } - values[i] = v - } - return values, nil -} - -// Same as above, but with the first param the websocket connection -func jsonParamsToArgsWS(rpcFunc *RPCFunc, params []interface{}, wsCtx WSRPCContext) ([]reflect.Value, error) { - if len(rpcFunc.argNames) != len(params) { - return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)", - len(rpcFunc.argNames)-1, rpcFunc.argNames[1:], len(params), params)) - } - values := make([]reflect.Value, len(params)+1) - values[0] = reflect.ValueOf(wsCtx) - for i, p := range params { - ty := rpcFunc.args[i+1] - v, err := _jsonObjectToArg(ty, p) - if err != nil { - return nil, err - } - values[i+1] = v - } - return values, nil -} - -func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) { - var err error - v := reflect.New(ty) - wire.ReadJSONObjectPtr(v.Interface(), object, &err) - if err != nil { - return v, err - } - v = v.Elem() - return v, nil -} - -// rpc.json -//----------------------------------------------------------------------------- -// rpc.http - -// convert from a function name to the http handler -func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) { - // Exception for websocket endpoints - if rpcFunc.ws { - return func(w http.ResponseWriter, r *http.Request) { - WriteRPCResponseHTTP(w, NewRPCResponse("", nil, "This RPC method is only for websockets")) - } - } - // All other endpoints - return func(w http.ResponseWriter, r *http.Request) { - args, err := httpParamsToArgs(rpcFunc, r) - if err != nil { - WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error())) - return - } - returns := rpcFunc.f.Call(args) - log.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns) - result, err := unreflectResult(returns) - if err != nil { - WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error())) - return - } - WriteRPCResponseHTTP(w, NewRPCResponse("", result, "")) - } -} - -// Covert an http query to a list of properly typed values. -// To be properly decoded the arg must be a concrete type from tendermint (if its an interface). -func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) { - argTypes := rpcFunc.args - argNames := rpcFunc.argNames - - var err error - values := make([]reflect.Value, len(argNames)) - for i, name := range argNames { - ty := argTypes[i] - arg := GetParam(r, name) - values[i], err = _jsonStringToArg(ty, arg) - if err != nil { - return nil, err - } - } - return values, nil -} - -func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) { - var err error - v := reflect.New(ty) - wire.ReadJSONPtr(v.Interface(), []byte(arg), &err) - if err != nil { - return v, err - } - v = v.Elem() - return v, nil -} - -// rpc.http -//----------------------------------------------------------------------------- -// rpc.websocket - -const ( - writeChanCapacity = 1000 - wsWriteTimeoutSeconds = 30 // each write times out after this - wsReadTimeoutSeconds = 30 // connection times out if we haven't received *anything* in this long, not even pings. - wsPingTickerSeconds = 10 // send a ping every PingTickerSeconds. -) - -// a single websocket connection -// contains listener id, underlying ws connection, -// and the event switch for subscribing to events -type wsConnection struct { - QuitService - - remoteAddr string - baseConn *websocket.Conn - writeChan chan RPCResponse - readTimeout *time.Timer - pingTicker *time.Ticker - - funcMap map[string]*RPCFunc - evsw *events.EventSwitch -} - -// new websocket connection wrapper -func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *wsConnection { - wsc := &wsConnection{ - remoteAddr: baseConn.RemoteAddr().String(), - baseConn: baseConn, - writeChan: make(chan RPCResponse, writeChanCapacity), // error when full. - funcMap: funcMap, - evsw: evsw, - } - wsc.QuitService = *NewQuitService(log, "wsConnection", wsc) - return wsc -} - -// wsc.Start() blocks until the connection closes. -func (wsc *wsConnection) OnStart() error { - wsc.QuitService.OnStart() - - // Read subscriptions/unsubscriptions to events - go wsc.readRoutine() - - // Custom Ping handler to touch readTimeout - wsc.readTimeout = time.NewTimer(time.Second * wsReadTimeoutSeconds) - wsc.pingTicker = time.NewTicker(time.Second * wsPingTickerSeconds) - wsc.baseConn.SetPingHandler(func(m string) error { - // NOTE: https://github.com/gorilla/websocket/issues/97 - go wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds)) - wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds) - return nil - }) - wsc.baseConn.SetPongHandler(func(m string) error { - // NOTE: https://github.com/gorilla/websocket/issues/97 - wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds) - return nil - }) - go wsc.readTimeoutRoutine() - - // Write responses, BLOCKING. - wsc.writeRoutine() - return nil -} - -func (wsc *wsConnection) OnStop() { - wsc.QuitService.OnStop() - wsc.evsw.RemoveListener(wsc.remoteAddr) - wsc.readTimeout.Stop() - wsc.pingTicker.Stop() - // The write loop closes the websocket connection - // when it exits its loop, and the read loop - // closes the writeChan -} - -func (wsc *wsConnection) readTimeoutRoutine() { - select { - case <-wsc.readTimeout.C: - log.Notice("Stopping connection due to read timeout") - wsc.Stop() - case <-wsc.Quit: - return - } -} - -// Implements WSRPCConnection -func (wsc *wsConnection) GetRemoteAddr() string { - return wsc.remoteAddr -} - -// Implements WSRPCConnection -func (wsc *wsConnection) GetEventSwitch() *events.EventSwitch { - return wsc.evsw -} - -// Implements WSRPCConnection -// Blocking write to writeChan until service stops. -func (wsc *wsConnection) WriteRPCResponse(resp RPCResponse) { - select { - case <-wsc.Quit: - return - case wsc.writeChan <- resp: - } -} - -// Implements WSRPCConnection -// Nonblocking write. -func (wsc *wsConnection) TryWriteRPCResponse(resp RPCResponse) bool { - select { - case <-wsc.Quit: - return false - case wsc.writeChan <- resp: - return true - default: - return false - } -} - -// Read from the socket and subscribe to or unsubscribe from events -func (wsc *wsConnection) readRoutine() { - // Do not close writeChan, to allow WriteRPCResponse() to fail. - // defer close(wsc.writeChan) - - for { - select { - case <-wsc.Quit: - return - default: - var in []byte - // Do not set a deadline here like below: - // wsc.baseConn.SetReadDeadline(time.Now().Add(time.Second * wsReadTimeoutSeconds)) - // The client may not send anything for a while. - // We use `readTimeout` to handle read timeouts. - _, in, err := wsc.baseConn.ReadMessage() - if err != nil { - log.Notice("Failed to read from connection", "remote", wsc.remoteAddr) - // an error reading the connection, - // kill the connection - wsc.Stop() - return - } - var request RPCRequest - err = json.Unmarshal(in, &request) - if err != nil { - errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error()) - wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, errStr)) - continue - } - - // Now, fetch the RPCFunc and execute it. - - rpcFunc := wsc.funcMap[request.Method] - if rpcFunc == nil { - wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method)) - continue - } - var args []reflect.Value - if rpcFunc.ws { - wsCtx := WSRPCContext{Request: request, WSRPCConnection: wsc} - args, err = jsonParamsToArgsWS(rpcFunc, request.Params, wsCtx) - } else { - args, err = jsonParamsToArgs(rpcFunc, request.Params) - } - if err != nil { - wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error())) - continue - } - returns := rpcFunc.f.Call(args) - log.Info("WSJSONRPC", "method", request.Method, "args", args, "returns", returns) - result, err := unreflectResult(returns) - if err != nil { - wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error())) - continue - } else { - wsc.WriteRPCResponse(NewRPCResponse(request.ID, result, "")) - continue - } - - } - } -} - -// receives on a write channel and writes out on the socket -func (wsc *wsConnection) writeRoutine() { - defer wsc.baseConn.Close() - var n, err = int(0), error(nil) - for { - select { - case <-wsc.Quit: - return - case <-wsc.pingTicker.C: - err := wsc.baseConn.WriteMessage(websocket.PingMessage, []byte{}) - if err != nil { - log.Error("Failed to write ping message on websocket", "error", err) - wsc.Stop() - return - } - case msg := <-wsc.writeChan: - buf := new(bytes.Buffer) - wire.WriteJSON(msg, buf, &n, &err) - if err != nil { - log.Error("Failed to marshal RPCResponse to JSON", "error", err) - } else { - wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds)) - bufBytes := buf.Bytes() - if err = wsc.baseConn.WriteMessage(websocket.TextMessage, bufBytes); err != nil { - log.Warn("Failed to write response on websocket", "error", err) - wsc.Stop() - return - } - } - } - } -} - -//---------------------------------------- - -// Main manager for all websocket connections -// Holds the event switch -// NOTE: The websocket path is defined externally, e.g. in node/node.go -type WebsocketManager struct { - websocket.Upgrader - funcMap map[string]*RPCFunc - evsw *events.EventSwitch -} - -func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *WebsocketManager { - return &WebsocketManager{ - funcMap: funcMap, - evsw: evsw, - Upgrader: websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { - // TODO - return true - }, - }, - } -} - -// Upgrade the request/response (via http.Hijack) and starts the wsConnection. -func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) { - wsConn, err := wm.Upgrade(w, r, nil) - if err != nil { - // TODO - return http error - log.Error("Failed to upgrade to websocket connection", "error", err) - return - } - - // register connection - con := NewWSConnection(wsConn, wm.funcMap, wm.evsw) - log.Notice("New websocket connection", "remote", con.remoteAddr) - con.Start() // Blocking -} - -// rpc.websocket -//----------------------------------------------------------------------------- - -// returns is result struct and error. If error is not nil, return it -func unreflectResult(returns []reflect.Value) (interface{}, error) { - errV := returns[1] - if errV.Interface() != nil { - return nil, fmt.Errorf("%v", errV.Interface()) - } - return returns[0].Interface(), nil -} - -// writes a list of available rpc endpoints as an html page -func writeListOfEndpoints(w http.ResponseWriter, r *http.Request, funcMap map[string]*RPCFunc) { - noArgNames := []string{} - argNames := []string{} - for name, funcData := range funcMap { - if len(funcData.args) == 0 { - noArgNames = append(noArgNames, name) - } else { - argNames = append(argNames, name) - } - } - sort.Strings(noArgNames) - sort.Strings(argNames) - buf := new(bytes.Buffer) - buf.WriteString("") - buf.WriteString("
Available endpoints:
") - - for _, name := range noArgNames { - link := fmt.Sprintf("http://%s/%s", r.Host, name) - buf.WriteString(fmt.Sprintf("%s
", link, link)) - } - - buf.WriteString("
Endpoints that require arguments:
") - for _, name := range argNames { - link := fmt.Sprintf("http://%s/%s?", r.Host, name) - funcData := funcMap[name] - for i, argName := range funcData.argNames { - link += argName + "=_" - if i < len(funcData.argNames)-1 { - link += "&" - } - } - buf.WriteString(fmt.Sprintf("%s
", link, link)) - } - buf.WriteString("") - w.Header().Set("Content-Type", "text/html") - w.WriteHeader(200) - w.Write(buf.Bytes()) -} diff --git a/rpc/server/http_params.go b/rpc/server/http_params.go deleted file mode 100644 index acf5b4c8c..000000000 --- a/rpc/server/http_params.go +++ /dev/null @@ -1,89 +0,0 @@ -package rpcserver - -import ( - "encoding/hex" - "fmt" - "net/http" - "regexp" - "strconv" -) - -var ( - // Parts of regular expressions - atom = "[A-Z0-9!#$%&'*+\\-/=?^_`{|}~]+" - dotAtom = atom + `(?:\.` + atom + `)*` - domain = `[A-Z0-9.-]+\.[A-Z]{2,4}` - - RE_HEX = regexp.MustCompile(`^(?i)[a-f0-9]+$`) - RE_EMAIL = regexp.MustCompile(`^(?i)(` + dotAtom + `)@(` + dotAtom + `)$`) - RE_ADDRESS = regexp.MustCompile(`^(?i)[a-z0-9]{25,34}$`) - RE_HOST = regexp.MustCompile(`^(?i)(` + domain + `)$`) - - //RE_ID12 = regexp.MustCompile(`^[a-zA-Z0-9]{12}$`) -) - -func GetParam(r *http.Request, param string) string { - s := r.URL.Query().Get(param) - if s == "" { - s = r.FormValue(param) - } - return s -} - -func GetParamByteSlice(r *http.Request, param string) ([]byte, error) { - s := GetParam(r, param) - return hex.DecodeString(s) -} - -func GetParamInt64(r *http.Request, param string) (int64, error) { - s := GetParam(r, param) - i, err := strconv.ParseInt(s, 10, 64) - if err != nil { - return 0, fmt.Errorf(param, err.Error()) - } - return i, nil -} - -func GetParamInt32(r *http.Request, param string) (int32, error) { - s := GetParam(r, param) - i, err := strconv.ParseInt(s, 10, 32) - if err != nil { - return 0, fmt.Errorf(param, err.Error()) - } - return int32(i), nil -} - -func GetParamUint64(r *http.Request, param string) (uint64, error) { - s := GetParam(r, param) - i, err := strconv.ParseUint(s, 10, 64) - if err != nil { - return 0, fmt.Errorf(param, err.Error()) - } - return i, nil -} - -func GetParamUint(r *http.Request, param string) (uint, error) { - s := GetParam(r, param) - i, err := strconv.ParseUint(s, 10, 64) - if err != nil { - return 0, fmt.Errorf(param, err.Error()) - } - return uint(i), nil -} - -func GetParamRegexp(r *http.Request, param string, re *regexp.Regexp) (string, error) { - s := GetParam(r, param) - if !re.MatchString(s) { - return "", fmt.Errorf(param, "Did not match regular expression %v", re.String()) - } - return s, nil -} - -func GetParamFloat64(r *http.Request, param string) (float64, error) { - s := GetParam(r, param) - f, err := strconv.ParseFloat(s, 64) - if err != nil { - return 0, fmt.Errorf(param, err.Error()) - } - return f, nil -} diff --git a/rpc/server/http_server.go b/rpc/server/http_server.go deleted file mode 100644 index fd35d0df6..000000000 --- a/rpc/server/http_server.go +++ /dev/null @@ -1,115 +0,0 @@ -// Commons for HTTP handling -package rpcserver - -import ( - "bufio" - "fmt" - "net" - "net/http" - "runtime/debug" - "time" - - . "github.com/tendermint/go-common" - "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/alert" - . "github.com/tendermint/tendermint/rpc/types" -) - -func StartHTTPServer(listenAddr string, handler http.Handler) (net.Listener, error) { - log.Notice(Fmt("Starting RPC HTTP server on %v", listenAddr)) - listener, err := net.Listen("tcp", listenAddr) - if err != nil { - return nil, fmt.Errorf("Failed to listen to %v", listenAddr) - } - go func() { - res := http.Serve( - listener, - RecoverAndLogHandler(handler), - ) - log.Crit("RPC HTTP server stopped", "result", res) - }() - return listener, nil -} - -func WriteRPCResponseHTTP(w http.ResponseWriter, res RPCResponse) { - jsonBytes := wire.JSONBytesPretty(res) - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(200) - w.Write(jsonBytes) -} - -//----------------------------------------------------------------------------- - -// Wraps an HTTP handler, adding error logging. -// If the inner function panics, the outer function recovers, logs, sends an -// HTTP 500 error response. -func RecoverAndLogHandler(handler http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Wrap the ResponseWriter to remember the status - rww := &ResponseWriterWrapper{-1, w} - begin := time.Now() - - // Common headers - origin := r.Header.Get("Origin") - rww.Header().Set("Access-Control-Allow-Origin", origin) - rww.Header().Set("Access-Control-Allow-Credentials", "true") - rww.Header().Set("Access-Control-Expose-Headers", "X-Server-Time") - rww.Header().Set("X-Server-Time", fmt.Sprintf("%v", begin.Unix())) - - defer func() { - // Send a 500 error if a panic happens during a handler. - // Without this, Chrome & Firefox were retrying aborted ajax requests, - // at least to my localhost. - if e := recover(); e != nil { - - // If RPCResponse - if res, ok := e.(RPCResponse); ok { - WriteRPCResponseHTTP(rww, res) - } else { - // For the rest, - log.Error("Panic in RPC HTTP handler", "error", e, "stack", string(debug.Stack())) - rww.WriteHeader(http.StatusInternalServerError) - WriteRPCResponseHTTP(rww, NewRPCResponse("", nil, Fmt("Internal Server Error: %v", e))) - } - } - - // Finally, log. - durationMS := time.Since(begin).Nanoseconds() / 1000000 - if rww.Status == -1 { - rww.Status = 200 - } - log.Info("Served RPC HTTP response", - "method", r.Method, "url", r.URL, - "status", rww.Status, "duration", durationMS, - "remoteAddr", r.RemoteAddr, - ) - }() - - handler.ServeHTTP(rww, r) - }) -} - -// Remember the status for logging -type ResponseWriterWrapper struct { - Status int - http.ResponseWriter -} - -func (w *ResponseWriterWrapper) WriteHeader(status int) { - w.Status = status - w.ResponseWriter.WriteHeader(status) -} - -// implements http.Hijacker -func (w *ResponseWriterWrapper) Hijack() (net.Conn, *bufio.ReadWriter, error) { - return w.ResponseWriter.(http.Hijacker).Hijack() -} - -// Stick it as a deferred statement in gouroutines to prevent the program from crashing. -func Recover(daemonName string) { - if e := recover(); e != nil { - stack := string(debug.Stack()) - errorString := fmt.Sprintf("[%s] %s\n%s", daemonName, e, stack) - alert.Alert(errorString) - } -} diff --git a/rpc/server/log.go b/rpc/server/log.go deleted file mode 100644 index 704e22e30..000000000 --- a/rpc/server/log.go +++ /dev/null @@ -1,7 +0,0 @@ -package rpcserver - -import ( - "github.com/tendermint/log15" -) - -var log = log15.New("module", "rpcserver") diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index a417cbeab..98ff0d96a 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -11,30 +11,48 @@ import ( "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" + client "github.com/tendermint/go-rpc/client" + "github.com/tendermint/go-rpc/types" _ "github.com/tendermint/tendermint/config/tendermint_test" nm "github.com/tendermint/tendermint/node" - client "github.com/tendermint/tendermint/rpc/client" ctypes "github.com/tendermint/tendermint/rpc/core/types" - "github.com/tendermint/tendermint/rpc/types" "github.com/tendermint/tendermint/types" ) // global variables for use across all tests var ( - rpcAddr = "127.0.0.1:36657" // Not 46657 - requestAddr = "http://" + rpcAddr - websocketAddr = "ws://" + rpcAddr + "/websocket" - node *nm.Node mempoolCount = 0 chainID string - clientURI = client.NewClientURI(requestAddr) - clientJSON = client.NewClientJSONRPC(requestAddr) + rpcAddr, requestAddr, websocketAddr string + + clientURI *client.ClientURI + clientJSON *client.ClientJSONRPC ) +// initialize config and create new node +func init() { + initConfig() + + chainID = config.GetString("chain_id") + rpcAddr = config.GetString("rpc_laddr") + requestAddr = "http://" + rpcAddr + websocketAddr = "ws://" + rpcAddr + "/websocket" + + clientURI = client.NewClientURI(requestAddr) + clientJSON = client.NewClientJSONRPC(requestAddr) + + // TODO: change consensus/state.go timeouts to be shorter + + // start a node + ready := make(chan struct{}) + go newNode(ready) + <-ready +} + // create a new node and sleep forever func newNode(ready chan struct{}) { // Create & start node @@ -52,19 +70,6 @@ func newNode(ready chan struct{}) { <-ch } -// initialize config and create new node -func init() { - initConfig() - chainID = config.GetString("chain_id") - - // TODO: change consensus/state.go timeouts to be shorter - - // start a node - ready := make(chan struct{}) - go newNode(ready) - <-ready -} - //-------------------------------------------------------------------------------- // Utilities for testing the websocket service @@ -192,7 +197,7 @@ func unmarshalResponseNewBlock(b []byte) (*types.Block, error) { func unmarshalValidateBlockchain(t *testing.T, con *websocket.Conn, eid string) { var initBlockN int - for i := 0; i < 2; i++ { + for i := 0; i < 3; i++ { waitForEvent(t, con, eid, true, func() {}, func(eid string, b []byte) error { block, err := unmarshalResponseNewBlock(b) if err != nil { diff --git a/rpc/types/types.go b/rpc/types/types.go deleted file mode 100644 index f7bc98d50..000000000 --- a/rpc/types/types.go +++ /dev/null @@ -1,71 +0,0 @@ -package rpctypes - -import ( - "github.com/tendermint/tendermint/events" -) - -type RPCRequest struct { - JSONRPC string `json:"jsonrpc"` - ID string `json:"id"` - Method string `json:"method"` - Params []interface{} `json:"params"` -} - -func NewRPCRequest(id string, method string, params []interface{}) RPCRequest { - return RPCRequest{ - JSONRPC: "2.0", - ID: id, - Method: method, - Params: params, - } -} - -//---------------------------------------- - -/* -Result is a generic interface. -Applications should register type-bytes like so: - -var _ = wire.RegisterInterface( - struct{ Result }{}, - wire.ConcreteType{&ResultGenesis{}, ResultTypeGenesis}, - wire.ConcreteType{&ResultBlockchainInfo{}, ResultTypeBlockchainInfo}, - ... -) -*/ -type Result interface { -} - -//---------------------------------------- - -type RPCResponse struct { - JSONRPC string `json:"jsonrpc"` - ID string `json:"id"` - Result Result `json:"result"` - Error string `json:"error"` -} - -func NewRPCResponse(id string, res Result, err string) RPCResponse { - return RPCResponse{ - JSONRPC: "2.0", - ID: id, - Result: res, - Error: err, - } -} - -//---------------------------------------- - -// *wsConnection implements this interface. -type WSRPCConnection interface { - GetRemoteAddr() string - GetEventSwitch() *events.EventSwitch - WriteRPCResponse(resp RPCResponse) - TryWriteRPCResponse(resp RPCResponse) bool -} - -// websocket-only RPCFuncs take this as the first parameter. -type WSRPCContext struct { - Request RPCRequest - WSRPCConnection -} diff --git a/rpc/version.go b/rpc/version.go deleted file mode 100644 index 2982824dd..000000000 --- a/rpc/version.go +++ /dev/null @@ -1,3 +0,0 @@ -package rpc - -const Version = "0.4.0" diff --git a/state/state.go b/state/state.go index 798e8ce72..ff05ebfa4 100644 --- a/state/state.go +++ b/state/state.go @@ -8,6 +8,7 @@ import ( . "github.com/tendermint/go-common" dbm "github.com/tendermint/go-db" + "github.com/tendermint/go-events" "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/types" ) diff --git a/types/events.go b/types/events.go index 436e02de6..0bcd2fcc9 100644 --- a/types/events.go +++ b/types/events.go @@ -1,6 +1,8 @@ package types import ( + // for registering EventData + "github.com/tendermint/go-events" "github.com/tendermint/go-wire" ) @@ -38,12 +40,8 @@ const ( EventDataTypeVote = byte(0x12) ) -type EventData interface { - AssertIsEventData() -} - var _ = wire.RegisterInterface( - struct{ EventData }{}, + struct{ events.EventData }{}, wire.ConcreteType{EventDataNewBlock{}, EventDataTypeNewBlock}, // wire.ConcreteType{EventDataFork{}, EventDataTypeFork }, wire.ConcreteType{EventDataTx{}, EventDataTypeTx},