diff --git a/alert/alert.go b/alert/alert.go deleted file mode 100644 index c47633091..000000000 --- a/alert/alert.go +++ /dev/null @@ -1,64 +0,0 @@ -package alert - -import ( - "fmt" - "time" - - "github.com/sfreiberg/gotwilio" -) - -var lastAlertUnix int64 = 0 -var alertCountSince int = 0 - -// Sends a critical alert message to administrators. -func Alert(message string) { - log.Error(" ALERT \n" + message) - now := time.Now().Unix() - if now-lastAlertUnix > int64(config.GetInt("alert_min_interval")) { - message = fmt.Sprintf("%v:%v", config.GetString("chain_id"), message) - if alertCountSince > 0 { - message = fmt.Sprintf("%v (+%v more since)", message, alertCountSince) - alertCountSince = 0 - } - if len(config.GetString("alert_twilio_sid")) > 0 { - go sendTwilio(message) - } - if len(config.GetString("alert_email_recipients")) > 0 { - go sendEmail(message) - } - } else { - alertCountSince++ - } -} - -func sendTwilio(message string) { - defer func() { - if err := recover(); err != nil { - log.Error("sendTwilio error", "error", err) - } - }() - if len(message) > 50 { - message = message[:50] - } - twilio := gotwilio.NewTwilioClient(config.GetString("alert_twilio_sid"), config.GetString("alert_twilio_token")) - res, exp, err := twilio.SendSMS(config.GetString("alert_twilio_from"), config.GetString("alert_twilio_to"), message, "", "") - if exp != nil || err != nil { - log.Error("sendTwilio error", "res", res, "exp", exp, "error", err) - } -} - -func sendEmail(message string) { - defer func() { - if err := recover(); err != nil { - log.Error("sendEmail error", "error", err) - } - }() - subject := message - if len(subject) > 80 { - subject = subject[:80] - } - err := SendEmail(subject, message, config.GetStringSlice("alert_email_recipients")) - if err != nil { - log.Error("sendEmail error", "error", err, "message", message) - } -} diff --git a/alert/config.go b/alert/config.go deleted file mode 100644 index 4a7bf7714..000000000 --- a/alert/config.go +++ /dev/null @@ -1,14 +0,0 @@ - -package alert - -import ( - cfg "github.com/tendermint/go-config" -) - -var config cfg.Config = nil - -func init() { - cfg.OnConfig(func(newConfig cfg.Config) { - config = newConfig - }) -} diff --git a/alert/email.go b/alert/email.go deleted file mode 100644 index c183f1d58..000000000 --- a/alert/email.go +++ /dev/null @@ -1,176 +0,0 @@ -// Forked from github.com/SlyMarbo/gmail -package alert - -import ( - "bytes" - "crypto/tls" - "encoding/base64" - "errors" - "fmt" - "io/ioutil" - "net/smtp" - "path/filepath" - "regexp" - "strings" -) - -// Convenience function -func SendEmail(subject, body string, tos []string) error { - email := Compose(subject, body) - email.From = config.GetString("smtp_user") - email.ContentType = "text/html; charset=utf-8" - email.AddRecipients(tos...) - err := email.Send() - return err -} - -// Email represents a single message, which may contain -// attachments. -type Email struct { - From string - To []string - Subject string - ContentType string - Body string - Attachments map[string][]byte -} - -// Compose begins a new email, filling the subject and body, -// and allocating memory for the list of recipients and the -// attachments. -func Compose(Subject, Body string) *Email { - out := new(Email) - out.To = make([]string, 0, 1) - out.Subject = Subject - out.Body = Body - out.Attachments = make(map[string][]byte) - return out -} - -// Attach takes a filename and adds this to the message. -// Note that since only the filename is stored (and not -// its path, for privacy reasons), multiple files in -// different directories but with the same filename and -// extension cannot be sent. -func (e *Email) Attach(Filename string) error { - b, err := ioutil.ReadFile(Filename) - if err != nil { - return err - } - - _, fname := filepath.Split(Filename) - e.Attachments[fname] = b - return nil -} - -// AddRecipient adds a single recipient. -func (e *Email) AddRecipient(Recipient string) { - e.To = append(e.To, Recipient) -} - -// AddRecipients adds one or more recipients. -func (e *Email) AddRecipients(Recipients ...string) { - e.To = append(e.To, Recipients...) -} - -// Send sends the email, returning any error encountered. -func (e *Email) Send() error { - if e.From == "" { - return errors.New("Error: No sender specified. Please set the Email.From field.") - } - if e.To == nil || len(e.To) == 0 { - return errors.New("Error: No recipient specified. Please set the Email.To field.") - } - - auth := smtp.PlainAuth( - "", - config.GetString("smtp_user"), - config.GetString("smtp_password"), - config.GetString("smtp_host"), - ) - - conn, err := smtp.Dial(fmt.Sprintf("%v:%v", config.GetString("smtp_host"), config.GetString("smtp_port"))) - if err != nil { - return err - } - - err = conn.StartTLS(&tls.Config{}) - if err != nil { - return err - } - - err = conn.Auth(auth) - if err != nil { - return err - } - - err = conn.Mail(e.From) - if err != nil { - if strings.Contains(err.Error(), "530 5.5.1") { - return errors.New("Error: Authentication failure. Your username or password is incorrect.") - } - return err - } - - for _, recipient := range e.To { - err = conn.Rcpt(recipient) - if err != nil { - return err - } - } - - wc, err := conn.Data() - if err != nil { - return err - } - defer wc.Close() - _, err = wc.Write(e.Bytes()) - if err != nil { - return err - } - - return nil -} - -func (e *Email) Bytes() []byte { - buf := bytes.NewBuffer(nil) - - var subject = e.Subject - subject = regexp.MustCompile("\n+").ReplaceAllString(subject, " ") - subject = regexp.MustCompile(" +").ReplaceAllString(subject, " ") - buf.WriteString("Subject: " + subject + "\n") - buf.WriteString("To: <" + strings.Join(e.To, ">,<") + ">\n") - buf.WriteString("MIME-Version: 1.0\n") - - // Boundary is used by MIME to separate files. - boundary := "f46d043c813270fc6b04c2d223da" - - if len(e.Attachments) > 0 { - buf.WriteString("Content-Type: multipart/mixed; boundary=" + boundary + "\n") - buf.WriteString("--" + boundary + "\n") - } - - if e.ContentType == "" { - e.ContentType = "text/plain; charset=utf-8" - } - buf.WriteString(fmt.Sprintf("Content-Type: %s\n\n", e.ContentType)) - buf.WriteString(e.Body) - - if len(e.Attachments) > 0 { - for k, v := range e.Attachments { - buf.WriteString("\n\n--" + boundary + "\n") - buf.WriteString("Content-Type: application/octet-stream\n") - buf.WriteString("Content-Transfer-Encoding: base64\n") - buf.WriteString("Content-Disposition: attachment; filename=\"" + k + "\"\n\n") - - b := make([]byte, base64.StdEncoding.EncodedLen(len(v))) - base64.StdEncoding.Encode(b, v) - buf.Write(b) - buf.WriteString("\n--" + boundary) - } - - buf.WriteString("--") - } - - return buf.Bytes() -} diff --git a/alert/log.go b/alert/log.go deleted file mode 100644 index 58055cf23..000000000 --- a/alert/log.go +++ /dev/null @@ -1,7 +0,0 @@ -package alert - -import ( - "github.com/tendermint/go-logger" -) - -var log = logger.New("module", "alert") diff --git a/benchmarks/simu/counter.go b/benchmarks/simu/counter.go index 307f29d06..44e00a0c9 100644 --- a/benchmarks/simu/counter.go +++ b/benchmarks/simu/counter.go @@ -8,10 +8,10 @@ import ( "github.com/gorilla/websocket" . "github.com/tendermint/go-common" + "github.com/tendermint/go-rpc/client" + "github.com/tendermint/go-rpc/types" "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/rpc/client" _ "github.com/tendermint/tendermint/rpc/core/types" // Register RPCResponse > Result types - "github.com/tendermint/tendermint/rpc/types" ) func main() { diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 7bcd1cb88..06168903f 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -10,7 +10,7 @@ import ( . "github.com/tendermint/go-common" "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/events" + "github.com/tendermint/go-events" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" diff --git a/consensus/common_test.go b/consensus/common_test.go index 9f7bc6695..7effed689 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -11,7 +11,7 @@ import ( dbm "github.com/tendermint/go-db" bc "github.com/tendermint/tendermint/blockchain" _ "github.com/tendermint/tendermint/config/tendermint_test" - "github.com/tendermint/tendermint/events" + "github.com/tendermint/go-events" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" diff --git a/consensus/reactor.go b/consensus/reactor.go index f0e34cc48..d9f36d929 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -9,10 +9,10 @@ import ( "time" . "github.com/tendermint/go-common" + "github.com/tendermint/go-events" "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" bc "github.com/tendermint/tendermint/blockchain" - "github.com/tendermint/tendermint/events" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) @@ -234,12 +234,12 @@ func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) { // broadcasting the result to peers func (conR *ConsensusReactor) registerEventCallbacks() { - conR.evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data types.EventData) { + conR.evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data events.EventData) { rs := data.(*types.EventDataRoundState).RoundState().(*RoundState) conR.broadcastNewRoundStep(rs) }) - conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data types.EventData) { + conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data events.EventData) { edv := data.(*types.EventDataVote) conR.broadcastHasVoteMessage(edv.Vote, edv.Index) }) diff --git a/consensus/state.go b/consensus/state.go index dd7ec5550..8cee3dbd6 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -9,9 +9,9 @@ import ( "time" . "github.com/tendermint/go-common" + "github.com/tendermint/go-events" "github.com/tendermint/go-wire" bc "github.com/tendermint/tendermint/blockchain" - "github.com/tendermint/tendermint/events" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" diff --git a/consensus/state_test.go b/consensus/state_test.go index fbe6a3db5..da8d4c903 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -7,7 +7,7 @@ import ( "time" _ "github.com/tendermint/tendermint/config/tendermint_test" - //"github.com/tendermint/tendermint/events" + //"github.com/tendermint/go-events" "github.com/tendermint/tendermint/types" ) diff --git a/events/event_cache.go b/events/event_cache.go deleted file mode 100644 index d0109ae11..000000000 --- a/events/event_cache.go +++ /dev/null @@ -1,45 +0,0 @@ -package events - -import ( - "github.com/tendermint/tendermint/types" -) - -const ( - eventsBufferSize = 1000 -) - -// An EventCache buffers events for a Fireable -// All events are cached. Filtering happens on Flush -type EventCache struct { - evsw Fireable - events []eventInfo -} - -// Create a new EventCache with an EventSwitch as backend -func NewEventCache(evsw Fireable) *EventCache { - return &EventCache{ - evsw: evsw, - events: make([]eventInfo, eventsBufferSize), - } -} - -// a cached event -type eventInfo struct { - event string - data types.EventData -} - -// Cache an event to be fired upon finality. -func (evc *EventCache) FireEvent(event string, data types.EventData) { - // append to list - evc.events = append(evc.events, eventInfo{event, data}) -} - -// Fire events by running evsw.FireEvent on all cached events. Blocks. -// Clears cached events -func (evc *EventCache) Flush() { - for _, ei := range evc.events { - evc.evsw.FireEvent(ei.event, ei.data) - } - evc.events = make([]eventInfo, eventsBufferSize) -} diff --git a/events/events.go b/events/events.go deleted file mode 100644 index 647522d0a..000000000 --- a/events/events.go +++ /dev/null @@ -1,215 +0,0 @@ -package events - -import ( - "sync" - - . "github.com/tendermint/go-common" - "github.com/tendermint/tendermint/types" -) - -// reactors and other modules should export -// this interface to become eventable -type Eventable interface { - SetEventSwitch(evsw *EventSwitch) -} - -// an event switch or cache implements fireable -type Fireable interface { - FireEvent(event string, data types.EventData) -} - -type EventSwitch struct { - BaseService - - mtx sync.RWMutex - eventCells map[string]*eventCell - listeners map[string]*eventListener -} - -func NewEventSwitch() *EventSwitch { - evsw := &EventSwitch{} - evsw.BaseService = *NewBaseService(log, "EventSwitch", evsw) - return evsw -} - -func (evsw *EventSwitch) OnStart() error { - evsw.BaseService.OnStart() - evsw.eventCells = make(map[string]*eventCell) - evsw.listeners = make(map[string]*eventListener) - return nil -} - -func (evsw *EventSwitch) OnStop() { - evsw.BaseService.OnStop() - evsw.eventCells = nil - evsw.listeners = nil -} - -func (evsw *EventSwitch) AddListenerForEvent(listenerID, event string, cb eventCallback) { - // Get/Create eventCell and listener - evsw.mtx.Lock() - eventCell := evsw.eventCells[event] - if eventCell == nil { - eventCell = newEventCell() - evsw.eventCells[event] = eventCell - } - listener := evsw.listeners[listenerID] - if listener == nil { - listener = newEventListener(listenerID) - evsw.listeners[listenerID] = listener - } - evsw.mtx.Unlock() - - // Add event and listener - eventCell.AddListener(listenerID, cb) - listener.AddEvent(event) -} - -func (evsw *EventSwitch) RemoveListener(listenerID string) { - // Get and remove listener - evsw.mtx.RLock() - listener := evsw.listeners[listenerID] - delete(evsw.listeners, listenerID) - evsw.mtx.RUnlock() - - if listener == nil { - return - } - - // Remove callback for each event. - listener.SetRemoved() - for _, event := range listener.GetEvents() { - evsw.RemoveListenerForEvent(event, listenerID) - } -} - -func (evsw *EventSwitch) RemoveListenerForEvent(event string, listenerID string) { - // Get eventCell - evsw.mtx.Lock() - eventCell := evsw.eventCells[event] - evsw.mtx.Unlock() - - if eventCell == nil { - return - } - - // Remove listenerID from eventCell - numListeners := eventCell.RemoveListener(listenerID) - - // Maybe garbage collect eventCell. - if numListeners == 0 { - // Lock again and double check. - evsw.mtx.Lock() // OUTER LOCK - eventCell.mtx.Lock() // INNER LOCK - if len(eventCell.listeners) == 0 { - delete(evsw.eventCells, event) - } - eventCell.mtx.Unlock() // INNER LOCK - evsw.mtx.Unlock() // OUTER LOCK - } -} - -func (evsw *EventSwitch) FireEvent(event string, data types.EventData) { - // Get the eventCell - evsw.mtx.RLock() - eventCell := evsw.eventCells[event] - evsw.mtx.RUnlock() - - if eventCell == nil { - return - } - - // Fire event for all listeners in eventCell - eventCell.FireEvent(data) -} - -func (evsw *EventSwitch) SubscribeToEvent(receiver, eventID string, chanCap int) chan interface{} { - // listen for new round - ch := make(chan interface{}, chanCap) - evsw.AddListenerForEvent(receiver, eventID, func(data types.EventData) { - // NOTE: in production, evsw callbacks should be nonblocking. - ch <- data - }) - return ch -} - -//----------------------------------------------------------------------------- - -// eventCell handles keeping track of listener callbacks for a given event. -type eventCell struct { - mtx sync.RWMutex - listeners map[string]eventCallback -} - -func newEventCell() *eventCell { - return &eventCell{ - listeners: make(map[string]eventCallback), - } -} - -func (cell *eventCell) AddListener(listenerID string, cb eventCallback) { - cell.mtx.Lock() - cell.listeners[listenerID] = cb - cell.mtx.Unlock() -} - -func (cell *eventCell) RemoveListener(listenerID string) int { - cell.mtx.Lock() - delete(cell.listeners, listenerID) - numListeners := len(cell.listeners) - cell.mtx.Unlock() - return numListeners -} - -func (cell *eventCell) FireEvent(data types.EventData) { - cell.mtx.RLock() - for _, listener := range cell.listeners { - listener(data) - } - cell.mtx.RUnlock() -} - -//----------------------------------------------------------------------------- - -type eventCallback func(data types.EventData) - -type eventListener struct { - id string - - mtx sync.RWMutex - removed bool - events []string -} - -func newEventListener(id string) *eventListener { - return &eventListener{ - id: id, - removed: false, - events: nil, - } -} - -func (evl *eventListener) AddEvent(event string) { - evl.mtx.Lock() - defer evl.mtx.Unlock() - - if evl.removed { - return - } - evl.events = append(evl.events, event) -} - -func (evl *eventListener) GetEvents() []string { - evl.mtx.RLock() - defer evl.mtx.RUnlock() - - events := make([]string, len(evl.events)) - copy(events, evl.events) - return events -} - -func (evl *eventListener) SetRemoved() { - evl.mtx.Lock() - defer evl.mtx.Unlock() - evl.removed = true -} diff --git a/events/log.go b/events/log.go deleted file mode 100644 index 525462294..000000000 --- a/events/log.go +++ /dev/null @@ -1,7 +0,0 @@ -package events - -import ( - "github.com/tendermint/go-logger" -) - -var log = logger.New("module", "events") diff --git a/mempool/reactor.go b/mempool/reactor.go index a9de97266..51be9beab 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -10,7 +10,7 @@ import ( . "github.com/tendermint/go-common" "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/events" + "github.com/tendermint/go-events" "github.com/tendermint/tendermint/types" ) diff --git a/node/node.go b/node/node.go index 09dc5886d..124a87732 100644 --- a/node/node.go +++ b/node/node.go @@ -12,16 +12,16 @@ import ( . "github.com/tendermint/go-common" "github.com/tendermint/go-crypto" dbm "github.com/tendermint/go-db" + "github.com/tendermint/go-events" "github.com/tendermint/go-p2p" + "github.com/tendermint/go-rpc" + "github.com/tendermint/go-rpc/server" "github.com/tendermint/go-wire" bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/consensus" - "github.com/tendermint/tendermint/events" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" - "github.com/tendermint/tendermint/rpc" "github.com/tendermint/tendermint/rpc/core" - "github.com/tendermint/tendermint/rpc/server" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" "github.com/tendermint/tmsp/example/golang" @@ -332,10 +332,10 @@ func getProxyApp(addr string, hash []byte) (proxyAppCtx proxy.AppContext) { if err != nil { Exit(Fmt("Failed to connect to proxy for mempool: %v", err)) } - proxyAppCtx := proxy.NewRemoteAppContext(proxyConn, 1024) - - proxyAppCtx.Start() + remoteApp := proxy.NewRemoteAppContext(proxyConn, 1024) + remoteApp.Start() + proxyAppCtx = remoteApp } // Check the hash diff --git a/node/node_test.go b/node/node_test.go index d171bdc17..4374e5142 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -4,30 +4,12 @@ import ( "testing" "time" - . "github.com/tendermint/go-common" "github.com/tendermint/go-p2p" _ "github.com/tendermint/tendermint/config/tendermint_test" - "github.com/tendermint/tendermint/types" - "github.com/tendermint/tmsp/example/golang" - "github.com/tendermint/tmsp/server" ) func TestNodeStartStop(t *testing.T) { - // Start a dummy app - go func() { - _, err := server.StartListener(config.GetString("proxy_app"), example.NewDummyApplication()) - if err != nil { - Exit(err.Error()) - } - }() - // wait for the server - time.Sleep(time.Second * 2) - - // Get PrivValidator - privValidatorFile := config.GetString("priv_validator_file") - privValidator := types.LoadOrGenPrivValidator(privValidatorFile) - // Create & start node n := NewNode(privValidator) l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"), config.GetBool("skip_upnp")) diff --git a/rpc/client/http_client.go b/rpc/client/http_client.go deleted file mode 100644 index 9ff726294..000000000 --- a/rpc/client/http_client.go +++ /dev/null @@ -1,133 +0,0 @@ -package rpcclient - -import ( - "bytes" - "errors" - "io/ioutil" - "net/http" - "net/url" - "strings" - - . "github.com/tendermint/go-common" - "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/rpc/types" -) - -// JSON rpc takes params as a slice -type ClientJSONRPC struct { - remote string -} - -func NewClientJSONRPC(remote string) *ClientJSONRPC { - return &ClientJSONRPC{remote} -} - -func (c *ClientJSONRPC) Call(method string, params []interface{}) (interface{}, error) { - return CallHTTP_JSONRPC(c.remote, method, params) -} - -// URI takes params as a map -type ClientURI struct { - remote string -} - -func NewClientURI(remote string) *ClientURI { - if !strings.HasSuffix(remote, "/") { - remote = remote + "/" - } - return &ClientURI{remote} -} - -func (c *ClientURI) Call(method string, params map[string]interface{}) (interface{}, error) { - return CallHTTP_URI(c.remote, method, params) -} - -func CallHTTP_JSONRPC(remote string, method string, params []interface{}) (interface{}, error) { - // Make request and get responseBytes - request := rpctypes.RPCRequest{ - JSONRPC: "2.0", - Method: method, - Params: params, - ID: "", - } - requestBytes := wire.JSONBytes(request) - requestBuf := bytes.NewBuffer(requestBytes) - log.Info(Fmt("RPC request to %v: %v", remote, string(requestBytes))) - httpResponse, err := http.Post(remote, "text/json", requestBuf) - if err != nil { - return nil, err - } - defer httpResponse.Body.Close() - responseBytes, err := ioutil.ReadAll(httpResponse.Body) - if err != nil { - return nil, err - } - log.Info(Fmt("RPC response: %v", string(responseBytes))) - return unmarshalResponseBytes(responseBytes) -} - -func CallHTTP_URI(remote string, method string, params map[string]interface{}) (interface{}, error) { - values, err := argsToURLValues(params) - if err != nil { - return nil, err - } - log.Info(Fmt("URI request to %v: %v", remote, values)) - resp, err := http.PostForm(remote+method, values) - if err != nil { - return nil, err - } - defer resp.Body.Close() - responseBytes, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, err - } - return unmarshalResponseBytes(responseBytes) -} - -//------------------------------------------------ - -func unmarshalResponseBytes(responseBytes []byte) (interface{}, error) { - // read response - // if rpc/core/types is imported, the result will unmarshal - // into the correct type - var err error - response := &rpctypes.RPCResponse{} - wire.ReadJSON(response, responseBytes, &err) - if err != nil { - return nil, err - } - errorStr := response.Error - if errorStr != "" { - return nil, errors.New(errorStr) - } - return response.Result, err -} - -func argsToURLValues(args map[string]interface{}) (url.Values, error) { - values := make(url.Values) - if len(args) == 0 { - return values, nil - } - err := argsToJson(args) - if err != nil { - return nil, err - } - for key, val := range args { - values.Set(key, val.(string)) - } - return values, nil -} - -func argsToJson(args map[string]interface{}) error { - var n int - var err error - for k, v := range args { - buf := new(bytes.Buffer) - wire.WriteJSON(v, buf, &n, &err) - if err != nil { - return err - } - args[k] = buf.String() - } - return nil -} diff --git a/rpc/client/log.go b/rpc/client/log.go deleted file mode 100644 index 8b33e2f10..000000000 --- a/rpc/client/log.go +++ /dev/null @@ -1,7 +0,0 @@ -package rpcclient - -import ( - "github.com/tendermint/log15" -) - -var log = log15.New("module", "rpcclient") diff --git a/rpc/client/ws_client.go b/rpc/client/ws_client.go deleted file mode 100644 index 8124b63bc..000000000 --- a/rpc/client/ws_client.go +++ /dev/null @@ -1,116 +0,0 @@ -package rpcclient - -import ( - "net/http" - "time" - - "github.com/gorilla/websocket" - . "github.com/tendermint/go-common" - "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/rpc/types" -) - -const ( - wsResultsChannelCapacity = 10 - wsWriteTimeoutSeconds = 10 -) - -type WSClient struct { - QuitService - Address string - *websocket.Conn - ResultsCh chan rpctypes.Result // closes upon WSClient.Stop() -} - -// create a new connection -func NewWSClient(addr string) *WSClient { - wsClient := &WSClient{ - Address: addr, - Conn: nil, - ResultsCh: make(chan rpctypes.Result, wsResultsChannelCapacity), - } - wsClient.QuitService = *NewQuitService(log, "WSClient", wsClient) - return wsClient -} - -func (wsc *WSClient) OnStart() error { - wsc.QuitService.OnStart() - err := wsc.dial() - if err != nil { - return err - } - go wsc.receiveEventsRoutine() - return nil -} - -func (wsc *WSClient) dial() error { - // Dial - dialer := websocket.DefaultDialer - rHeader := http.Header{} - con, _, err := dialer.Dial(wsc.Address, rHeader) - if err != nil { - return err - } - // Set the ping/pong handlers - con.SetPingHandler(func(m string) error { - // NOTE: https://github.com/gorilla/websocket/issues/97 - go con.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds)) - return nil - }) - con.SetPongHandler(func(m string) error { - // NOTE: https://github.com/gorilla/websocket/issues/97 - return nil - }) - wsc.Conn = con - return nil -} - -func (wsc *WSClient) OnStop() { - wsc.QuitService.OnStop() - // ResultsCh is closed in receiveEventsRoutine. -} - -func (wsc *WSClient) receiveEventsRoutine() { - for { - _, data, err := wsc.ReadMessage() - if err != nil { - log.Info("WSClient failed to read message", "error", err, "data", string(data)) - wsc.Stop() - break - } else { - var response rpctypes.RPCResponse - wire.ReadJSON(&response, data, &err) - if err != nil { - log.Info("WSClient failed to parse message", "error", err) - wsc.Stop() - break - } - wsc.ResultsCh <- response.Result - } - } - - // Cleanup - close(wsc.ResultsCh) -} - -// subscribe to an event -func (wsc *WSClient) Subscribe(eventid string) error { - err := wsc.WriteJSON(rpctypes.RPCRequest{ - JSONRPC: "2.0", - ID: "", - Method: "subscribe", - Params: []interface{}{eventid}, - }) - return err -} - -// unsubscribe from an event -func (wsc *WSClient) Unsubscribe(eventid string) error { - err := wsc.WriteJSON(rpctypes.RPCRequest{ - JSONRPC: "2.0", - ID: "", - Method: "unsubscribe", - Params: []interface{}{eventid}, - }) - return err -} diff --git a/rpc/core/events.go b/rpc/core/events.go index 83092fd93..7365377d2 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -1,14 +1,14 @@ package core import ( + "github.com/tendermint/go-events" + "github.com/tendermint/go-rpc/types" ctypes "github.com/tendermint/tendermint/rpc/core/types" - "github.com/tendermint/tendermint/rpc/types" - "github.com/tendermint/tendermint/types" ) func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscribe, error) { log.Notice("Subscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event) - wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg types.EventData) { + wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg events.EventData) { // NOTE: EventSwitch callbacks must be nonblocking // NOTE: RPCResponses of subscribed events have id suffix "#event" wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &ctypes.ResultEvent{event, msg}, "")) @@ -18,7 +18,7 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscri func Unsubscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultUnsubscribe, error) { log.Notice("Unsubscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event) - wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg types.EventData) { + wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg events.EventData) { // NOTE: EventSwitch callbacks must be nonblocking // NOTE: RPCResponses of subscribed events have id suffix "#event" wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &ctypes.ResultEvent{event, msg}, "")) diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 4b7433369..765c03b27 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -1,7 +1,7 @@ package core import ( - rpc "github.com/tendermint/tendermint/rpc/server" + rpc "github.com/tendermint/go-rpc/server" ) // TODO: eliminate redundancy between here and reading code from core/ diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index 7ca47240d..ac6737cfa 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -2,9 +2,10 @@ package core_types import ( "github.com/tendermint/go-crypto" + "github.com/tendermint/go-events" "github.com/tendermint/go-p2p" + "github.com/tendermint/go-rpc/types" "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/rpc/types" "github.com/tendermint/tendermint/types" ) @@ -67,9 +68,10 @@ type ResultSubscribe struct { type ResultUnsubscribe struct { } +// TODO: something about this type ResultEvent struct { - Event string `json:"event"` - Data types.EventData `json:"data"` + Event string `json:"event"` + Data events.EventData `json:"data"` } //---------------------------------------- diff --git a/rpc/server/handlers.go b/rpc/server/handlers.go deleted file mode 100644 index 923544be1..000000000 --- a/rpc/server/handlers.go +++ /dev/null @@ -1,553 +0,0 @@ -package rpcserver - -import ( - "bytes" - "encoding/json" - "errors" - "fmt" - "io/ioutil" - "net/http" - "reflect" - "sort" - "time" - - "github.com/gorilla/websocket" - . "github.com/tendermint/go-common" - "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/events" - . "github.com/tendermint/tendermint/rpc/types" -) - -func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) { - // HTTP endpoints - for funcName, rpcFunc := range funcMap { - mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc)) - } - - // JSONRPC endpoints - mux.HandleFunc("/", makeJSONRPCHandler(funcMap)) -} - -//------------------------------------- -// function introspection - -// holds all type information for each function -type RPCFunc struct { - f reflect.Value // underlying rpc function - args []reflect.Type // type of each function arg - returns []reflect.Type // type of each return arg - argNames []string // name of each argument - ws bool // websocket only -} - -// wraps a function for quicker introspection -func NewRPCFunc(f interface{}, args []string) *RPCFunc { - return &RPCFunc{ - f: reflect.ValueOf(f), - args: funcArgTypes(f), - returns: funcReturnTypes(f), - argNames: args, - ws: false, - } -} - -func NewWSRPCFunc(f interface{}, args []string) *RPCFunc { - return &RPCFunc{ - f: reflect.ValueOf(f), - args: funcArgTypes(f), - returns: funcReturnTypes(f), - argNames: args, - ws: true, - } -} - -// return a function's argument types -func funcArgTypes(f interface{}) []reflect.Type { - t := reflect.TypeOf(f) - n := t.NumIn() - typez := make([]reflect.Type, n) - for i := 0; i < n; i++ { - typez[i] = t.In(i) - } - return typez -} - -// return a function's return types -func funcReturnTypes(f interface{}) []reflect.Type { - t := reflect.TypeOf(f) - n := t.NumOut() - typez := make([]reflect.Type, n) - for i := 0; i < n; i++ { - typez[i] = t.Out(i) - } - return typez -} - -// function introspection -//----------------------------------------------------------------------------- -// rpc.json - -// jsonrpc calls grab the given method's function info and runs reflect.Call -func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - b, _ := ioutil.ReadAll(r.Body) - // if its an empty request (like from a browser), - // just display a list of functions - if len(b) == 0 { - writeListOfEndpoints(w, r, funcMap) - return - } - - var request RPCRequest - err := json.Unmarshal(b, &request) - if err != nil { - WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error())) - return - } - if len(r.URL.Path) > 1 { - WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path))) - return - } - rpcFunc := funcMap[request.Method] - if rpcFunc == nil { - WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method)) - return - } - if rpcFunc.ws { - WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method is only for websockets: "+request.Method)) - return - } - args, err := jsonParamsToArgs(rpcFunc, request.Params) - if err != nil { - WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, err.Error())) - return - } - returns := rpcFunc.f.Call(args) - log.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns) - result, err := unreflectResult(returns) - if err != nil { - WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, err.Error())) - return - } - WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, result, "")) - } -} - -// Convert a list of interfaces to properly typed values -func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, error) { - if len(rpcFunc.argNames) != len(params) { - return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)", - len(rpcFunc.argNames), rpcFunc.argNames, len(params), params)) - } - values := make([]reflect.Value, len(params)) - for i, p := range params { - ty := rpcFunc.args[i] - v, err := _jsonObjectToArg(ty, p) - if err != nil { - return nil, err - } - values[i] = v - } - return values, nil -} - -// Same as above, but with the first param the websocket connection -func jsonParamsToArgsWS(rpcFunc *RPCFunc, params []interface{}, wsCtx WSRPCContext) ([]reflect.Value, error) { - if len(rpcFunc.argNames) != len(params) { - return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)", - len(rpcFunc.argNames)-1, rpcFunc.argNames[1:], len(params), params)) - } - values := make([]reflect.Value, len(params)+1) - values[0] = reflect.ValueOf(wsCtx) - for i, p := range params { - ty := rpcFunc.args[i+1] - v, err := _jsonObjectToArg(ty, p) - if err != nil { - return nil, err - } - values[i+1] = v - } - return values, nil -} - -func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) { - var err error - v := reflect.New(ty) - wire.ReadJSONObjectPtr(v.Interface(), object, &err) - if err != nil { - return v, err - } - v = v.Elem() - return v, nil -} - -// rpc.json -//----------------------------------------------------------------------------- -// rpc.http - -// convert from a function name to the http handler -func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) { - // Exception for websocket endpoints - if rpcFunc.ws { - return func(w http.ResponseWriter, r *http.Request) { - WriteRPCResponseHTTP(w, NewRPCResponse("", nil, "This RPC method is only for websockets")) - } - } - // All other endpoints - return func(w http.ResponseWriter, r *http.Request) { - args, err := httpParamsToArgs(rpcFunc, r) - if err != nil { - WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error())) - return - } - returns := rpcFunc.f.Call(args) - log.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns) - result, err := unreflectResult(returns) - if err != nil { - WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error())) - return - } - WriteRPCResponseHTTP(w, NewRPCResponse("", result, "")) - } -} - -// Covert an http query to a list of properly typed values. -// To be properly decoded the arg must be a concrete type from tendermint (if its an interface). -func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) { - argTypes := rpcFunc.args - argNames := rpcFunc.argNames - - var err error - values := make([]reflect.Value, len(argNames)) - for i, name := range argNames { - ty := argTypes[i] - arg := GetParam(r, name) - values[i], err = _jsonStringToArg(ty, arg) - if err != nil { - return nil, err - } - } - return values, nil -} - -func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) { - var err error - v := reflect.New(ty) - wire.ReadJSONPtr(v.Interface(), []byte(arg), &err) - if err != nil { - return v, err - } - v = v.Elem() - return v, nil -} - -// rpc.http -//----------------------------------------------------------------------------- -// rpc.websocket - -const ( - writeChanCapacity = 1000 - wsWriteTimeoutSeconds = 30 // each write times out after this - wsReadTimeoutSeconds = 30 // connection times out if we haven't received *anything* in this long, not even pings. - wsPingTickerSeconds = 10 // send a ping every PingTickerSeconds. -) - -// a single websocket connection -// contains listener id, underlying ws connection, -// and the event switch for subscribing to events -type wsConnection struct { - QuitService - - remoteAddr string - baseConn *websocket.Conn - writeChan chan RPCResponse - readTimeout *time.Timer - pingTicker *time.Ticker - - funcMap map[string]*RPCFunc - evsw *events.EventSwitch -} - -// new websocket connection wrapper -func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *wsConnection { - wsc := &wsConnection{ - remoteAddr: baseConn.RemoteAddr().String(), - baseConn: baseConn, - writeChan: make(chan RPCResponse, writeChanCapacity), // error when full. - funcMap: funcMap, - evsw: evsw, - } - wsc.QuitService = *NewQuitService(log, "wsConnection", wsc) - return wsc -} - -// wsc.Start() blocks until the connection closes. -func (wsc *wsConnection) OnStart() error { - wsc.QuitService.OnStart() - - // Read subscriptions/unsubscriptions to events - go wsc.readRoutine() - - // Custom Ping handler to touch readTimeout - wsc.readTimeout = time.NewTimer(time.Second * wsReadTimeoutSeconds) - wsc.pingTicker = time.NewTicker(time.Second * wsPingTickerSeconds) - wsc.baseConn.SetPingHandler(func(m string) error { - // NOTE: https://github.com/gorilla/websocket/issues/97 - go wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds)) - wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds) - return nil - }) - wsc.baseConn.SetPongHandler(func(m string) error { - // NOTE: https://github.com/gorilla/websocket/issues/97 - wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds) - return nil - }) - go wsc.readTimeoutRoutine() - - // Write responses, BLOCKING. - wsc.writeRoutine() - return nil -} - -func (wsc *wsConnection) OnStop() { - wsc.QuitService.OnStop() - wsc.evsw.RemoveListener(wsc.remoteAddr) - wsc.readTimeout.Stop() - wsc.pingTicker.Stop() - // The write loop closes the websocket connection - // when it exits its loop, and the read loop - // closes the writeChan -} - -func (wsc *wsConnection) readTimeoutRoutine() { - select { - case <-wsc.readTimeout.C: - log.Notice("Stopping connection due to read timeout") - wsc.Stop() - case <-wsc.Quit: - return - } -} - -// Implements WSRPCConnection -func (wsc *wsConnection) GetRemoteAddr() string { - return wsc.remoteAddr -} - -// Implements WSRPCConnection -func (wsc *wsConnection) GetEventSwitch() *events.EventSwitch { - return wsc.evsw -} - -// Implements WSRPCConnection -// Blocking write to writeChan until service stops. -func (wsc *wsConnection) WriteRPCResponse(resp RPCResponse) { - select { - case <-wsc.Quit: - return - case wsc.writeChan <- resp: - } -} - -// Implements WSRPCConnection -// Nonblocking write. -func (wsc *wsConnection) TryWriteRPCResponse(resp RPCResponse) bool { - select { - case <-wsc.Quit: - return false - case wsc.writeChan <- resp: - return true - default: - return false - } -} - -// Read from the socket and subscribe to or unsubscribe from events -func (wsc *wsConnection) readRoutine() { - // Do not close writeChan, to allow WriteRPCResponse() to fail. - // defer close(wsc.writeChan) - - for { - select { - case <-wsc.Quit: - return - default: - var in []byte - // Do not set a deadline here like below: - // wsc.baseConn.SetReadDeadline(time.Now().Add(time.Second * wsReadTimeoutSeconds)) - // The client may not send anything for a while. - // We use `readTimeout` to handle read timeouts. - _, in, err := wsc.baseConn.ReadMessage() - if err != nil { - log.Notice("Failed to read from connection", "remote", wsc.remoteAddr) - // an error reading the connection, - // kill the connection - wsc.Stop() - return - } - var request RPCRequest - err = json.Unmarshal(in, &request) - if err != nil { - errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error()) - wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, errStr)) - continue - } - - // Now, fetch the RPCFunc and execute it. - - rpcFunc := wsc.funcMap[request.Method] - if rpcFunc == nil { - wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method)) - continue - } - var args []reflect.Value - if rpcFunc.ws { - wsCtx := WSRPCContext{Request: request, WSRPCConnection: wsc} - args, err = jsonParamsToArgsWS(rpcFunc, request.Params, wsCtx) - } else { - args, err = jsonParamsToArgs(rpcFunc, request.Params) - } - if err != nil { - wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error())) - continue - } - returns := rpcFunc.f.Call(args) - log.Info("WSJSONRPC", "method", request.Method, "args", args, "returns", returns) - result, err := unreflectResult(returns) - if err != nil { - wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error())) - continue - } else { - wsc.WriteRPCResponse(NewRPCResponse(request.ID, result, "")) - continue - } - - } - } -} - -// receives on a write channel and writes out on the socket -func (wsc *wsConnection) writeRoutine() { - defer wsc.baseConn.Close() - var n, err = int(0), error(nil) - for { - select { - case <-wsc.Quit: - return - case <-wsc.pingTicker.C: - err := wsc.baseConn.WriteMessage(websocket.PingMessage, []byte{}) - if err != nil { - log.Error("Failed to write ping message on websocket", "error", err) - wsc.Stop() - return - } - case msg := <-wsc.writeChan: - buf := new(bytes.Buffer) - wire.WriteJSON(msg, buf, &n, &err) - if err != nil { - log.Error("Failed to marshal RPCResponse to JSON", "error", err) - } else { - wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds)) - bufBytes := buf.Bytes() - if err = wsc.baseConn.WriteMessage(websocket.TextMessage, bufBytes); err != nil { - log.Warn("Failed to write response on websocket", "error", err) - wsc.Stop() - return - } - } - } - } -} - -//---------------------------------------- - -// Main manager for all websocket connections -// Holds the event switch -// NOTE: The websocket path is defined externally, e.g. in node/node.go -type WebsocketManager struct { - websocket.Upgrader - funcMap map[string]*RPCFunc - evsw *events.EventSwitch -} - -func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *WebsocketManager { - return &WebsocketManager{ - funcMap: funcMap, - evsw: evsw, - Upgrader: websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { - // TODO - return true - }, - }, - } -} - -// Upgrade the request/response (via http.Hijack) and starts the wsConnection. -func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) { - wsConn, err := wm.Upgrade(w, r, nil) - if err != nil { - // TODO - return http error - log.Error("Failed to upgrade to websocket connection", "error", err) - return - } - - // register connection - con := NewWSConnection(wsConn, wm.funcMap, wm.evsw) - log.Notice("New websocket connection", "remote", con.remoteAddr) - con.Start() // Blocking -} - -// rpc.websocket -//----------------------------------------------------------------------------- - -// returns is result struct and error. If error is not nil, return it -func unreflectResult(returns []reflect.Value) (interface{}, error) { - errV := returns[1] - if errV.Interface() != nil { - return nil, fmt.Errorf("%v", errV.Interface()) - } - return returns[0].Interface(), nil -} - -// writes a list of available rpc endpoints as an html page -func writeListOfEndpoints(w http.ResponseWriter, r *http.Request, funcMap map[string]*RPCFunc) { - noArgNames := []string{} - argNames := []string{} - for name, funcData := range funcMap { - if len(funcData.args) == 0 { - noArgNames = append(noArgNames, name) - } else { - argNames = append(argNames, name) - } - } - sort.Strings(noArgNames) - sort.Strings(argNames) - buf := new(bytes.Buffer) - buf.WriteString("
") - buf.WriteString("