@ -1,64 +0,0 @@ | |||
package alert | |||
import ( | |||
"fmt" | |||
"time" | |||
"github.com/sfreiberg/gotwilio" | |||
) | |||
var lastAlertUnix int64 = 0 | |||
var alertCountSince int = 0 | |||
// Sends a critical alert message to administrators. | |||
func Alert(message string) { | |||
log.Error("<!> ALERT <!>\n" + message) | |||
now := time.Now().Unix() | |||
if now-lastAlertUnix > int64(config.GetInt("alert_min_interval")) { | |||
message = fmt.Sprintf("%v:%v", config.GetString("chain_id"), message) | |||
if alertCountSince > 0 { | |||
message = fmt.Sprintf("%v (+%v more since)", message, alertCountSince) | |||
alertCountSince = 0 | |||
} | |||
if len(config.GetString("alert_twilio_sid")) > 0 { | |||
go sendTwilio(message) | |||
} | |||
if len(config.GetString("alert_email_recipients")) > 0 { | |||
go sendEmail(message) | |||
} | |||
} else { | |||
alertCountSince++ | |||
} | |||
} | |||
func sendTwilio(message string) { | |||
defer func() { | |||
if err := recover(); err != nil { | |||
log.Error("sendTwilio error", "error", err) | |||
} | |||
}() | |||
if len(message) > 50 { | |||
message = message[:50] | |||
} | |||
twilio := gotwilio.NewTwilioClient(config.GetString("alert_twilio_sid"), config.GetString("alert_twilio_token")) | |||
res, exp, err := twilio.SendSMS(config.GetString("alert_twilio_from"), config.GetString("alert_twilio_to"), message, "", "") | |||
if exp != nil || err != nil { | |||
log.Error("sendTwilio error", "res", res, "exp", exp, "error", err) | |||
} | |||
} | |||
func sendEmail(message string) { | |||
defer func() { | |||
if err := recover(); err != nil { | |||
log.Error("sendEmail error", "error", err) | |||
} | |||
}() | |||
subject := message | |||
if len(subject) > 80 { | |||
subject = subject[:80] | |||
} | |||
err := SendEmail(subject, message, config.GetStringSlice("alert_email_recipients")) | |||
if err != nil { | |||
log.Error("sendEmail error", "error", err, "message", message) | |||
} | |||
} |
@ -1,14 +0,0 @@ | |||
package alert | |||
import ( | |||
cfg "github.com/tendermint/go-config" | |||
) | |||
var config cfg.Config = nil | |||
func init() { | |||
cfg.OnConfig(func(newConfig cfg.Config) { | |||
config = newConfig | |||
}) | |||
} |
@ -1,176 +0,0 @@ | |||
// Forked from github.com/SlyMarbo/gmail | |||
package alert | |||
import ( | |||
"bytes" | |||
"crypto/tls" | |||
"encoding/base64" | |||
"errors" | |||
"fmt" | |||
"io/ioutil" | |||
"net/smtp" | |||
"path/filepath" | |||
"regexp" | |||
"strings" | |||
) | |||
// Convenience function | |||
func SendEmail(subject, body string, tos []string) error { | |||
email := Compose(subject, body) | |||
email.From = config.GetString("smtp_user") | |||
email.ContentType = "text/html; charset=utf-8" | |||
email.AddRecipients(tos...) | |||
err := email.Send() | |||
return err | |||
} | |||
// Email represents a single message, which may contain | |||
// attachments. | |||
type Email struct { | |||
From string | |||
To []string | |||
Subject string | |||
ContentType string | |||
Body string | |||
Attachments map[string][]byte | |||
} | |||
// Compose begins a new email, filling the subject and body, | |||
// and allocating memory for the list of recipients and the | |||
// attachments. | |||
func Compose(Subject, Body string) *Email { | |||
out := new(Email) | |||
out.To = make([]string, 0, 1) | |||
out.Subject = Subject | |||
out.Body = Body | |||
out.Attachments = make(map[string][]byte) | |||
return out | |||
} | |||
// Attach takes a filename and adds this to the message. | |||
// Note that since only the filename is stored (and not | |||
// its path, for privacy reasons), multiple files in | |||
// different directories but with the same filename and | |||
// extension cannot be sent. | |||
func (e *Email) Attach(Filename string) error { | |||
b, err := ioutil.ReadFile(Filename) | |||
if err != nil { | |||
return err | |||
} | |||
_, fname := filepath.Split(Filename) | |||
e.Attachments[fname] = b | |||
return nil | |||
} | |||
// AddRecipient adds a single recipient. | |||
func (e *Email) AddRecipient(Recipient string) { | |||
e.To = append(e.To, Recipient) | |||
} | |||
// AddRecipients adds one or more recipients. | |||
func (e *Email) AddRecipients(Recipients ...string) { | |||
e.To = append(e.To, Recipients...) | |||
} | |||
// Send sends the email, returning any error encountered. | |||
func (e *Email) Send() error { | |||
if e.From == "" { | |||
return errors.New("Error: No sender specified. Please set the Email.From field.") | |||
} | |||
if e.To == nil || len(e.To) == 0 { | |||
return errors.New("Error: No recipient specified. Please set the Email.To field.") | |||
} | |||
auth := smtp.PlainAuth( | |||
"", | |||
config.GetString("smtp_user"), | |||
config.GetString("smtp_password"), | |||
config.GetString("smtp_host"), | |||
) | |||
conn, err := smtp.Dial(fmt.Sprintf("%v:%v", config.GetString("smtp_host"), config.GetString("smtp_port"))) | |||
if err != nil { | |||
return err | |||
} | |||
err = conn.StartTLS(&tls.Config{}) | |||
if err != nil { | |||
return err | |||
} | |||
err = conn.Auth(auth) | |||
if err != nil { | |||
return err | |||
} | |||
err = conn.Mail(e.From) | |||
if err != nil { | |||
if strings.Contains(err.Error(), "530 5.5.1") { | |||
return errors.New("Error: Authentication failure. Your username or password is incorrect.") | |||
} | |||
return err | |||
} | |||
for _, recipient := range e.To { | |||
err = conn.Rcpt(recipient) | |||
if err != nil { | |||
return err | |||
} | |||
} | |||
wc, err := conn.Data() | |||
if err != nil { | |||
return err | |||
} | |||
defer wc.Close() | |||
_, err = wc.Write(e.Bytes()) | |||
if err != nil { | |||
return err | |||
} | |||
return nil | |||
} | |||
func (e *Email) Bytes() []byte { | |||
buf := bytes.NewBuffer(nil) | |||
var subject = e.Subject | |||
subject = regexp.MustCompile("\n+").ReplaceAllString(subject, " ") | |||
subject = regexp.MustCompile(" +").ReplaceAllString(subject, " ") | |||
buf.WriteString("Subject: " + subject + "\n") | |||
buf.WriteString("To: <" + strings.Join(e.To, ">,<") + ">\n") | |||
buf.WriteString("MIME-Version: 1.0\n") | |||
// Boundary is used by MIME to separate files. | |||
boundary := "f46d043c813270fc6b04c2d223da" | |||
if len(e.Attachments) > 0 { | |||
buf.WriteString("Content-Type: multipart/mixed; boundary=" + boundary + "\n") | |||
buf.WriteString("--" + boundary + "\n") | |||
} | |||
if e.ContentType == "" { | |||
e.ContentType = "text/plain; charset=utf-8" | |||
} | |||
buf.WriteString(fmt.Sprintf("Content-Type: %s\n\n", e.ContentType)) | |||
buf.WriteString(e.Body) | |||
if len(e.Attachments) > 0 { | |||
for k, v := range e.Attachments { | |||
buf.WriteString("\n\n--" + boundary + "\n") | |||
buf.WriteString("Content-Type: application/octet-stream\n") | |||
buf.WriteString("Content-Transfer-Encoding: base64\n") | |||
buf.WriteString("Content-Disposition: attachment; filename=\"" + k + "\"\n\n") | |||
b := make([]byte, base64.StdEncoding.EncodedLen(len(v))) | |||
base64.StdEncoding.Encode(b, v) | |||
buf.Write(b) | |||
buf.WriteString("\n--" + boundary) | |||
} | |||
buf.WriteString("--") | |||
} | |||
return buf.Bytes() | |||
} |
@ -1,7 +0,0 @@ | |||
package alert | |||
import ( | |||
"github.com/tendermint/go-logger" | |||
) | |||
var log = logger.New("module", "alert") |
@ -1,45 +0,0 @@ | |||
package events | |||
import ( | |||
"github.com/tendermint/tendermint/types" | |||
) | |||
const ( | |||
eventsBufferSize = 1000 | |||
) | |||
// An EventCache buffers events for a Fireable | |||
// All events are cached. Filtering happens on Flush | |||
type EventCache struct { | |||
evsw Fireable | |||
events []eventInfo | |||
} | |||
// Create a new EventCache with an EventSwitch as backend | |||
func NewEventCache(evsw Fireable) *EventCache { | |||
return &EventCache{ | |||
evsw: evsw, | |||
events: make([]eventInfo, eventsBufferSize), | |||
} | |||
} | |||
// a cached event | |||
type eventInfo struct { | |||
event string | |||
data types.EventData | |||
} | |||
// Cache an event to be fired upon finality. | |||
func (evc *EventCache) FireEvent(event string, data types.EventData) { | |||
// append to list | |||
evc.events = append(evc.events, eventInfo{event, data}) | |||
} | |||
// Fire events by running evsw.FireEvent on all cached events. Blocks. | |||
// Clears cached events | |||
func (evc *EventCache) Flush() { | |||
for _, ei := range evc.events { | |||
evc.evsw.FireEvent(ei.event, ei.data) | |||
} | |||
evc.events = make([]eventInfo, eventsBufferSize) | |||
} |
@ -1,215 +0,0 @@ | |||
package events | |||
import ( | |||
"sync" | |||
. "github.com/tendermint/go-common" | |||
"github.com/tendermint/tendermint/types" | |||
) | |||
// reactors and other modules should export | |||
// this interface to become eventable | |||
type Eventable interface { | |||
SetEventSwitch(evsw *EventSwitch) | |||
} | |||
// an event switch or cache implements fireable | |||
type Fireable interface { | |||
FireEvent(event string, data types.EventData) | |||
} | |||
type EventSwitch struct { | |||
BaseService | |||
mtx sync.RWMutex | |||
eventCells map[string]*eventCell | |||
listeners map[string]*eventListener | |||
} | |||
func NewEventSwitch() *EventSwitch { | |||
evsw := &EventSwitch{} | |||
evsw.BaseService = *NewBaseService(log, "EventSwitch", evsw) | |||
return evsw | |||
} | |||
func (evsw *EventSwitch) OnStart() error { | |||
evsw.BaseService.OnStart() | |||
evsw.eventCells = make(map[string]*eventCell) | |||
evsw.listeners = make(map[string]*eventListener) | |||
return nil | |||
} | |||
func (evsw *EventSwitch) OnStop() { | |||
evsw.BaseService.OnStop() | |||
evsw.eventCells = nil | |||
evsw.listeners = nil | |||
} | |||
func (evsw *EventSwitch) AddListenerForEvent(listenerID, event string, cb eventCallback) { | |||
// Get/Create eventCell and listener | |||
evsw.mtx.Lock() | |||
eventCell := evsw.eventCells[event] | |||
if eventCell == nil { | |||
eventCell = newEventCell() | |||
evsw.eventCells[event] = eventCell | |||
} | |||
listener := evsw.listeners[listenerID] | |||
if listener == nil { | |||
listener = newEventListener(listenerID) | |||
evsw.listeners[listenerID] = listener | |||
} | |||
evsw.mtx.Unlock() | |||
// Add event and listener | |||
eventCell.AddListener(listenerID, cb) | |||
listener.AddEvent(event) | |||
} | |||
func (evsw *EventSwitch) RemoveListener(listenerID string) { | |||
// Get and remove listener | |||
evsw.mtx.RLock() | |||
listener := evsw.listeners[listenerID] | |||
delete(evsw.listeners, listenerID) | |||
evsw.mtx.RUnlock() | |||
if listener == nil { | |||
return | |||
} | |||
// Remove callback for each event. | |||
listener.SetRemoved() | |||
for _, event := range listener.GetEvents() { | |||
evsw.RemoveListenerForEvent(event, listenerID) | |||
} | |||
} | |||
func (evsw *EventSwitch) RemoveListenerForEvent(event string, listenerID string) { | |||
// Get eventCell | |||
evsw.mtx.Lock() | |||
eventCell := evsw.eventCells[event] | |||
evsw.mtx.Unlock() | |||
if eventCell == nil { | |||
return | |||
} | |||
// Remove listenerID from eventCell | |||
numListeners := eventCell.RemoveListener(listenerID) | |||
// Maybe garbage collect eventCell. | |||
if numListeners == 0 { | |||
// Lock again and double check. | |||
evsw.mtx.Lock() // OUTER LOCK | |||
eventCell.mtx.Lock() // INNER LOCK | |||
if len(eventCell.listeners) == 0 { | |||
delete(evsw.eventCells, event) | |||
} | |||
eventCell.mtx.Unlock() // INNER LOCK | |||
evsw.mtx.Unlock() // OUTER LOCK | |||
} | |||
} | |||
func (evsw *EventSwitch) FireEvent(event string, data types.EventData) { | |||
// Get the eventCell | |||
evsw.mtx.RLock() | |||
eventCell := evsw.eventCells[event] | |||
evsw.mtx.RUnlock() | |||
if eventCell == nil { | |||
return | |||
} | |||
// Fire event for all listeners in eventCell | |||
eventCell.FireEvent(data) | |||
} | |||
func (evsw *EventSwitch) SubscribeToEvent(receiver, eventID string, chanCap int) chan interface{} { | |||
// listen for new round | |||
ch := make(chan interface{}, chanCap) | |||
evsw.AddListenerForEvent(receiver, eventID, func(data types.EventData) { | |||
// NOTE: in production, evsw callbacks should be nonblocking. | |||
ch <- data | |||
}) | |||
return ch | |||
} | |||
//----------------------------------------------------------------------------- | |||
// eventCell handles keeping track of listener callbacks for a given event. | |||
type eventCell struct { | |||
mtx sync.RWMutex | |||
listeners map[string]eventCallback | |||
} | |||
func newEventCell() *eventCell { | |||
return &eventCell{ | |||
listeners: make(map[string]eventCallback), | |||
} | |||
} | |||
func (cell *eventCell) AddListener(listenerID string, cb eventCallback) { | |||
cell.mtx.Lock() | |||
cell.listeners[listenerID] = cb | |||
cell.mtx.Unlock() | |||
} | |||
func (cell *eventCell) RemoveListener(listenerID string) int { | |||
cell.mtx.Lock() | |||
delete(cell.listeners, listenerID) | |||
numListeners := len(cell.listeners) | |||
cell.mtx.Unlock() | |||
return numListeners | |||
} | |||
func (cell *eventCell) FireEvent(data types.EventData) { | |||
cell.mtx.RLock() | |||
for _, listener := range cell.listeners { | |||
listener(data) | |||
} | |||
cell.mtx.RUnlock() | |||
} | |||
//----------------------------------------------------------------------------- | |||
type eventCallback func(data types.EventData) | |||
type eventListener struct { | |||
id string | |||
mtx sync.RWMutex | |||
removed bool | |||
events []string | |||
} | |||
func newEventListener(id string) *eventListener { | |||
return &eventListener{ | |||
id: id, | |||
removed: false, | |||
events: nil, | |||
} | |||
} | |||
func (evl *eventListener) AddEvent(event string) { | |||
evl.mtx.Lock() | |||
defer evl.mtx.Unlock() | |||
if evl.removed { | |||
return | |||
} | |||
evl.events = append(evl.events, event) | |||
} | |||
func (evl *eventListener) GetEvents() []string { | |||
evl.mtx.RLock() | |||
defer evl.mtx.RUnlock() | |||
events := make([]string, len(evl.events)) | |||
copy(events, evl.events) | |||
return events | |||
} | |||
func (evl *eventListener) SetRemoved() { | |||
evl.mtx.Lock() | |||
defer evl.mtx.Unlock() | |||
evl.removed = true | |||
} |
@ -1,7 +0,0 @@ | |||
package events | |||
import ( | |||
"github.com/tendermint/go-logger" | |||
) | |||
var log = logger.New("module", "events") |
@ -1,51 +0,0 @@ | |||
package rpcclient | |||
import ( | |||
"bytes" | |||
"encoding/json" | |||
"errors" | |||
"io/ioutil" | |||
"net/http" | |||
. "github.com/tendermint/go-common" | |||
"github.com/tendermint/go-wire" | |||
"github.com/tendermint/tendermint/rpc/types" | |||
) | |||
func CallHTTP(remote string, method string, params []interface{}, dest interface{}) (interface{}, error) { | |||
// Make request and get responseBytes | |||
request := rpctypes.RPCRequest{ | |||
JSONRPC: "2.0", | |||
Method: method, | |||
Params: params, | |||
ID: "", | |||
} | |||
requestBytes := wire.JSONBytes(request) | |||
requestBuf := bytes.NewBuffer(requestBytes) | |||
log.Info(Fmt("RPC request to %v: %v", remote, string(requestBytes))) | |||
httpResponse, err := http.Post(remote, "text/json", requestBuf) | |||
if err != nil { | |||
return dest, err | |||
} | |||
defer httpResponse.Body.Close() | |||
responseBytes, err := ioutil.ReadAll(httpResponse.Body) | |||
if err != nil { | |||
return dest, err | |||
} | |||
log.Info(Fmt("RPC response: %v", string(responseBytes))) | |||
// Parse response into JSONResponse | |||
response := rpctypes.RPCResponse{} | |||
err = json.Unmarshal(responseBytes, &response) | |||
if err != nil { | |||
return dest, err | |||
} | |||
// Parse response into dest | |||
resultJSONObject := response.Result | |||
errorStr := response.Error | |||
if errorStr != "" { | |||
return dest, errors.New(errorStr) | |||
} | |||
dest = wire.ReadJSONObject(dest, resultJSONObject, &err) | |||
return dest, err | |||
} |
@ -1,7 +0,0 @@ | |||
package rpcclient | |||
import ( | |||
"github.com/tendermint/log15" | |||
) | |||
var log = log15.New("module", "rpcclient") |
@ -1,116 +0,0 @@ | |||
package rpcclient | |||
import ( | |||
"net/http" | |||
"time" | |||
"github.com/gorilla/websocket" | |||
. "github.com/tendermint/go-common" | |||
"github.com/tendermint/go-wire" | |||
"github.com/tendermint/tendermint/rpc/types" | |||
) | |||
const ( | |||
wsResultsChannelCapacity = 10 | |||
wsWriteTimeoutSeconds = 10 | |||
) | |||
type WSClient struct { | |||
QuitService | |||
Address string | |||
*websocket.Conn | |||
ResultsCh chan rpctypes.Result // closes upon WSClient.Stop() | |||
} | |||
// create a new connection | |||
func NewWSClient(addr string) *WSClient { | |||
wsClient := &WSClient{ | |||
Address: addr, | |||
Conn: nil, | |||
ResultsCh: make(chan rpctypes.Result, wsResultsChannelCapacity), | |||
} | |||
wsClient.QuitService = *NewQuitService(log, "WSClient", wsClient) | |||
return wsClient | |||
} | |||
func (wsc *WSClient) OnStart() error { | |||
wsc.QuitService.OnStart() | |||
err := wsc.dial() | |||
if err != nil { | |||
return err | |||
} | |||
go wsc.receiveEventsRoutine() | |||
return nil | |||
} | |||
func (wsc *WSClient) dial() error { | |||
// Dial | |||
dialer := websocket.DefaultDialer | |||
rHeader := http.Header{} | |||
con, _, err := dialer.Dial(wsc.Address, rHeader) | |||
if err != nil { | |||
return err | |||
} | |||
// Set the ping/pong handlers | |||
con.SetPingHandler(func(m string) error { | |||
// NOTE: https://github.com/gorilla/websocket/issues/97 | |||
go con.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds)) | |||
return nil | |||
}) | |||
con.SetPongHandler(func(m string) error { | |||
// NOTE: https://github.com/gorilla/websocket/issues/97 | |||
return nil | |||
}) | |||
wsc.Conn = con | |||
return nil | |||
} | |||
func (wsc *WSClient) OnStop() { | |||
wsc.QuitService.OnStop() | |||
// ResultsCh is closed in receiveEventsRoutine. | |||
} | |||
func (wsc *WSClient) receiveEventsRoutine() { | |||
for { | |||
_, data, err := wsc.ReadMessage() | |||
if err != nil { | |||
log.Info("WSClient failed to read message", "error", err, "data", string(data)) | |||
wsc.Stop() | |||
break | |||
} else { | |||
var response rpctypes.RPCResponse | |||
wire.ReadJSON(&response, data, &err) | |||
if err != nil { | |||
log.Info("WSClient failed to parse message", "error", err) | |||
wsc.Stop() | |||
break | |||
} | |||
wsc.ResultsCh <- response.Result | |||
} | |||
} | |||
// Cleanup | |||
close(wsc.ResultsCh) | |||
} | |||
// subscribe to an event | |||
func (wsc *WSClient) Subscribe(eventid string) error { | |||
err := wsc.WriteJSON(rpctypes.RPCRequest{ | |||
JSONRPC: "2.0", | |||
ID: "", | |||
Method: "subscribe", | |||
Params: []interface{}{eventid}, | |||
}) | |||
return err | |||
} | |||
// unsubscribe from an event | |||
func (wsc *WSClient) Unsubscribe(eventid string) error { | |||
err := wsc.WriteJSON(rpctypes.RPCRequest{ | |||
JSONRPC: "2.0", | |||
ID: "", | |||
Method: "unsubscribe", | |||
Params: []interface{}{eventid}, | |||
}) | |||
return err | |||
} |
@ -1,27 +1,25 @@ | |||
package core | |||
import ( | |||
"github.com/tendermint/go-events" | |||
"github.com/tendermint/go-rpc/types" | |||
ctypes "github.com/tendermint/tendermint/rpc/core/types" | |||
"github.com/tendermint/tendermint/rpc/types" | |||
"github.com/tendermint/tendermint/types" | |||
) | |||
func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscribe, error) { | |||
log.Notice("Subscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event) | |||
wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg types.EventData) { | |||
wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg events.EventData) { | |||
// NOTE: EventSwitch callbacks must be nonblocking | |||
// NOTE: RPCResponses of subscribed events have id suffix "#event" | |||
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", ctypes.ResultEvent{event, msg}, "")) | |||
tmResult := ctypes.TMResult(&ctypes.ResultEvent{event, types.TMEventData(msg)}) | |||
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &tmResult, "")) | |||
}) | |||
return &ctypes.ResultSubscribe{}, nil | |||
} | |||
func Unsubscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultUnsubscribe, error) { | |||
log.Notice("Unsubscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event) | |||
wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg types.EventData) { | |||
// NOTE: EventSwitch callbacks must be nonblocking | |||
// NOTE: RPCResponses of subscribed events have id suffix "#event" | |||
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", ctypes.ResultEvent{event, msg}, "")) | |||
}) | |||
wsCtx.GetEventSwitch().RemoveListener(event) | |||
return &ctypes.ResultUnsubscribe{}, nil | |||
} |
@ -1,21 +1,111 @@ | |||
package core | |||
import ( | |||
rpc "github.com/tendermint/tendermint/rpc/server" | |||
rpc "github.com/tendermint/go-rpc/server" | |||
"github.com/tendermint/go-rpc/types" | |||
ctypes "github.com/tendermint/tendermint/rpc/core/types" | |||
) | |||
// TODO: eliminate redundancy between here and reading code from core/ | |||
var Routes = map[string]*rpc.RPCFunc{ | |||
"subscribe": rpc.NewWSRPCFunc(Subscribe, []string{"event"}), | |||
"unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, []string{"event"}), | |||
"status": rpc.NewRPCFunc(Status, []string{}), | |||
"net_info": rpc.NewRPCFunc(NetInfo, []string{}), | |||
"blockchain": rpc.NewRPCFunc(BlockchainInfo, []string{"minHeight", "maxHeight"}), | |||
"genesis": rpc.NewRPCFunc(Genesis, []string{}), | |||
"get_block": rpc.NewRPCFunc(GetBlock, []string{"height"}), | |||
"list_validators": rpc.NewRPCFunc(ListValidators, []string{}), | |||
"dump_consensus_state": rpc.NewRPCFunc(DumpConsensusState, []string{}), | |||
"broadcast_tx": rpc.NewRPCFunc(BroadcastTx, []string{"tx"}), | |||
"list_unconfirmed_txs": rpc.NewRPCFunc(ListUnconfirmedTxs, []string{}), | |||
"subscribe": rpc.NewWSRPCFunc(SubscribeResult, "event"), | |||
"unsubscribe": rpc.NewWSRPCFunc(UnsubscribeResult, "event"), | |||
"status": rpc.NewRPCFunc(StatusResult, ""), | |||
"net_info": rpc.NewRPCFunc(NetInfoResult, ""), | |||
"blockchain": rpc.NewRPCFunc(BlockchainInfoResult, "minHeight,maxHeight"), | |||
"genesis": rpc.NewRPCFunc(GenesisResult, ""), | |||
"get_block": rpc.NewRPCFunc(GetBlockResult, "height"), | |||
"list_validators": rpc.NewRPCFunc(ListValidatorsResult, ""), | |||
"dump_consensus_state": rpc.NewRPCFunc(DumpConsensusStateResult, ""), | |||
"broadcast_tx": rpc.NewRPCFunc(BroadcastTxResult, "tx"), | |||
"list_unconfirmed_txs": rpc.NewRPCFunc(ListUnconfirmedTxsResult, ""), | |||
// subscribe/unsubscribe are reserved for websocket events. | |||
} | |||
func SubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) { | |||
if r, err := Subscribe(wsCtx, event); err != nil { | |||
return nil, err | |||
} else { | |||
return r, nil | |||
} | |||
} | |||
func UnsubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) { | |||
if r, err := Unsubscribe(wsCtx, event); err != nil { | |||
return nil, err | |||
} else { | |||
return r, nil | |||
} | |||
} | |||
func StatusResult() (ctypes.TMResult, error) { | |||
if r, err := Status(); err != nil { | |||
return nil, err | |||
} else { | |||
return r, nil | |||
} | |||
} | |||
func NetInfoResult() (ctypes.TMResult, error) { | |||
if r, err := NetInfo(); err != nil { | |||
return nil, err | |||
} else { | |||
return r, nil | |||
} | |||
} | |||
func BlockchainInfoResult(min, max int) (ctypes.TMResult, error) { | |||
if r, err := BlockchainInfo(min, max); err != nil { | |||
return nil, err | |||
} else { | |||
return r, nil | |||
} | |||
} | |||
func GenesisResult() (ctypes.TMResult, error) { | |||
if r, err := Genesis(); err != nil { | |||
return nil, err | |||
} else { | |||
return r, nil | |||
} | |||
} | |||
func GetBlockResult(height int) (ctypes.TMResult, error) { | |||
if r, err := GetBlock(height); err != nil { | |||
return nil, err | |||
} else { | |||
return r, nil | |||
} | |||
} | |||
func ListValidatorsResult() (ctypes.TMResult, error) { | |||
if r, err := ListValidators(); err != nil { | |||
return nil, err | |||
} else { | |||
return r, nil | |||
} | |||
} | |||
func DumpConsensusStateResult() (ctypes.TMResult, error) { | |||
if r, err := DumpConsensusState(); err != nil { | |||
return nil, err | |||
} else { | |||
return r, nil | |||
} | |||
} | |||
func ListUnconfirmedTxsResult() (ctypes.TMResult, error) { | |||
if r, err := ListUnconfirmedTxs(); err != nil { | |||
return nil, err | |||
} else { | |||
return r, nil | |||
} | |||
} | |||
func BroadcastTxResult(tx []byte) (ctypes.TMResult, error) { | |||
if r, err := BroadcastTx(tx); err != nil { | |||
return nil, err | |||
} else { | |||
return r, nil | |||
} | |||
} |
@ -1,553 +0,0 @@ | |||
package rpcserver | |||
import ( | |||
"bytes" | |||
"encoding/json" | |||
"errors" | |||
"fmt" | |||
"io/ioutil" | |||
"net/http" | |||
"reflect" | |||
"sort" | |||
"time" | |||
"github.com/gorilla/websocket" | |||
. "github.com/tendermint/go-common" | |||
"github.com/tendermint/go-wire" | |||
"github.com/tendermint/tendermint/events" | |||
. "github.com/tendermint/tendermint/rpc/types" | |||
) | |||
func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) { | |||
// HTTP endpoints | |||
for funcName, rpcFunc := range funcMap { | |||
mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc)) | |||
} | |||
// JSONRPC endpoints | |||
mux.HandleFunc("/", makeJSONRPCHandler(funcMap)) | |||
} | |||
//------------------------------------- | |||
// function introspection | |||
// holds all type information for each function | |||
type RPCFunc struct { | |||
f reflect.Value // underlying rpc function | |||
args []reflect.Type // type of each function arg | |||
returns []reflect.Type // type of each return arg | |||
argNames []string // name of each argument | |||
ws bool // websocket only | |||
} | |||
// wraps a function for quicker introspection | |||
func NewRPCFunc(f interface{}, args []string) *RPCFunc { | |||
return &RPCFunc{ | |||
f: reflect.ValueOf(f), | |||
args: funcArgTypes(f), | |||
returns: funcReturnTypes(f), | |||
argNames: args, | |||
ws: false, | |||
} | |||
} | |||
func NewWSRPCFunc(f interface{}, args []string) *RPCFunc { | |||
return &RPCFunc{ | |||
f: reflect.ValueOf(f), | |||
args: funcArgTypes(f), | |||
returns: funcReturnTypes(f), | |||
argNames: args, | |||
ws: true, | |||
} | |||
} | |||
// return a function's argument types | |||
func funcArgTypes(f interface{}) []reflect.Type { | |||
t := reflect.TypeOf(f) | |||
n := t.NumIn() | |||
typez := make([]reflect.Type, n) | |||
for i := 0; i < n; i++ { | |||
typez[i] = t.In(i) | |||
} | |||
return typez | |||
} | |||
// return a function's return types | |||
func funcReturnTypes(f interface{}) []reflect.Type { | |||
t := reflect.TypeOf(f) | |||
n := t.NumOut() | |||
typez := make([]reflect.Type, n) | |||
for i := 0; i < n; i++ { | |||
typez[i] = t.Out(i) | |||
} | |||
return typez | |||
} | |||
// function introspection | |||
//----------------------------------------------------------------------------- | |||
// rpc.json | |||
// jsonrpc calls grab the given method's function info and runs reflect.Call | |||
func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc { | |||
return func(w http.ResponseWriter, r *http.Request) { | |||
b, _ := ioutil.ReadAll(r.Body) | |||
// if its an empty request (like from a browser), | |||
// just display a list of functions | |||
if len(b) == 0 { | |||
writeListOfEndpoints(w, r, funcMap) | |||
return | |||
} | |||
var request RPCRequest | |||
err := json.Unmarshal(b, &request) | |||
if err != nil { | |||
WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error())) | |||
return | |||
} | |||
if len(r.URL.Path) > 1 { | |||
WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path))) | |||
return | |||
} | |||
rpcFunc := funcMap[request.Method] | |||
if rpcFunc == nil { | |||
WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method)) | |||
return | |||
} | |||
if rpcFunc.ws { | |||
WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method is only for websockets: "+request.Method)) | |||
return | |||
} | |||
args, err := jsonParamsToArgs(rpcFunc, request.Params) | |||
if err != nil { | |||
WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, err.Error())) | |||
return | |||
} | |||
returns := rpcFunc.f.Call(args) | |||
log.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns) | |||
result, err := unreflectResult(returns) | |||
if err != nil { | |||
WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, err.Error())) | |||
return | |||
} | |||
WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, result, "")) | |||
} | |||
} | |||
// Convert a list of interfaces to properly typed values | |||
func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, error) { | |||
if len(rpcFunc.argNames) != len(params) { | |||
return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)", | |||
len(rpcFunc.argNames), rpcFunc.argNames, len(params), params)) | |||
} | |||
values := make([]reflect.Value, len(params)) | |||
for i, p := range params { | |||
ty := rpcFunc.args[i] | |||
v, err := _jsonObjectToArg(ty, p) | |||
if err != nil { | |||
return nil, err | |||
} | |||
values[i] = v | |||
} | |||
return values, nil | |||
} | |||
// Same as above, but with the first param the websocket connection | |||
func jsonParamsToArgsWS(rpcFunc *RPCFunc, params []interface{}, wsCtx WSRPCContext) ([]reflect.Value, error) { | |||
if len(rpcFunc.argNames)-1 != len(params) { | |||
return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)", | |||
len(rpcFunc.argNames)-1, rpcFunc.argNames[1:], len(params), params)) | |||
} | |||
values := make([]reflect.Value, len(params)+1) | |||
values[0] = reflect.ValueOf(wsCtx) | |||
for i, p := range params { | |||
ty := rpcFunc.args[i+1] | |||
v, err := _jsonObjectToArg(ty, p) | |||
if err != nil { | |||
return nil, err | |||
} | |||
values[i+1] = v | |||
} | |||
return values, nil | |||
} | |||
func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) { | |||
var err error | |||
v := reflect.New(ty) | |||
wire.ReadJSONObjectPtr(v.Interface(), object, &err) | |||
if err != nil { | |||
return v, err | |||
} | |||
v = v.Elem() | |||
return v, nil | |||
} | |||
// rpc.json | |||
//----------------------------------------------------------------------------- | |||
// rpc.http | |||
// convert from a function name to the http handler | |||
func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) { | |||
// Exception for websocket endpoints | |||
if rpcFunc.ws { | |||
return func(w http.ResponseWriter, r *http.Request) { | |||
WriteRPCResponseHTTP(w, NewRPCResponse("", nil, "This RPC method is only for websockets")) | |||
} | |||
} | |||
// All other endpoints | |||
return func(w http.ResponseWriter, r *http.Request) { | |||
args, err := httpParamsToArgs(rpcFunc, r) | |||
if err != nil { | |||
WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error())) | |||
return | |||
} | |||
returns := rpcFunc.f.Call(args) | |||
log.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns) | |||
result, err := unreflectResult(returns) | |||
if err != nil { | |||
WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error())) | |||
return | |||
} | |||
WriteRPCResponseHTTP(w, NewRPCResponse("", result, "")) | |||
} | |||
} | |||
// Covert an http query to a list of properly typed values. | |||
// To be properly decoded the arg must be a concrete type from tendermint (if its an interface). | |||
func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) { | |||
argTypes := rpcFunc.args | |||
argNames := rpcFunc.argNames | |||
var err error | |||
values := make([]reflect.Value, len(argNames)) | |||
for i, name := range argNames { | |||
ty := argTypes[i] | |||
arg := GetParam(r, name) | |||
values[i], err = _jsonStringToArg(ty, arg) | |||
if err != nil { | |||
return nil, err | |||
} | |||
} | |||
return values, nil | |||
} | |||
func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) { | |||
var err error | |||
v := reflect.New(ty) | |||
wire.ReadJSONPtr(v.Interface(), []byte(arg), &err) | |||
if err != nil { | |||
return v, err | |||
} | |||
v = v.Elem() | |||
return v, nil | |||
} | |||
// rpc.http | |||
//----------------------------------------------------------------------------- | |||
// rpc.websocket | |||
const ( | |||
writeChanCapacity = 1000 | |||
wsWriteTimeoutSeconds = 30 // each write times out after this | |||
wsReadTimeoutSeconds = 30 // connection times out if we haven't received *anything* in this long, not even pings. | |||
wsPingTickerSeconds = 10 // send a ping every PingTickerSeconds. | |||
) | |||
// a single websocket connection | |||
// contains listener id, underlying ws connection, | |||
// and the event switch for subscribing to events | |||
type wsConnection struct { | |||
QuitService | |||
remoteAddr string | |||
baseConn *websocket.Conn | |||
writeChan chan RPCResponse | |||
readTimeout *time.Timer | |||
pingTicker *time.Ticker | |||
funcMap map[string]*RPCFunc | |||
evsw *events.EventSwitch | |||
} | |||
// new websocket connection wrapper | |||
func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *wsConnection { | |||
wsc := &wsConnection{ | |||
remoteAddr: baseConn.RemoteAddr().String(), | |||
baseConn: baseConn, | |||
writeChan: make(chan RPCResponse, writeChanCapacity), // error when full. | |||
funcMap: funcMap, | |||
evsw: evsw, | |||
} | |||
wsc.QuitService = *NewQuitService(log, "wsConnection", wsc) | |||
return wsc | |||
} | |||
// wsc.Start() blocks until the connection closes. | |||
func (wsc *wsConnection) OnStart() error { | |||
wsc.QuitService.OnStart() | |||
// Read subscriptions/unsubscriptions to events | |||
go wsc.readRoutine() | |||
// Custom Ping handler to touch readTimeout | |||
wsc.readTimeout = time.NewTimer(time.Second * wsReadTimeoutSeconds) | |||
wsc.pingTicker = time.NewTicker(time.Second * wsPingTickerSeconds) | |||
wsc.baseConn.SetPingHandler(func(m string) error { | |||
// NOTE: https://github.com/gorilla/websocket/issues/97 | |||
go wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds)) | |||
wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds) | |||
return nil | |||
}) | |||
wsc.baseConn.SetPongHandler(func(m string) error { | |||
// NOTE: https://github.com/gorilla/websocket/issues/97 | |||
wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds) | |||
return nil | |||
}) | |||
go wsc.readTimeoutRoutine() | |||
// Write responses, BLOCKING. | |||
wsc.writeRoutine() | |||
return nil | |||
} | |||
func (wsc *wsConnection) OnStop() { | |||
wsc.QuitService.OnStop() | |||
wsc.evsw.RemoveListener(wsc.remoteAddr) | |||
wsc.readTimeout.Stop() | |||
wsc.pingTicker.Stop() | |||
// The write loop closes the websocket connection | |||
// when it exits its loop, and the read loop | |||
// closes the writeChan | |||
} | |||
func (wsc *wsConnection) readTimeoutRoutine() { | |||
select { | |||
case <-wsc.readTimeout.C: | |||
log.Notice("Stopping connection due to read timeout") | |||
wsc.Stop() | |||
case <-wsc.Quit: | |||
return | |||
} | |||
} | |||
// Implements WSRPCConnection | |||
func (wsc *wsConnection) GetRemoteAddr() string { | |||
return wsc.remoteAddr | |||
} | |||
// Implements WSRPCConnection | |||
func (wsc *wsConnection) GetEventSwitch() *events.EventSwitch { | |||
return wsc.evsw | |||
} | |||
// Implements WSRPCConnection | |||
// Blocking write to writeChan until service stops. | |||
func (wsc *wsConnection) WriteRPCResponse(resp RPCResponse) { | |||
select { | |||
case <-wsc.Quit: | |||
return | |||
case wsc.writeChan <- resp: | |||
} | |||
} | |||
// Implements WSRPCConnection | |||
// Nonblocking write. | |||
func (wsc *wsConnection) TryWriteRPCResponse(resp RPCResponse) bool { | |||
select { | |||
case <-wsc.Quit: | |||
return false | |||
case wsc.writeChan <- resp: | |||
return true | |||
default: | |||
return false | |||
} | |||
} | |||
// Read from the socket and subscribe to or unsubscribe from events | |||
func (wsc *wsConnection) readRoutine() { | |||
// Do not close writeChan, to allow WriteRPCResponse() to fail. | |||
// defer close(wsc.writeChan) | |||
for { | |||
select { | |||
case <-wsc.Quit: | |||
return | |||
default: | |||
var in []byte | |||
// Do not set a deadline here like below: | |||
// wsc.baseConn.SetReadDeadline(time.Now().Add(time.Second * wsReadTimeoutSeconds)) | |||
// The client may not send anything for a while. | |||
// We use `readTimeout` to handle read timeouts. | |||
_, in, err := wsc.baseConn.ReadMessage() | |||
if err != nil { | |||
log.Notice("Failed to read from connection", "remote", wsc.remoteAddr) | |||
// an error reading the connection, | |||
// kill the connection | |||
wsc.Stop() | |||
return | |||
} | |||
var request RPCRequest | |||
err = json.Unmarshal(in, &request) | |||
if err != nil { | |||
errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error()) | |||
wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, errStr)) | |||
continue | |||
} | |||
// Now, fetch the RPCFunc and execute it. | |||
rpcFunc := wsc.funcMap[request.Method] | |||
if rpcFunc == nil { | |||
wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method)) | |||
continue | |||
} | |||
var args []reflect.Value | |||
if rpcFunc.ws { | |||
wsCtx := WSRPCContext{Request: request, WSRPCConnection: wsc} | |||
args, err = jsonParamsToArgsWS(rpcFunc, request.Params, wsCtx) | |||
} else { | |||
args, err = jsonParamsToArgs(rpcFunc, request.Params) | |||
} | |||
if err != nil { | |||
wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error())) | |||
continue | |||
} | |||
returns := rpcFunc.f.Call(args) | |||
log.Info("WSJSONRPC", "method", request.Method, "args", args, "returns", returns) | |||
result, err := unreflectResult(returns) | |||
if err != nil { | |||
wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error())) | |||
continue | |||
} else { | |||
wsc.WriteRPCResponse(NewRPCResponse(request.ID, result, "")) | |||
continue | |||
} | |||
} | |||
} | |||
} | |||
// receives on a write channel and writes out on the socket | |||
func (wsc *wsConnection) writeRoutine() { | |||
defer wsc.baseConn.Close() | |||
var n, err = int(0), error(nil) | |||
for { | |||
select { | |||
case <-wsc.Quit: | |||
return | |||
case <-wsc.pingTicker.C: | |||
err := wsc.baseConn.WriteMessage(websocket.PingMessage, []byte{}) | |||
if err != nil { | |||
log.Error("Failed to write ping message on websocket", "error", err) | |||
wsc.Stop() | |||
return | |||
} | |||
case msg := <-wsc.writeChan: | |||
buf := new(bytes.Buffer) | |||
wire.WriteJSON(msg, buf, &n, &err) | |||
if err != nil { | |||
log.Error("Failed to marshal RPCResponse to JSON", "error", err) | |||
} else { | |||
wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds)) | |||
bufBytes := buf.Bytes() | |||
if err = wsc.baseConn.WriteMessage(websocket.TextMessage, bufBytes); err != nil { | |||
log.Warn("Failed to write response on websocket", "error", err) | |||
wsc.Stop() | |||
return | |||
} | |||
} | |||
} | |||
} | |||
} | |||
//---------------------------------------- | |||
// Main manager for all websocket connections | |||
// Holds the event switch | |||
// NOTE: The websocket path is defined externally, e.g. in node/node.go | |||
type WebsocketManager struct { | |||
websocket.Upgrader | |||
funcMap map[string]*RPCFunc | |||
evsw *events.EventSwitch | |||
} | |||
func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *WebsocketManager { | |||
return &WebsocketManager{ | |||
funcMap: funcMap, | |||
evsw: evsw, | |||
Upgrader: websocket.Upgrader{ | |||
ReadBufferSize: 1024, | |||
WriteBufferSize: 1024, | |||
CheckOrigin: func(r *http.Request) bool { | |||
// TODO | |||
return true | |||
}, | |||
}, | |||
} | |||
} | |||
// Upgrade the request/response (via http.Hijack) and starts the wsConnection. | |||
func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) { | |||
wsConn, err := wm.Upgrade(w, r, nil) | |||
if err != nil { | |||
// TODO - return http error | |||
log.Error("Failed to upgrade to websocket connection", "error", err) | |||
return | |||
} | |||
// register connection | |||
con := NewWSConnection(wsConn, wm.funcMap, wm.evsw) | |||
log.Notice("New websocket connection", "remote", con.remoteAddr) | |||
con.Start() // Blocking | |||
} | |||
// rpc.websocket | |||
//----------------------------------------------------------------------------- | |||
// returns is result struct and error. If error is not nil, return it | |||
func unreflectResult(returns []reflect.Value) (interface{}, error) { | |||
errV := returns[1] | |||
if errV.Interface() != nil { | |||
return nil, fmt.Errorf("%v", errV.Interface()) | |||
} | |||
return returns[0].Interface(), nil | |||
} | |||
// writes a list of available rpc endpoints as an html page | |||
func writeListOfEndpoints(w http.ResponseWriter, r *http.Request, funcMap map[string]*RPCFunc) { | |||
noArgNames := []string{} | |||
argNames := []string{} | |||
for name, funcData := range funcMap { | |||
if len(funcData.args) == 0 { | |||
noArgNames = append(noArgNames, name) | |||
} else { | |||
argNames = append(argNames, name) | |||
} | |||
} | |||
sort.Strings(noArgNames) | |||
sort.Strings(argNames) | |||
buf := new(bytes.Buffer) | |||
buf.WriteString("<html><body>") | |||
buf.WriteString("<br>Available endpoints:<br>") | |||
for _, name := range noArgNames { | |||
link := fmt.Sprintf("http://%s/%s", r.Host, name) | |||
buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link)) | |||
} | |||
buf.WriteString("<br>Endpoints that require arguments:<br>") | |||
for _, name := range argNames { | |||
link := fmt.Sprintf("http://%s/%s?", r.Host, name) | |||
funcData := funcMap[name] | |||
for i, argName := range funcData.argNames { | |||
link += argName + "=_" | |||
if i < len(funcData.argNames)-1 { | |||
link += "&" | |||
} | |||
} | |||
buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link)) | |||
} | |||
buf.WriteString("</body></html>") | |||
w.Header().Set("Content-Type", "text/html") | |||
w.WriteHeader(200) | |||
w.Write(buf.Bytes()) | |||
} |
@ -1,89 +0,0 @@ | |||
package rpcserver | |||
import ( | |||
"encoding/hex" | |||
"fmt" | |||
"net/http" | |||
"regexp" | |||
"strconv" | |||
) | |||
var ( | |||
// Parts of regular expressions | |||
atom = "[A-Z0-9!#$%&'*+\\-/=?^_`{|}~]+" | |||
dotAtom = atom + `(?:\.` + atom + `)*` | |||
domain = `[A-Z0-9.-]+\.[A-Z]{2,4}` | |||
RE_HEX = regexp.MustCompile(`^(?i)[a-f0-9]+$`) | |||
RE_EMAIL = regexp.MustCompile(`^(?i)(` + dotAtom + `)@(` + dotAtom + `)$`) | |||
RE_ADDRESS = regexp.MustCompile(`^(?i)[a-z0-9]{25,34}$`) | |||
RE_HOST = regexp.MustCompile(`^(?i)(` + domain + `)$`) | |||
//RE_ID12 = regexp.MustCompile(`^[a-zA-Z0-9]{12}$`) | |||
) | |||
func GetParam(r *http.Request, param string) string { | |||
s := r.URL.Query().Get(param) | |||
if s == "" { | |||
s = r.FormValue(param) | |||
} | |||
return s | |||
} | |||
func GetParamByteSlice(r *http.Request, param string) ([]byte, error) { | |||
s := GetParam(r, param) | |||
return hex.DecodeString(s) | |||
} | |||
func GetParamInt64(r *http.Request, param string) (int64, error) { | |||
s := GetParam(r, param) | |||
i, err := strconv.ParseInt(s, 10, 64) | |||
if err != nil { | |||
return 0, fmt.Errorf(param, err.Error()) | |||
} | |||
return i, nil | |||
} | |||
func GetParamInt32(r *http.Request, param string) (int32, error) { | |||
s := GetParam(r, param) | |||
i, err := strconv.ParseInt(s, 10, 32) | |||
if err != nil { | |||
return 0, fmt.Errorf(param, err.Error()) | |||
} | |||
return int32(i), nil | |||
} | |||
func GetParamUint64(r *http.Request, param string) (uint64, error) { | |||
s := GetParam(r, param) | |||
i, err := strconv.ParseUint(s, 10, 64) | |||
if err != nil { | |||
return 0, fmt.Errorf(param, err.Error()) | |||
} | |||
return i, nil | |||
} | |||
func GetParamUint(r *http.Request, param string) (uint, error) { | |||
s := GetParam(r, param) | |||
i, err := strconv.ParseUint(s, 10, 64) | |||
if err != nil { | |||
return 0, fmt.Errorf(param, err.Error()) | |||
} | |||
return uint(i), nil | |||
} | |||
func GetParamRegexp(r *http.Request, param string, re *regexp.Regexp) (string, error) { | |||
s := GetParam(r, param) | |||
if !re.MatchString(s) { | |||
return "", fmt.Errorf(param, "Did not match regular expression %v", re.String()) | |||
} | |||
return s, nil | |||
} | |||
func GetParamFloat64(r *http.Request, param string) (float64, error) { | |||
s := GetParam(r, param) | |||
f, err := strconv.ParseFloat(s, 64) | |||
if err != nil { | |||
return 0, fmt.Errorf(param, err.Error()) | |||
} | |||
return f, nil | |||
} |
@ -1,115 +0,0 @@ | |||
// Commons for HTTP handling | |||
package rpcserver | |||
import ( | |||
"bufio" | |||
"fmt" | |||
"net" | |||
"net/http" | |||
"runtime/debug" | |||
"time" | |||
. "github.com/tendermint/go-common" | |||
"github.com/tendermint/go-wire" | |||
"github.com/tendermint/tendermint/alert" | |||
. "github.com/tendermint/tendermint/rpc/types" | |||
) | |||
func StartHTTPServer(listenAddr string, handler http.Handler) (net.Listener, error) { | |||
log.Notice(Fmt("Starting RPC HTTP server on %v", listenAddr)) | |||
listener, err := net.Listen("tcp", listenAddr) | |||
if err != nil { | |||
return nil, fmt.Errorf("Failed to listen to %v", listenAddr) | |||
} | |||
go func() { | |||
res := http.Serve( | |||
listener, | |||
RecoverAndLogHandler(handler), | |||
) | |||
log.Crit("RPC HTTP server stopped", "result", res) | |||
}() | |||
return listener, nil | |||
} | |||
func WriteRPCResponseHTTP(w http.ResponseWriter, res RPCResponse) { | |||
jsonBytes := wire.JSONBytesPretty(res) | |||
w.Header().Set("Content-Type", "application/json") | |||
w.WriteHeader(200) | |||
w.Write(jsonBytes) | |||
} | |||
//----------------------------------------------------------------------------- | |||
// Wraps an HTTP handler, adding error logging. | |||
// If the inner function panics, the outer function recovers, logs, sends an | |||
// HTTP 500 error response. | |||
func RecoverAndLogHandler(handler http.Handler) http.Handler { | |||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | |||
// Wrap the ResponseWriter to remember the status | |||
rww := &ResponseWriterWrapper{-1, w} | |||
begin := time.Now() | |||
// Common headers | |||
origin := r.Header.Get("Origin") | |||
rww.Header().Set("Access-Control-Allow-Origin", origin) | |||
rww.Header().Set("Access-Control-Allow-Credentials", "true") | |||
rww.Header().Set("Access-Control-Expose-Headers", "X-Server-Time") | |||
rww.Header().Set("X-Server-Time", fmt.Sprintf("%v", begin.Unix())) | |||
defer func() { | |||
// Send a 500 error if a panic happens during a handler. | |||
// Without this, Chrome & Firefox were retrying aborted ajax requests, | |||
// at least to my localhost. | |||
if e := recover(); e != nil { | |||
// If RPCResponse | |||
if res, ok := e.(RPCResponse); ok { | |||
WriteRPCResponseHTTP(rww, res) | |||
} else { | |||
// For the rest, | |||
log.Error("Panic in RPC HTTP handler", "error", e, "stack", string(debug.Stack())) | |||
rww.WriteHeader(http.StatusInternalServerError) | |||
WriteRPCResponseHTTP(rww, NewRPCResponse("", nil, Fmt("Internal Server Error: %v", e))) | |||
} | |||
} | |||
// Finally, log. | |||
durationMS := time.Since(begin).Nanoseconds() / 1000000 | |||
if rww.Status == -1 { | |||
rww.Status = 200 | |||
} | |||
log.Info("Served RPC HTTP response", | |||
"method", r.Method, "url", r.URL, | |||
"status", rww.Status, "duration", durationMS, | |||
"remoteAddr", r.RemoteAddr, | |||
) | |||
}() | |||
handler.ServeHTTP(rww, r) | |||
}) | |||
} | |||
// Remember the status for logging | |||
type ResponseWriterWrapper struct { | |||
Status int | |||
http.ResponseWriter | |||
} | |||
func (w *ResponseWriterWrapper) WriteHeader(status int) { | |||
w.Status = status | |||
w.ResponseWriter.WriteHeader(status) | |||
} | |||
// implements http.Hijacker | |||
func (w *ResponseWriterWrapper) Hijack() (net.Conn, *bufio.ReadWriter, error) { | |||
return w.ResponseWriter.(http.Hijacker).Hijack() | |||
} | |||
// Stick it as a deferred statement in gouroutines to prevent the program from crashing. | |||
func Recover(daemonName string) { | |||
if e := recover(); e != nil { | |||
stack := string(debug.Stack()) | |||
errorString := fmt.Sprintf("[%s] %s\n%s", daemonName, e, stack) | |||
alert.Alert(errorString) | |||
} | |||
} |
@ -1,7 +0,0 @@ | |||
package rpcserver | |||
import ( | |||
"github.com/tendermint/log15" | |||
) | |||
var log = log15.New("module", "rpcserver") |
@ -0,0 +1,162 @@ | |||
package rpctest | |||
import ( | |||
"fmt" | |||
"testing" | |||
_ "github.com/tendermint/tendermint/config/tendermint_test" | |||
ctypes "github.com/tendermint/tendermint/rpc/core/types" | |||
"github.com/tendermint/tendermint/types" | |||
) | |||
//-------------------------------------------------------------------------------- | |||
// Test the HTTP client | |||
func TestURIStatus(t *testing.T) { | |||
tmResult := new(ctypes.TMResult) | |||
_, err := clientURI.Call("status", map[string]interface{}{}, tmResult) | |||
if err != nil { | |||
t.Fatal(err) | |||
} | |||
testStatus(t, tmResult) | |||
} | |||
func TestJSONStatus(t *testing.T) { | |||
tmResult := new(ctypes.TMResult) | |||
_, err := clientJSON.Call("status", []interface{}{}, tmResult) | |||
if err != nil { | |||
t.Fatal(err) | |||
} | |||
testStatus(t, tmResult) | |||
} | |||
func testStatus(t *testing.T, statusI interface{}) { | |||
tmRes := statusI.(*ctypes.TMResult) | |||
status := (*tmRes).(*ctypes.ResultStatus) | |||
if status.NodeInfo.Network != chainID { | |||
t.Fatal(fmt.Errorf("ChainID mismatch: got %s expected %s", | |||
status.NodeInfo.Network, chainID)) | |||
} | |||
} | |||
/*func TestURIBroadcastTx(t *testing.T) { | |||
testBroadcastTx(t, "HTTP") | |||
}*/ | |||
/*func TestJSONBroadcastTx(t *testing.T) { | |||
testBroadcastTx(t, "JSONRPC") | |||
}*/ | |||
// TODO | |||
/* | |||
func testBroadcastTx(t *testing.T, typ string) { | |||
amt := int64(100) | |||
toAddr := user[1].Address | |||
tx := makeDefaultSendTxSigned(t, typ, toAddr, amt) | |||
receipt := broadcastTx(t, typ, tx) | |||
if receipt.CreatesContract > 0 { | |||
t.Fatal("This tx does not create a contract") | |||
} | |||
if len(receipt.TxHash) == 0 { | |||
t.Fatal("Failed to compute tx hash") | |||
} | |||
pool := node.MempoolReactor().Mempool | |||
txs := pool.GetProposalTxs() | |||
if len(txs) != mempoolCount { | |||
t.Fatalf("The mem pool has %d txs. Expected %d", len(txs), mempoolCount) | |||
} | |||
tx2 := txs[mempoolCount-1].(*types.SendTx) | |||
n, err := new(int64), new(error) | |||
buf1, buf2 := new(bytes.Buffer), new(bytes.Buffer) | |||
tx.WriteSignBytes(chainID, buf1, n, err) | |||
tx2.WriteSignBytes(chainID, buf2, n, err) | |||
if bytes.Compare(buf1.Bytes(), buf2.Bytes()) != 0 { | |||
t.Fatal("inconsistent hashes for mempool tx and sent tx") | |||
} | |||
}*/ | |||
//-------------------------------------------------------------------------------- | |||
// Test the websocket service | |||
var wsTyp = "JSONRPC" | |||
// make a simple connection to the server | |||
func TestWSConnect(t *testing.T) { | |||
wsc := newWSClient(t) | |||
wsc.Stop() | |||
} | |||
// receive a new block message | |||
func TestWSNewBlock(t *testing.T) { | |||
wsc := newWSClient(t) | |||
eid := types.EventStringNewBlock() | |||
subscribe(t, wsc, eid) | |||
defer func() { | |||
unsubscribe(t, wsc, eid) | |||
wsc.Stop() | |||
}() | |||
waitForEvent(t, wsc, eid, true, func() {}, func(eid string, b interface{}) error { | |||
fmt.Println("Check:", b) | |||
return nil | |||
}) | |||
} | |||
// receive a few new block messages in a row, with increasing height | |||
func TestWSBlockchainGrowth(t *testing.T) { | |||
if testing.Short() { | |||
t.Skip("skipping test in short mode.") | |||
} | |||
wsc := newWSClient(t) | |||
eid := types.EventStringNewBlock() | |||
subscribe(t, wsc, eid) | |||
defer func() { | |||
unsubscribe(t, wsc, eid) | |||
wsc.Stop() | |||
}() | |||
// listen for NewBlock, ensure height increases by 1 | |||
var initBlockN int | |||
for i := 0; i < 3; i++ { | |||
waitForEvent(t, wsc, eid, true, func() {}, func(eid string, eventData interface{}) error { | |||
block := eventData.(types.EventDataNewBlock).Block | |||
if i == 0 { | |||
initBlockN = block.Header.Height | |||
} else { | |||
if block.Header.Height != initBlockN+i { | |||
return fmt.Errorf("Expected block %d, got block %d", initBlockN+i, block.Header.Height) | |||
} | |||
} | |||
return nil | |||
}) | |||
} | |||
} | |||
/* TODO: this with dummy app.. | |||
func TestWSDoubleFire(t *testing.T) { | |||
if testing.Short() { | |||
t.Skip("skipping test in short mode.") | |||
} | |||
con := newWSCon(t) | |||
eid := types.EventStringAccInput(user[0].Address) | |||
subscribe(t, con, eid) | |||
defer func() { | |||
unsubscribe(t, con, eid) | |||
con.Close() | |||
}() | |||
amt := int64(100) | |||
toAddr := user[1].Address | |||
// broadcast the transaction, wait to hear about it | |||
waitForEvent(t, con, eid, true, func() { | |||
tx := makeDefaultSendTxSigned(t, wsTyp, toAddr, amt) | |||
broadcastTx(t, wsTyp, tx) | |||
}, func(eid string, b []byte) error { | |||
return nil | |||
}) | |||
// but make sure we don't hear about it twice | |||
waitForEvent(t, con, eid, false, func() { | |||
}, func(eid string, b []byte) error { | |||
return nil | |||
}) | |||
}*/ |
@ -0,0 +1,18 @@ | |||
package rpctest | |||
import ( | |||
cfg "github.com/tendermint/tendermint/config" | |||
tmcfg "github.com/tendermint/tendermint/config/tendermint_test" | |||
) | |||
var config cfg.Config = nil | |||
func initConfig() { | |||
cfg.OnConfig(func(newConfig cfg.Config) { | |||
config = newConfig | |||
}) | |||
c := tmcfg.GetConfig("") | |||
cfg.ApplyConfig(c) // Notify modules of new config | |||
} |
@ -0,0 +1,162 @@ | |||
package rpctest | |||
import ( | |||
"testing" | |||
"time" | |||
. "github.com/tendermint/go-common" | |||
"github.com/tendermint/go-p2p" | |||
"github.com/tendermint/go-wire" | |||
client "github.com/tendermint/go-rpc/client" | |||
_ "github.com/tendermint/tendermint/config/tendermint_test" | |||
nm "github.com/tendermint/tendermint/node" | |||
ctypes "github.com/tendermint/tendermint/rpc/core/types" | |||
"github.com/tendermint/tendermint/types" | |||
) | |||
// global variables for use across all tests | |||
var ( | |||
node *nm.Node | |||
mempoolCount = 0 | |||
chainID string | |||
rpcAddr, requestAddr, websocketAddr string | |||
clientURI *client.ClientURI | |||
clientJSON *client.ClientJSONRPC | |||
) | |||
// initialize config and create new node | |||
func init() { | |||
initConfig() | |||
chainID = config.GetString("chain_id") | |||
rpcAddr = config.GetString("rpc_laddr") | |||
requestAddr = "http://" + rpcAddr | |||
websocketAddr = "ws://" + rpcAddr + "/websocket" | |||
clientURI = client.NewClientURI(requestAddr) | |||
clientJSON = client.NewClientJSONRPC(requestAddr) | |||
// TODO: change consensus/state.go timeouts to be shorter | |||
// start a node | |||
ready := make(chan struct{}) | |||
go newNode(ready) | |||
<-ready | |||
} | |||
// create a new node and sleep forever | |||
func newNode(ready chan struct{}) { | |||
// Create & start node | |||
privValidatorFile := config.GetString("priv_validator_file") | |||
privValidator := types.LoadOrGenPrivValidator(privValidatorFile) | |||
node = nm.NewNode(privValidator) | |||
l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"), true) | |||
node.AddListener(l) | |||
node.Start() | |||
// Run the RPC server. | |||
node.StartRPC() | |||
ready <- struct{}{} | |||
// Sleep forever | |||
ch := make(chan struct{}) | |||
<-ch | |||
} | |||
//-------------------------------------------------------------------------------- | |||
// Utilities for testing the websocket service | |||
// create a new connection | |||
func newWSClient(t *testing.T) *client.WSClient { | |||
wsc := client.NewWSClient(websocketAddr) | |||
if _, err := wsc.Start(); err != nil { | |||
t.Fatal(err) | |||
} | |||
return wsc | |||
} | |||
// subscribe to an event | |||
func subscribe(t *testing.T, wsc *client.WSClient, eventid string) { | |||
if err := wsc.Subscribe(eventid); err != nil { | |||
t.Fatal(err) | |||
} | |||
} | |||
// unsubscribe from an event | |||
func unsubscribe(t *testing.T, wsc *client.WSClient, eventid string) { | |||
if err := wsc.Unsubscribe(eventid); err != nil { | |||
t.Fatal(err) | |||
} | |||
} | |||
// wait for an event; do things that might trigger events, and check them when they are received | |||
// the check function takes an event id and the byte slice read off the ws | |||
func waitForEvent(t *testing.T, wsc *client.WSClient, eventid string, dieOnTimeout bool, f func(), check func(string, interface{}) error) { | |||
// go routine to wait for webscoket msg | |||
goodCh := make(chan interface{}) | |||
errCh := make(chan error) | |||
// Read message | |||
go func() { | |||
var err error | |||
LOOP: | |||
for { | |||
select { | |||
case r := <-wsc.ResultsCh: | |||
result := new(ctypes.TMResult) | |||
wire.ReadJSONPtr(result, r, &err) | |||
if err != nil { | |||
errCh <- err | |||
break LOOP | |||
} | |||
event, ok := (*result).(*ctypes.ResultEvent) | |||
if ok && event.Name == eventid { | |||
goodCh <- event.Data | |||
break LOOP | |||
} | |||
case err := <-wsc.ErrorsCh: | |||
errCh <- err | |||
break LOOP | |||
case <-wsc.Quit: | |||
break LOOP | |||
} | |||
} | |||
}() | |||
// do stuff (transactions) | |||
f() | |||
// wait for an event or timeout | |||
timeout := time.NewTimer(10 * time.Second) | |||
select { | |||
case <-timeout.C: | |||
if dieOnTimeout { | |||
wsc.Stop() | |||
t.Fatalf("%s event was not received in time", eventid) | |||
} | |||
// else that's great, we didn't hear the event | |||
// and we shouldn't have | |||
case eventData := <-goodCh: | |||
if dieOnTimeout { | |||
// message was received and expected | |||
// run the check | |||
if err := check(eventid, eventData); err != nil { | |||
t.Fatal(err) // Show the stack trace. | |||
} | |||
} else { | |||
wsc.Stop() | |||
t.Fatalf("%s event was not expected", eventid) | |||
} | |||
case err := <-errCh: | |||
t.Fatal(err) | |||
panic(err) // Show the stack trace. | |||
} | |||
} | |||
//-------------------------------------------------------------------------------- |
@ -1,71 +0,0 @@ | |||
package rpctypes | |||
import ( | |||
"github.com/tendermint/tendermint/events" | |||
) | |||
type RPCRequest struct { | |||
JSONRPC string `json:"jsonrpc"` | |||
ID string `json:"id"` | |||
Method string `json:"method"` | |||
Params []interface{} `json:"params"` | |||
} | |||
func NewRPCRequest(id string, method string, params []interface{}) RPCRequest { | |||
return RPCRequest{ | |||
JSONRPC: "2.0", | |||
ID: id, | |||
Method: method, | |||
Params: params, | |||
} | |||
} | |||
//---------------------------------------- | |||
/* | |||
Result is a generic interface. | |||
Applications should register type-bytes like so: | |||
var _ = wire.RegisterInterface( | |||
struct{ Result }{}, | |||
wire.ConcreteType{&ResultGenesis{}, ResultTypeGenesis}, | |||
wire.ConcreteType{&ResultBlockchainInfo{}, ResultTypeBlockchainInfo}, | |||
... | |||
) | |||
*/ | |||
type Result interface { | |||
} | |||
//---------------------------------------- | |||
type RPCResponse struct { | |||
JSONRPC string `json:"jsonrpc"` | |||
ID string `json:"id"` | |||
Result Result `json:"result"` | |||
Error string `json:"error"` | |||
} | |||
func NewRPCResponse(id string, res Result, err string) RPCResponse { | |||
return RPCResponse{ | |||
JSONRPC: "2.0", | |||
ID: id, | |||
Result: res, | |||
Error: err, | |||
} | |||
} | |||
//---------------------------------------- | |||
// *wsConnection implements this interface. | |||
type WSRPCConnection interface { | |||
GetRemoteAddr() string | |||
GetEventSwitch() *events.EventSwitch | |||
WriteRPCResponse(resp RPCResponse) | |||
TryWriteRPCResponse(resp RPCResponse) bool | |||
} | |||
// websocket-only RPCFuncs take this as the first parameter. | |||
type WSRPCContext struct { | |||
Request RPCRequest | |||
WSRPCConnection | |||
} |
@ -1,3 +0,0 @@ | |||
package rpc | |||
const Version = "0.4.0" |