Browse Source

move alert, events, rpc into own repos

pull/178/head
Ethan Buchman 9 years ago
parent
commit
0be13d1d27
31 changed files with 57 additions and 1691 deletions
  1. +0
    -64
      alert/alert.go
  2. +0
    -14
      alert/config.go
  3. +0
    -176
      alert/email.go
  4. +0
    -7
      alert/log.go
  5. +2
    -2
      benchmarks/simu/counter.go
  6. +1
    -1
      blockchain/reactor.go
  7. +1
    -1
      consensus/common_test.go
  8. +3
    -3
      consensus/reactor.go
  9. +1
    -1
      consensus/state.go
  10. +1
    -1
      consensus/state_test.go
  11. +0
    -45
      events/event_cache.go
  12. +0
    -215
      events/events.go
  13. +0
    -7
      events/log.go
  14. +1
    -1
      mempool/reactor.go
  15. +6
    -6
      node/node.go
  16. +0
    -18
      node/node_test.go
  17. +0
    -133
      rpc/client/http_client.go
  18. +0
    -7
      rpc/client/log.go
  19. +0
    -116
      rpc/client/ws_client.go
  20. +4
    -4
      rpc/core/events.go
  21. +1
    -1
      rpc/core/routes.go
  22. +5
    -3
      rpc/core/types/responses.go
  23. +0
    -553
      rpc/server/handlers.go
  24. +0
    -89
      rpc/server/http_params.go
  25. +0
    -115
      rpc/server/http_server.go
  26. +0
    -7
      rpc/server/log.go
  27. +27
    -22
      rpc/test/helpers.go
  28. +0
    -71
      rpc/types/types.go
  29. +0
    -3
      rpc/version.go
  30. +1
    -0
      state/state.go
  31. +3
    -5
      types/events.go

+ 0
- 64
alert/alert.go View File

@ -1,64 +0,0 @@
package alert
import (
"fmt"
"time"
"github.com/sfreiberg/gotwilio"
)
var lastAlertUnix int64 = 0
var alertCountSince int = 0
// Sends a critical alert message to administrators.
func Alert(message string) {
log.Error("<!> ALERT <!>\n" + message)
now := time.Now().Unix()
if now-lastAlertUnix > int64(config.GetInt("alert_min_interval")) {
message = fmt.Sprintf("%v:%v", config.GetString("chain_id"), message)
if alertCountSince > 0 {
message = fmt.Sprintf("%v (+%v more since)", message, alertCountSince)
alertCountSince = 0
}
if len(config.GetString("alert_twilio_sid")) > 0 {
go sendTwilio(message)
}
if len(config.GetString("alert_email_recipients")) > 0 {
go sendEmail(message)
}
} else {
alertCountSince++
}
}
func sendTwilio(message string) {
defer func() {
if err := recover(); err != nil {
log.Error("sendTwilio error", "error", err)
}
}()
if len(message) > 50 {
message = message[:50]
}
twilio := gotwilio.NewTwilioClient(config.GetString("alert_twilio_sid"), config.GetString("alert_twilio_token"))
res, exp, err := twilio.SendSMS(config.GetString("alert_twilio_from"), config.GetString("alert_twilio_to"), message, "", "")
if exp != nil || err != nil {
log.Error("sendTwilio error", "res", res, "exp", exp, "error", err)
}
}
func sendEmail(message string) {
defer func() {
if err := recover(); err != nil {
log.Error("sendEmail error", "error", err)
}
}()
subject := message
if len(subject) > 80 {
subject = subject[:80]
}
err := SendEmail(subject, message, config.GetStringSlice("alert_email_recipients"))
if err != nil {
log.Error("sendEmail error", "error", err, "message", message)
}
}

+ 0
- 14
alert/config.go View File

@ -1,14 +0,0 @@
package alert
import (
cfg "github.com/tendermint/go-config"
)
var config cfg.Config = nil
func init() {
cfg.OnConfig(func(newConfig cfg.Config) {
config = newConfig
})
}

+ 0
- 176
alert/email.go View File

@ -1,176 +0,0 @@
// Forked from github.com/SlyMarbo/gmail
package alert
import (
"bytes"
"crypto/tls"
"encoding/base64"
"errors"
"fmt"
"io/ioutil"
"net/smtp"
"path/filepath"
"regexp"
"strings"
)
// Convenience function
func SendEmail(subject, body string, tos []string) error {
email := Compose(subject, body)
email.From = config.GetString("smtp_user")
email.ContentType = "text/html; charset=utf-8"
email.AddRecipients(tos...)
err := email.Send()
return err
}
// Email represents a single message, which may contain
// attachments.
type Email struct {
From string
To []string
Subject string
ContentType string
Body string
Attachments map[string][]byte
}
// Compose begins a new email, filling the subject and body,
// and allocating memory for the list of recipients and the
// attachments.
func Compose(Subject, Body string) *Email {
out := new(Email)
out.To = make([]string, 0, 1)
out.Subject = Subject
out.Body = Body
out.Attachments = make(map[string][]byte)
return out
}
// Attach takes a filename and adds this to the message.
// Note that since only the filename is stored (and not
// its path, for privacy reasons), multiple files in
// different directories but with the same filename and
// extension cannot be sent.
func (e *Email) Attach(Filename string) error {
b, err := ioutil.ReadFile(Filename)
if err != nil {
return err
}
_, fname := filepath.Split(Filename)
e.Attachments[fname] = b
return nil
}
// AddRecipient adds a single recipient.
func (e *Email) AddRecipient(Recipient string) {
e.To = append(e.To, Recipient)
}
// AddRecipients adds one or more recipients.
func (e *Email) AddRecipients(Recipients ...string) {
e.To = append(e.To, Recipients...)
}
// Send sends the email, returning any error encountered.
func (e *Email) Send() error {
if e.From == "" {
return errors.New("Error: No sender specified. Please set the Email.From field.")
}
if e.To == nil || len(e.To) == 0 {
return errors.New("Error: No recipient specified. Please set the Email.To field.")
}
auth := smtp.PlainAuth(
"",
config.GetString("smtp_user"),
config.GetString("smtp_password"),
config.GetString("smtp_host"),
)
conn, err := smtp.Dial(fmt.Sprintf("%v:%v", config.GetString("smtp_host"), config.GetString("smtp_port")))
if err != nil {
return err
}
err = conn.StartTLS(&tls.Config{})
if err != nil {
return err
}
err = conn.Auth(auth)
if err != nil {
return err
}
err = conn.Mail(e.From)
if err != nil {
if strings.Contains(err.Error(), "530 5.5.1") {
return errors.New("Error: Authentication failure. Your username or password is incorrect.")
}
return err
}
for _, recipient := range e.To {
err = conn.Rcpt(recipient)
if err != nil {
return err
}
}
wc, err := conn.Data()
if err != nil {
return err
}
defer wc.Close()
_, err = wc.Write(e.Bytes())
if err != nil {
return err
}
return nil
}
func (e *Email) Bytes() []byte {
buf := bytes.NewBuffer(nil)
var subject = e.Subject
subject = regexp.MustCompile("\n+").ReplaceAllString(subject, " ")
subject = regexp.MustCompile(" +").ReplaceAllString(subject, " ")
buf.WriteString("Subject: " + subject + "\n")
buf.WriteString("To: <" + strings.Join(e.To, ">,<") + ">\n")
buf.WriteString("MIME-Version: 1.0\n")
// Boundary is used by MIME to separate files.
boundary := "f46d043c813270fc6b04c2d223da"
if len(e.Attachments) > 0 {
buf.WriteString("Content-Type: multipart/mixed; boundary=" + boundary + "\n")
buf.WriteString("--" + boundary + "\n")
}
if e.ContentType == "" {
e.ContentType = "text/plain; charset=utf-8"
}
buf.WriteString(fmt.Sprintf("Content-Type: %s\n\n", e.ContentType))
buf.WriteString(e.Body)
if len(e.Attachments) > 0 {
for k, v := range e.Attachments {
buf.WriteString("\n\n--" + boundary + "\n")
buf.WriteString("Content-Type: application/octet-stream\n")
buf.WriteString("Content-Transfer-Encoding: base64\n")
buf.WriteString("Content-Disposition: attachment; filename=\"" + k + "\"\n\n")
b := make([]byte, base64.StdEncoding.EncodedLen(len(v)))
base64.StdEncoding.Encode(b, v)
buf.Write(b)
buf.WriteString("\n--" + boundary)
}
buf.WriteString("--")
}
return buf.Bytes()
}

+ 0
- 7
alert/log.go View File

@ -1,7 +0,0 @@
package alert
import (
"github.com/tendermint/go-logger"
)
var log = logger.New("module", "alert")

+ 2
- 2
benchmarks/simu/counter.go View File

@ -8,10 +8,10 @@ import (
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
"github.com/tendermint/go-rpc/client"
"github.com/tendermint/go-rpc/types"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/rpc/client"
_ "github.com/tendermint/tendermint/rpc/core/types" // Register RPCResponse > Result types _ "github.com/tendermint/tendermint/rpc/core/types" // Register RPCResponse > Result types
"github.com/tendermint/tendermint/rpc/types"
) )
func main() { func main() {


+ 1
- 1
blockchain/reactor.go View File

@ -10,7 +10,7 @@ import (
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
"github.com/tendermint/go-p2p" "github.com/tendermint/go-p2p"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/events"
"github.com/tendermint/go-events"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"


+ 1
- 1
consensus/common_test.go View File

@ -11,7 +11,7 @@ import (
dbm "github.com/tendermint/go-db" dbm "github.com/tendermint/go-db"
bc "github.com/tendermint/tendermint/blockchain" bc "github.com/tendermint/tendermint/blockchain"
_ "github.com/tendermint/tendermint/config/tendermint_test" _ "github.com/tendermint/tendermint/config/tendermint_test"
"github.com/tendermint/tendermint/events"
"github.com/tendermint/go-events"
mempl "github.com/tendermint/tendermint/mempool" mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"


+ 3
- 3
consensus/reactor.go View File

@ -9,10 +9,10 @@ import (
"time" "time"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
"github.com/tendermint/go-events"
"github.com/tendermint/go-p2p" "github.com/tendermint/go-p2p"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
bc "github.com/tendermint/tendermint/blockchain" bc "github.com/tendermint/tendermint/blockchain"
"github.com/tendermint/tendermint/events"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -234,12 +234,12 @@ func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) {
// broadcasting the result to peers // broadcasting the result to peers
func (conR *ConsensusReactor) registerEventCallbacks() { func (conR *ConsensusReactor) registerEventCallbacks() {
conR.evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data types.EventData) {
conR.evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data events.EventData) {
rs := data.(*types.EventDataRoundState).RoundState().(*RoundState) rs := data.(*types.EventDataRoundState).RoundState().(*RoundState)
conR.broadcastNewRoundStep(rs) conR.broadcastNewRoundStep(rs)
}) })
conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data types.EventData) {
conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data events.EventData) {
edv := data.(*types.EventDataVote) edv := data.(*types.EventDataVote)
conR.broadcastHasVoteMessage(edv.Vote, edv.Index) conR.broadcastHasVoteMessage(edv.Vote, edv.Index)
}) })


+ 1
- 1
consensus/state.go View File

@ -9,9 +9,9 @@ import (
"time" "time"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
"github.com/tendermint/go-events"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
bc "github.com/tendermint/tendermint/blockchain" bc "github.com/tendermint/tendermint/blockchain"
"github.com/tendermint/tendermint/events"
mempl "github.com/tendermint/tendermint/mempool" mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"


+ 1
- 1
consensus/state_test.go View File

@ -7,7 +7,7 @@ import (
"time" "time"
_ "github.com/tendermint/tendermint/config/tendermint_test" _ "github.com/tendermint/tendermint/config/tendermint_test"
//"github.com/tendermint/tendermint/events"
//"github.com/tendermint/go-events"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )


+ 0
- 45
events/event_cache.go View File

@ -1,45 +0,0 @@
package events
import (
"github.com/tendermint/tendermint/types"
)
const (
eventsBufferSize = 1000
)
// An EventCache buffers events for a Fireable
// All events are cached. Filtering happens on Flush
type EventCache struct {
evsw Fireable
events []eventInfo
}
// Create a new EventCache with an EventSwitch as backend
func NewEventCache(evsw Fireable) *EventCache {
return &EventCache{
evsw: evsw,
events: make([]eventInfo, eventsBufferSize),
}
}
// a cached event
type eventInfo struct {
event string
data types.EventData
}
// Cache an event to be fired upon finality.
func (evc *EventCache) FireEvent(event string, data types.EventData) {
// append to list
evc.events = append(evc.events, eventInfo{event, data})
}
// Fire events by running evsw.FireEvent on all cached events. Blocks.
// Clears cached events
func (evc *EventCache) Flush() {
for _, ei := range evc.events {
evc.evsw.FireEvent(ei.event, ei.data)
}
evc.events = make([]eventInfo, eventsBufferSize)
}

+ 0
- 215
events/events.go View File

@ -1,215 +0,0 @@
package events
import (
"sync"
. "github.com/tendermint/go-common"
"github.com/tendermint/tendermint/types"
)
// reactors and other modules should export
// this interface to become eventable
type Eventable interface {
SetEventSwitch(evsw *EventSwitch)
}
// an event switch or cache implements fireable
type Fireable interface {
FireEvent(event string, data types.EventData)
}
type EventSwitch struct {
BaseService
mtx sync.RWMutex
eventCells map[string]*eventCell
listeners map[string]*eventListener
}
func NewEventSwitch() *EventSwitch {
evsw := &EventSwitch{}
evsw.BaseService = *NewBaseService(log, "EventSwitch", evsw)
return evsw
}
func (evsw *EventSwitch) OnStart() error {
evsw.BaseService.OnStart()
evsw.eventCells = make(map[string]*eventCell)
evsw.listeners = make(map[string]*eventListener)
return nil
}
func (evsw *EventSwitch) OnStop() {
evsw.BaseService.OnStop()
evsw.eventCells = nil
evsw.listeners = nil
}
func (evsw *EventSwitch) AddListenerForEvent(listenerID, event string, cb eventCallback) {
// Get/Create eventCell and listener
evsw.mtx.Lock()
eventCell := evsw.eventCells[event]
if eventCell == nil {
eventCell = newEventCell()
evsw.eventCells[event] = eventCell
}
listener := evsw.listeners[listenerID]
if listener == nil {
listener = newEventListener(listenerID)
evsw.listeners[listenerID] = listener
}
evsw.mtx.Unlock()
// Add event and listener
eventCell.AddListener(listenerID, cb)
listener.AddEvent(event)
}
func (evsw *EventSwitch) RemoveListener(listenerID string) {
// Get and remove listener
evsw.mtx.RLock()
listener := evsw.listeners[listenerID]
delete(evsw.listeners, listenerID)
evsw.mtx.RUnlock()
if listener == nil {
return
}
// Remove callback for each event.
listener.SetRemoved()
for _, event := range listener.GetEvents() {
evsw.RemoveListenerForEvent(event, listenerID)
}
}
func (evsw *EventSwitch) RemoveListenerForEvent(event string, listenerID string) {
// Get eventCell
evsw.mtx.Lock()
eventCell := evsw.eventCells[event]
evsw.mtx.Unlock()
if eventCell == nil {
return
}
// Remove listenerID from eventCell
numListeners := eventCell.RemoveListener(listenerID)
// Maybe garbage collect eventCell.
if numListeners == 0 {
// Lock again and double check.
evsw.mtx.Lock() // OUTER LOCK
eventCell.mtx.Lock() // INNER LOCK
if len(eventCell.listeners) == 0 {
delete(evsw.eventCells, event)
}
eventCell.mtx.Unlock() // INNER LOCK
evsw.mtx.Unlock() // OUTER LOCK
}
}
func (evsw *EventSwitch) FireEvent(event string, data types.EventData) {
// Get the eventCell
evsw.mtx.RLock()
eventCell := evsw.eventCells[event]
evsw.mtx.RUnlock()
if eventCell == nil {
return
}
// Fire event for all listeners in eventCell
eventCell.FireEvent(data)
}
func (evsw *EventSwitch) SubscribeToEvent(receiver, eventID string, chanCap int) chan interface{} {
// listen for new round
ch := make(chan interface{}, chanCap)
evsw.AddListenerForEvent(receiver, eventID, func(data types.EventData) {
// NOTE: in production, evsw callbacks should be nonblocking.
ch <- data
})
return ch
}
//-----------------------------------------------------------------------------
// eventCell handles keeping track of listener callbacks for a given event.
type eventCell struct {
mtx sync.RWMutex
listeners map[string]eventCallback
}
func newEventCell() *eventCell {
return &eventCell{
listeners: make(map[string]eventCallback),
}
}
func (cell *eventCell) AddListener(listenerID string, cb eventCallback) {
cell.mtx.Lock()
cell.listeners[listenerID] = cb
cell.mtx.Unlock()
}
func (cell *eventCell) RemoveListener(listenerID string) int {
cell.mtx.Lock()
delete(cell.listeners, listenerID)
numListeners := len(cell.listeners)
cell.mtx.Unlock()
return numListeners
}
func (cell *eventCell) FireEvent(data types.EventData) {
cell.mtx.RLock()
for _, listener := range cell.listeners {
listener(data)
}
cell.mtx.RUnlock()
}
//-----------------------------------------------------------------------------
type eventCallback func(data types.EventData)
type eventListener struct {
id string
mtx sync.RWMutex
removed bool
events []string
}
func newEventListener(id string) *eventListener {
return &eventListener{
id: id,
removed: false,
events: nil,
}
}
func (evl *eventListener) AddEvent(event string) {
evl.mtx.Lock()
defer evl.mtx.Unlock()
if evl.removed {
return
}
evl.events = append(evl.events, event)
}
func (evl *eventListener) GetEvents() []string {
evl.mtx.RLock()
defer evl.mtx.RUnlock()
events := make([]string, len(evl.events))
copy(events, evl.events)
return events
}
func (evl *eventListener) SetRemoved() {
evl.mtx.Lock()
defer evl.mtx.Unlock()
evl.removed = true
}

+ 0
- 7
events/log.go View File

@ -1,7 +0,0 @@
package events
import (
"github.com/tendermint/go-logger"
)
var log = logger.New("module", "events")

+ 1
- 1
mempool/reactor.go View File

@ -10,7 +10,7 @@ import (
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
"github.com/tendermint/go-p2p" "github.com/tendermint/go-p2p"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/events"
"github.com/tendermint/go-events"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )


+ 6
- 6
node/node.go View File

@ -12,16 +12,16 @@ import (
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
"github.com/tendermint/go-crypto" "github.com/tendermint/go-crypto"
dbm "github.com/tendermint/go-db" dbm "github.com/tendermint/go-db"
"github.com/tendermint/go-events"
"github.com/tendermint/go-p2p" "github.com/tendermint/go-p2p"
"github.com/tendermint/go-rpc"
"github.com/tendermint/go-rpc/server"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
bc "github.com/tendermint/tendermint/blockchain" bc "github.com/tendermint/tendermint/blockchain"
"github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/events"
mempl "github.com/tendermint/tendermint/mempool" mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/rpc"
"github.com/tendermint/tendermint/rpc/core" "github.com/tendermint/tendermint/rpc/core"
"github.com/tendermint/tendermint/rpc/server"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
"github.com/tendermint/tmsp/example/golang" "github.com/tendermint/tmsp/example/golang"
@ -332,10 +332,10 @@ func getProxyApp(addr string, hash []byte) (proxyAppCtx proxy.AppContext) {
if err != nil { if err != nil {
Exit(Fmt("Failed to connect to proxy for mempool: %v", err)) Exit(Fmt("Failed to connect to proxy for mempool: %v", err))
} }
proxyAppCtx := proxy.NewRemoteAppContext(proxyConn, 1024)
proxyAppCtx.Start()
remoteApp := proxy.NewRemoteAppContext(proxyConn, 1024)
remoteApp.Start()
proxyAppCtx = remoteApp
} }
// Check the hash // Check the hash


+ 0
- 18
node/node_test.go View File

@ -4,30 +4,12 @@ import (
"testing" "testing"
"time" "time"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-p2p" "github.com/tendermint/go-p2p"
_ "github.com/tendermint/tendermint/config/tendermint_test" _ "github.com/tendermint/tendermint/config/tendermint_test"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tmsp/example/golang"
"github.com/tendermint/tmsp/server"
) )
func TestNodeStartStop(t *testing.T) { func TestNodeStartStop(t *testing.T) {
// Start a dummy app
go func() {
_, err := server.StartListener(config.GetString("proxy_app"), example.NewDummyApplication())
if err != nil {
Exit(err.Error())
}
}()
// wait for the server
time.Sleep(time.Second * 2)
// Get PrivValidator
privValidatorFile := config.GetString("priv_validator_file")
privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
// Create & start node // Create & start node
n := NewNode(privValidator) n := NewNode(privValidator)
l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"), config.GetBool("skip_upnp")) l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"), config.GetBool("skip_upnp"))


+ 0
- 133
rpc/client/http_client.go View File

@ -1,133 +0,0 @@
package rpcclient
import (
"bytes"
"errors"
"io/ioutil"
"net/http"
"net/url"
"strings"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/rpc/types"
)
// JSON rpc takes params as a slice
type ClientJSONRPC struct {
remote string
}
func NewClientJSONRPC(remote string) *ClientJSONRPC {
return &ClientJSONRPC{remote}
}
func (c *ClientJSONRPC) Call(method string, params []interface{}) (interface{}, error) {
return CallHTTP_JSONRPC(c.remote, method, params)
}
// URI takes params as a map
type ClientURI struct {
remote string
}
func NewClientURI(remote string) *ClientURI {
if !strings.HasSuffix(remote, "/") {
remote = remote + "/"
}
return &ClientURI{remote}
}
func (c *ClientURI) Call(method string, params map[string]interface{}) (interface{}, error) {
return CallHTTP_URI(c.remote, method, params)
}
func CallHTTP_JSONRPC(remote string, method string, params []interface{}) (interface{}, error) {
// Make request and get responseBytes
request := rpctypes.RPCRequest{
JSONRPC: "2.0",
Method: method,
Params: params,
ID: "",
}
requestBytes := wire.JSONBytes(request)
requestBuf := bytes.NewBuffer(requestBytes)
log.Info(Fmt("RPC request to %v: %v", remote, string(requestBytes)))
httpResponse, err := http.Post(remote, "text/json", requestBuf)
if err != nil {
return nil, err
}
defer httpResponse.Body.Close()
responseBytes, err := ioutil.ReadAll(httpResponse.Body)
if err != nil {
return nil, err
}
log.Info(Fmt("RPC response: %v", string(responseBytes)))
return unmarshalResponseBytes(responseBytes)
}
func CallHTTP_URI(remote string, method string, params map[string]interface{}) (interface{}, error) {
values, err := argsToURLValues(params)
if err != nil {
return nil, err
}
log.Info(Fmt("URI request to %v: %v", remote, values))
resp, err := http.PostForm(remote+method, values)
if err != nil {
return nil, err
}
defer resp.Body.Close()
responseBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return unmarshalResponseBytes(responseBytes)
}
//------------------------------------------------
func unmarshalResponseBytes(responseBytes []byte) (interface{}, error) {
// read response
// if rpc/core/types is imported, the result will unmarshal
// into the correct type
var err error
response := &rpctypes.RPCResponse{}
wire.ReadJSON(response, responseBytes, &err)
if err != nil {
return nil, err
}
errorStr := response.Error
if errorStr != "" {
return nil, errors.New(errorStr)
}
return response.Result, err
}
func argsToURLValues(args map[string]interface{}) (url.Values, error) {
values := make(url.Values)
if len(args) == 0 {
return values, nil
}
err := argsToJson(args)
if err != nil {
return nil, err
}
for key, val := range args {
values.Set(key, val.(string))
}
return values, nil
}
func argsToJson(args map[string]interface{}) error {
var n int
var err error
for k, v := range args {
buf := new(bytes.Buffer)
wire.WriteJSON(v, buf, &n, &err)
if err != nil {
return err
}
args[k] = buf.String()
}
return nil
}

+ 0
- 7
rpc/client/log.go View File

@ -1,7 +0,0 @@
package rpcclient
import (
"github.com/tendermint/log15"
)
var log = log15.New("module", "rpcclient")

+ 0
- 116
rpc/client/ws_client.go View File

@ -1,116 +0,0 @@
package rpcclient
import (
"net/http"
"time"
"github.com/gorilla/websocket"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/rpc/types"
)
const (
wsResultsChannelCapacity = 10
wsWriteTimeoutSeconds = 10
)
type WSClient struct {
QuitService
Address string
*websocket.Conn
ResultsCh chan rpctypes.Result // closes upon WSClient.Stop()
}
// create a new connection
func NewWSClient(addr string) *WSClient {
wsClient := &WSClient{
Address: addr,
Conn: nil,
ResultsCh: make(chan rpctypes.Result, wsResultsChannelCapacity),
}
wsClient.QuitService = *NewQuitService(log, "WSClient", wsClient)
return wsClient
}
func (wsc *WSClient) OnStart() error {
wsc.QuitService.OnStart()
err := wsc.dial()
if err != nil {
return err
}
go wsc.receiveEventsRoutine()
return nil
}
func (wsc *WSClient) dial() error {
// Dial
dialer := websocket.DefaultDialer
rHeader := http.Header{}
con, _, err := dialer.Dial(wsc.Address, rHeader)
if err != nil {
return err
}
// Set the ping/pong handlers
con.SetPingHandler(func(m string) error {
// NOTE: https://github.com/gorilla/websocket/issues/97
go con.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds))
return nil
})
con.SetPongHandler(func(m string) error {
// NOTE: https://github.com/gorilla/websocket/issues/97
return nil
})
wsc.Conn = con
return nil
}
func (wsc *WSClient) OnStop() {
wsc.QuitService.OnStop()
// ResultsCh is closed in receiveEventsRoutine.
}
func (wsc *WSClient) receiveEventsRoutine() {
for {
_, data, err := wsc.ReadMessage()
if err != nil {
log.Info("WSClient failed to read message", "error", err, "data", string(data))
wsc.Stop()
break
} else {
var response rpctypes.RPCResponse
wire.ReadJSON(&response, data, &err)
if err != nil {
log.Info("WSClient failed to parse message", "error", err)
wsc.Stop()
break
}
wsc.ResultsCh <- response.Result
}
}
// Cleanup
close(wsc.ResultsCh)
}
// subscribe to an event
func (wsc *WSClient) Subscribe(eventid string) error {
err := wsc.WriteJSON(rpctypes.RPCRequest{
JSONRPC: "2.0",
ID: "",
Method: "subscribe",
Params: []interface{}{eventid},
})
return err
}
// unsubscribe from an event
func (wsc *WSClient) Unsubscribe(eventid string) error {
err := wsc.WriteJSON(rpctypes.RPCRequest{
JSONRPC: "2.0",
ID: "",
Method: "unsubscribe",
Params: []interface{}{eventid},
})
return err
}

+ 4
- 4
rpc/core/events.go View File

@ -1,14 +1,14 @@
package core package core
import ( import (
"github.com/tendermint/go-events"
"github.com/tendermint/go-rpc/types"
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/rpc/types"
"github.com/tendermint/tendermint/types"
) )
func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscribe, error) { func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscribe, error) {
log.Notice("Subscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event) log.Notice("Subscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event)
wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg types.EventData) {
wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg events.EventData) {
// NOTE: EventSwitch callbacks must be nonblocking // NOTE: EventSwitch callbacks must be nonblocking
// NOTE: RPCResponses of subscribed events have id suffix "#event" // NOTE: RPCResponses of subscribed events have id suffix "#event"
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &ctypes.ResultEvent{event, msg}, "")) wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &ctypes.ResultEvent{event, msg}, ""))
@ -18,7 +18,7 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscri
func Unsubscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultUnsubscribe, error) { func Unsubscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultUnsubscribe, error) {
log.Notice("Unsubscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event) log.Notice("Unsubscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event)
wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg types.EventData) {
wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg events.EventData) {
// NOTE: EventSwitch callbacks must be nonblocking // NOTE: EventSwitch callbacks must be nonblocking
// NOTE: RPCResponses of subscribed events have id suffix "#event" // NOTE: RPCResponses of subscribed events have id suffix "#event"
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &ctypes.ResultEvent{event, msg}, "")) wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &ctypes.ResultEvent{event, msg}, ""))


+ 1
- 1
rpc/core/routes.go View File

@ -1,7 +1,7 @@
package core package core
import ( import (
rpc "github.com/tendermint/tendermint/rpc/server"
rpc "github.com/tendermint/go-rpc/server"
) )
// TODO: eliminate redundancy between here and reading code from core/ // TODO: eliminate redundancy between here and reading code from core/


+ 5
- 3
rpc/core/types/responses.go View File

@ -2,9 +2,10 @@ package core_types
import ( import (
"github.com/tendermint/go-crypto" "github.com/tendermint/go-crypto"
"github.com/tendermint/go-events"
"github.com/tendermint/go-p2p" "github.com/tendermint/go-p2p"
"github.com/tendermint/go-rpc/types"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/rpc/types"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -67,9 +68,10 @@ type ResultSubscribe struct {
type ResultUnsubscribe struct { type ResultUnsubscribe struct {
} }
// TODO: something about this
type ResultEvent struct { type ResultEvent struct {
Event string `json:"event"`
Data types.EventData `json:"data"`
Event string `json:"event"`
Data events.EventData `json:"data"`
} }
//---------------------------------------- //----------------------------------------


+ 0
- 553
rpc/server/handlers.go View File

@ -1,553 +0,0 @@
package rpcserver
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"reflect"
"sort"
"time"
"github.com/gorilla/websocket"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/events"
. "github.com/tendermint/tendermint/rpc/types"
)
func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) {
// HTTP endpoints
for funcName, rpcFunc := range funcMap {
mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc))
}
// JSONRPC endpoints
mux.HandleFunc("/", makeJSONRPCHandler(funcMap))
}
//-------------------------------------
// function introspection
// holds all type information for each function
type RPCFunc struct {
f reflect.Value // underlying rpc function
args []reflect.Type // type of each function arg
returns []reflect.Type // type of each return arg
argNames []string // name of each argument
ws bool // websocket only
}
// wraps a function for quicker introspection
func NewRPCFunc(f interface{}, args []string) *RPCFunc {
return &RPCFunc{
f: reflect.ValueOf(f),
args: funcArgTypes(f),
returns: funcReturnTypes(f),
argNames: args,
ws: false,
}
}
func NewWSRPCFunc(f interface{}, args []string) *RPCFunc {
return &RPCFunc{
f: reflect.ValueOf(f),
args: funcArgTypes(f),
returns: funcReturnTypes(f),
argNames: args,
ws: true,
}
}
// return a function's argument types
func funcArgTypes(f interface{}) []reflect.Type {
t := reflect.TypeOf(f)
n := t.NumIn()
typez := make([]reflect.Type, n)
for i := 0; i < n; i++ {
typez[i] = t.In(i)
}
return typez
}
// return a function's return types
func funcReturnTypes(f interface{}) []reflect.Type {
t := reflect.TypeOf(f)
n := t.NumOut()
typez := make([]reflect.Type, n)
for i := 0; i < n; i++ {
typez[i] = t.Out(i)
}
return typez
}
// function introspection
//-----------------------------------------------------------------------------
// rpc.json
// jsonrpc calls grab the given method's function info and runs reflect.Call
func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
b, _ := ioutil.ReadAll(r.Body)
// if its an empty request (like from a browser),
// just display a list of functions
if len(b) == 0 {
writeListOfEndpoints(w, r, funcMap)
return
}
var request RPCRequest
err := json.Unmarshal(b, &request)
if err != nil {
WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error()))
return
}
if len(r.URL.Path) > 1 {
WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path)))
return
}
rpcFunc := funcMap[request.Method]
if rpcFunc == nil {
WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method))
return
}
if rpcFunc.ws {
WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method is only for websockets: "+request.Method))
return
}
args, err := jsonParamsToArgs(rpcFunc, request.Params)
if err != nil {
WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, err.Error()))
return
}
returns := rpcFunc.f.Call(args)
log.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns)
result, err := unreflectResult(returns)
if err != nil {
WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, err.Error()))
return
}
WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, result, ""))
}
}
// Convert a list of interfaces to properly typed values
func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, error) {
if len(rpcFunc.argNames) != len(params) {
return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)",
len(rpcFunc.argNames), rpcFunc.argNames, len(params), params))
}
values := make([]reflect.Value, len(params))
for i, p := range params {
ty := rpcFunc.args[i]
v, err := _jsonObjectToArg(ty, p)
if err != nil {
return nil, err
}
values[i] = v
}
return values, nil
}
// Same as above, but with the first param the websocket connection
func jsonParamsToArgsWS(rpcFunc *RPCFunc, params []interface{}, wsCtx WSRPCContext) ([]reflect.Value, error) {
if len(rpcFunc.argNames) != len(params) {
return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)",
len(rpcFunc.argNames)-1, rpcFunc.argNames[1:], len(params), params))
}
values := make([]reflect.Value, len(params)+1)
values[0] = reflect.ValueOf(wsCtx)
for i, p := range params {
ty := rpcFunc.args[i+1]
v, err := _jsonObjectToArg(ty, p)
if err != nil {
return nil, err
}
values[i+1] = v
}
return values, nil
}
func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) {
var err error
v := reflect.New(ty)
wire.ReadJSONObjectPtr(v.Interface(), object, &err)
if err != nil {
return v, err
}
v = v.Elem()
return v, nil
}
// rpc.json
//-----------------------------------------------------------------------------
// rpc.http
// convert from a function name to the http handler
func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) {
// Exception for websocket endpoints
if rpcFunc.ws {
return func(w http.ResponseWriter, r *http.Request) {
WriteRPCResponseHTTP(w, NewRPCResponse("", nil, "This RPC method is only for websockets"))
}
}
// All other endpoints
return func(w http.ResponseWriter, r *http.Request) {
args, err := httpParamsToArgs(rpcFunc, r)
if err != nil {
WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error()))
return
}
returns := rpcFunc.f.Call(args)
log.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns)
result, err := unreflectResult(returns)
if err != nil {
WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error()))
return
}
WriteRPCResponseHTTP(w, NewRPCResponse("", result, ""))
}
}
// Covert an http query to a list of properly typed values.
// To be properly decoded the arg must be a concrete type from tendermint (if its an interface).
func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) {
argTypes := rpcFunc.args
argNames := rpcFunc.argNames
var err error
values := make([]reflect.Value, len(argNames))
for i, name := range argNames {
ty := argTypes[i]
arg := GetParam(r, name)
values[i], err = _jsonStringToArg(ty, arg)
if err != nil {
return nil, err
}
}
return values, nil
}
func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
var err error
v := reflect.New(ty)
wire.ReadJSONPtr(v.Interface(), []byte(arg), &err)
if err != nil {
return v, err
}
v = v.Elem()
return v, nil
}
// rpc.http
//-----------------------------------------------------------------------------
// rpc.websocket
const (
writeChanCapacity = 1000
wsWriteTimeoutSeconds = 30 // each write times out after this
wsReadTimeoutSeconds = 30 // connection times out if we haven't received *anything* in this long, not even pings.
wsPingTickerSeconds = 10 // send a ping every PingTickerSeconds.
)
// a single websocket connection
// contains listener id, underlying ws connection,
// and the event switch for subscribing to events
type wsConnection struct {
QuitService
remoteAddr string
baseConn *websocket.Conn
writeChan chan RPCResponse
readTimeout *time.Timer
pingTicker *time.Ticker
funcMap map[string]*RPCFunc
evsw *events.EventSwitch
}
// new websocket connection wrapper
func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *wsConnection {
wsc := &wsConnection{
remoteAddr: baseConn.RemoteAddr().String(),
baseConn: baseConn,
writeChan: make(chan RPCResponse, writeChanCapacity), // error when full.
funcMap: funcMap,
evsw: evsw,
}
wsc.QuitService = *NewQuitService(log, "wsConnection", wsc)
return wsc
}
// wsc.Start() blocks until the connection closes.
func (wsc *wsConnection) OnStart() error {
wsc.QuitService.OnStart()
// Read subscriptions/unsubscriptions to events
go wsc.readRoutine()
// Custom Ping handler to touch readTimeout
wsc.readTimeout = time.NewTimer(time.Second * wsReadTimeoutSeconds)
wsc.pingTicker = time.NewTicker(time.Second * wsPingTickerSeconds)
wsc.baseConn.SetPingHandler(func(m string) error {
// NOTE: https://github.com/gorilla/websocket/issues/97
go wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds))
wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds)
return nil
})
wsc.baseConn.SetPongHandler(func(m string) error {
// NOTE: https://github.com/gorilla/websocket/issues/97
wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds)
return nil
})
go wsc.readTimeoutRoutine()
// Write responses, BLOCKING.
wsc.writeRoutine()
return nil
}
func (wsc *wsConnection) OnStop() {
wsc.QuitService.OnStop()
wsc.evsw.RemoveListener(wsc.remoteAddr)
wsc.readTimeout.Stop()
wsc.pingTicker.Stop()
// The write loop closes the websocket connection
// when it exits its loop, and the read loop
// closes the writeChan
}
func (wsc *wsConnection) readTimeoutRoutine() {
select {
case <-wsc.readTimeout.C:
log.Notice("Stopping connection due to read timeout")
wsc.Stop()
case <-wsc.Quit:
return
}
}
// Implements WSRPCConnection
func (wsc *wsConnection) GetRemoteAddr() string {
return wsc.remoteAddr
}
// Implements WSRPCConnection
func (wsc *wsConnection) GetEventSwitch() *events.EventSwitch {
return wsc.evsw
}
// Implements WSRPCConnection
// Blocking write to writeChan until service stops.
func (wsc *wsConnection) WriteRPCResponse(resp RPCResponse) {
select {
case <-wsc.Quit:
return
case wsc.writeChan <- resp:
}
}
// Implements WSRPCConnection
// Nonblocking write.
func (wsc *wsConnection) TryWriteRPCResponse(resp RPCResponse) bool {
select {
case <-wsc.Quit:
return false
case wsc.writeChan <- resp:
return true
default:
return false
}
}
// Read from the socket and subscribe to or unsubscribe from events
func (wsc *wsConnection) readRoutine() {
// Do not close writeChan, to allow WriteRPCResponse() to fail.
// defer close(wsc.writeChan)
for {
select {
case <-wsc.Quit:
return
default:
var in []byte
// Do not set a deadline here like below:
// wsc.baseConn.SetReadDeadline(time.Now().Add(time.Second * wsReadTimeoutSeconds))
// The client may not send anything for a while.
// We use `readTimeout` to handle read timeouts.
_, in, err := wsc.baseConn.ReadMessage()
if err != nil {
log.Notice("Failed to read from connection", "remote", wsc.remoteAddr)
// an error reading the connection,
// kill the connection
wsc.Stop()
return
}
var request RPCRequest
err = json.Unmarshal(in, &request)
if err != nil {
errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, errStr))
continue
}
// Now, fetch the RPCFunc and execute it.
rpcFunc := wsc.funcMap[request.Method]
if rpcFunc == nil {
wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method))
continue
}
var args []reflect.Value
if rpcFunc.ws {
wsCtx := WSRPCContext{Request: request, WSRPCConnection: wsc}
args, err = jsonParamsToArgsWS(rpcFunc, request.Params, wsCtx)
} else {
args, err = jsonParamsToArgs(rpcFunc, request.Params)
}
if err != nil {
wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error()))
continue
}
returns := rpcFunc.f.Call(args)
log.Info("WSJSONRPC", "method", request.Method, "args", args, "returns", returns)
result, err := unreflectResult(returns)
if err != nil {
wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error()))
continue
} else {
wsc.WriteRPCResponse(NewRPCResponse(request.ID, result, ""))
continue
}
}
}
}
// receives on a write channel and writes out on the socket
func (wsc *wsConnection) writeRoutine() {
defer wsc.baseConn.Close()
var n, err = int(0), error(nil)
for {
select {
case <-wsc.Quit:
return
case <-wsc.pingTicker.C:
err := wsc.baseConn.WriteMessage(websocket.PingMessage, []byte{})
if err != nil {
log.Error("Failed to write ping message on websocket", "error", err)
wsc.Stop()
return
}
case msg := <-wsc.writeChan:
buf := new(bytes.Buffer)
wire.WriteJSON(msg, buf, &n, &err)
if err != nil {
log.Error("Failed to marshal RPCResponse to JSON", "error", err)
} else {
wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds))
bufBytes := buf.Bytes()
if err = wsc.baseConn.WriteMessage(websocket.TextMessage, bufBytes); err != nil {
log.Warn("Failed to write response on websocket", "error", err)
wsc.Stop()
return
}
}
}
}
}
//----------------------------------------
// Main manager for all websocket connections
// Holds the event switch
// NOTE: The websocket path is defined externally, e.g. in node/node.go
type WebsocketManager struct {
websocket.Upgrader
funcMap map[string]*RPCFunc
evsw *events.EventSwitch
}
func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *WebsocketManager {
return &WebsocketManager{
funcMap: funcMap,
evsw: evsw,
Upgrader: websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
// TODO
return true
},
},
}
}
// Upgrade the request/response (via http.Hijack) and starts the wsConnection.
func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) {
wsConn, err := wm.Upgrade(w, r, nil)
if err != nil {
// TODO - return http error
log.Error("Failed to upgrade to websocket connection", "error", err)
return
}
// register connection
con := NewWSConnection(wsConn, wm.funcMap, wm.evsw)
log.Notice("New websocket connection", "remote", con.remoteAddr)
con.Start() // Blocking
}
// rpc.websocket
//-----------------------------------------------------------------------------
// returns is result struct and error. If error is not nil, return it
func unreflectResult(returns []reflect.Value) (interface{}, error) {
errV := returns[1]
if errV.Interface() != nil {
return nil, fmt.Errorf("%v", errV.Interface())
}
return returns[0].Interface(), nil
}
// writes a list of available rpc endpoints as an html page
func writeListOfEndpoints(w http.ResponseWriter, r *http.Request, funcMap map[string]*RPCFunc) {
noArgNames := []string{}
argNames := []string{}
for name, funcData := range funcMap {
if len(funcData.args) == 0 {
noArgNames = append(noArgNames, name)
} else {
argNames = append(argNames, name)
}
}
sort.Strings(noArgNames)
sort.Strings(argNames)
buf := new(bytes.Buffer)
buf.WriteString("<html><body>")
buf.WriteString("<br>Available endpoints:<br>")
for _, name := range noArgNames {
link := fmt.Sprintf("http://%s/%s", r.Host, name)
buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
}
buf.WriteString("<br>Endpoints that require arguments:<br>")
for _, name := range argNames {
link := fmt.Sprintf("http://%s/%s?", r.Host, name)
funcData := funcMap[name]
for i, argName := range funcData.argNames {
link += argName + "=_"
if i < len(funcData.argNames)-1 {
link += "&"
}
}
buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
}
buf.WriteString("</body></html>")
w.Header().Set("Content-Type", "text/html")
w.WriteHeader(200)
w.Write(buf.Bytes())
}

+ 0
- 89
rpc/server/http_params.go View File

@ -1,89 +0,0 @@
package rpcserver
import (
"encoding/hex"
"fmt"
"net/http"
"regexp"
"strconv"
)
var (
// Parts of regular expressions
atom = "[A-Z0-9!#$%&'*+\\-/=?^_`{|}~]+"
dotAtom = atom + `(?:\.` + atom + `)*`
domain = `[A-Z0-9.-]+\.[A-Z]{2,4}`
RE_HEX = regexp.MustCompile(`^(?i)[a-f0-9]+$`)
RE_EMAIL = regexp.MustCompile(`^(?i)(` + dotAtom + `)@(` + dotAtom + `)$`)
RE_ADDRESS = regexp.MustCompile(`^(?i)[a-z0-9]{25,34}$`)
RE_HOST = regexp.MustCompile(`^(?i)(` + domain + `)$`)
//RE_ID12 = regexp.MustCompile(`^[a-zA-Z0-9]{12}$`)
)
func GetParam(r *http.Request, param string) string {
s := r.URL.Query().Get(param)
if s == "" {
s = r.FormValue(param)
}
return s
}
func GetParamByteSlice(r *http.Request, param string) ([]byte, error) {
s := GetParam(r, param)
return hex.DecodeString(s)
}
func GetParamInt64(r *http.Request, param string) (int64, error) {
s := GetParam(r, param)
i, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return 0, fmt.Errorf(param, err.Error())
}
return i, nil
}
func GetParamInt32(r *http.Request, param string) (int32, error) {
s := GetParam(r, param)
i, err := strconv.ParseInt(s, 10, 32)
if err != nil {
return 0, fmt.Errorf(param, err.Error())
}
return int32(i), nil
}
func GetParamUint64(r *http.Request, param string) (uint64, error) {
s := GetParam(r, param)
i, err := strconv.ParseUint(s, 10, 64)
if err != nil {
return 0, fmt.Errorf(param, err.Error())
}
return i, nil
}
func GetParamUint(r *http.Request, param string) (uint, error) {
s := GetParam(r, param)
i, err := strconv.ParseUint(s, 10, 64)
if err != nil {
return 0, fmt.Errorf(param, err.Error())
}
return uint(i), nil
}
func GetParamRegexp(r *http.Request, param string, re *regexp.Regexp) (string, error) {
s := GetParam(r, param)
if !re.MatchString(s) {
return "", fmt.Errorf(param, "Did not match regular expression %v", re.String())
}
return s, nil
}
func GetParamFloat64(r *http.Request, param string) (float64, error) {
s := GetParam(r, param)
f, err := strconv.ParseFloat(s, 64)
if err != nil {
return 0, fmt.Errorf(param, err.Error())
}
return f, nil
}

+ 0
- 115
rpc/server/http_server.go View File

@ -1,115 +0,0 @@
// Commons for HTTP handling
package rpcserver
import (
"bufio"
"fmt"
"net"
"net/http"
"runtime/debug"
"time"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/alert"
. "github.com/tendermint/tendermint/rpc/types"
)
func StartHTTPServer(listenAddr string, handler http.Handler) (net.Listener, error) {
log.Notice(Fmt("Starting RPC HTTP server on %v", listenAddr))
listener, err := net.Listen("tcp", listenAddr)
if err != nil {
return nil, fmt.Errorf("Failed to listen to %v", listenAddr)
}
go func() {
res := http.Serve(
listener,
RecoverAndLogHandler(handler),
)
log.Crit("RPC HTTP server stopped", "result", res)
}()
return listener, nil
}
func WriteRPCResponseHTTP(w http.ResponseWriter, res RPCResponse) {
jsonBytes := wire.JSONBytesPretty(res)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(200)
w.Write(jsonBytes)
}
//-----------------------------------------------------------------------------
// Wraps an HTTP handler, adding error logging.
// If the inner function panics, the outer function recovers, logs, sends an
// HTTP 500 error response.
func RecoverAndLogHandler(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Wrap the ResponseWriter to remember the status
rww := &ResponseWriterWrapper{-1, w}
begin := time.Now()
// Common headers
origin := r.Header.Get("Origin")
rww.Header().Set("Access-Control-Allow-Origin", origin)
rww.Header().Set("Access-Control-Allow-Credentials", "true")
rww.Header().Set("Access-Control-Expose-Headers", "X-Server-Time")
rww.Header().Set("X-Server-Time", fmt.Sprintf("%v", begin.Unix()))
defer func() {
// Send a 500 error if a panic happens during a handler.
// Without this, Chrome & Firefox were retrying aborted ajax requests,
// at least to my localhost.
if e := recover(); e != nil {
// If RPCResponse
if res, ok := e.(RPCResponse); ok {
WriteRPCResponseHTTP(rww, res)
} else {
// For the rest,
log.Error("Panic in RPC HTTP handler", "error", e, "stack", string(debug.Stack()))
rww.WriteHeader(http.StatusInternalServerError)
WriteRPCResponseHTTP(rww, NewRPCResponse("", nil, Fmt("Internal Server Error: %v", e)))
}
}
// Finally, log.
durationMS := time.Since(begin).Nanoseconds() / 1000000
if rww.Status == -1 {
rww.Status = 200
}
log.Info("Served RPC HTTP response",
"method", r.Method, "url", r.URL,
"status", rww.Status, "duration", durationMS,
"remoteAddr", r.RemoteAddr,
)
}()
handler.ServeHTTP(rww, r)
})
}
// Remember the status for logging
type ResponseWriterWrapper struct {
Status int
http.ResponseWriter
}
func (w *ResponseWriterWrapper) WriteHeader(status int) {
w.Status = status
w.ResponseWriter.WriteHeader(status)
}
// implements http.Hijacker
func (w *ResponseWriterWrapper) Hijack() (net.Conn, *bufio.ReadWriter, error) {
return w.ResponseWriter.(http.Hijacker).Hijack()
}
// Stick it as a deferred statement in gouroutines to prevent the program from crashing.
func Recover(daemonName string) {
if e := recover(); e != nil {
stack := string(debug.Stack())
errorString := fmt.Sprintf("[%s] %s\n%s", daemonName, e, stack)
alert.Alert(errorString)
}
}

+ 0
- 7
rpc/server/log.go View File

@ -1,7 +0,0 @@
package rpcserver
import (
"github.com/tendermint/log15"
)
var log = log15.New("module", "rpcserver")

+ 27
- 22
rpc/test/helpers.go View File

@ -11,30 +11,48 @@ import (
"github.com/tendermint/go-p2p" "github.com/tendermint/go-p2p"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
client "github.com/tendermint/go-rpc/client"
"github.com/tendermint/go-rpc/types"
_ "github.com/tendermint/tendermint/config/tendermint_test" _ "github.com/tendermint/tendermint/config/tendermint_test"
nm "github.com/tendermint/tendermint/node" nm "github.com/tendermint/tendermint/node"
client "github.com/tendermint/tendermint/rpc/client"
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/rpc/types"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
// global variables for use across all tests // global variables for use across all tests
var ( var (
rpcAddr = "127.0.0.1:36657" // Not 46657
requestAddr = "http://" + rpcAddr
websocketAddr = "ws://" + rpcAddr + "/websocket"
node *nm.Node node *nm.Node
mempoolCount = 0 mempoolCount = 0
chainID string chainID string
clientURI = client.NewClientURI(requestAddr)
clientJSON = client.NewClientJSONRPC(requestAddr)
rpcAddr, requestAddr, websocketAddr string
clientURI *client.ClientURI
clientJSON *client.ClientJSONRPC
) )
// initialize config and create new node
func init() {
initConfig()
chainID = config.GetString("chain_id")
rpcAddr = config.GetString("rpc_laddr")
requestAddr = "http://" + rpcAddr
websocketAddr = "ws://" + rpcAddr + "/websocket"
clientURI = client.NewClientURI(requestAddr)
clientJSON = client.NewClientJSONRPC(requestAddr)
// TODO: change consensus/state.go timeouts to be shorter
// start a node
ready := make(chan struct{})
go newNode(ready)
<-ready
}
// create a new node and sleep forever // create a new node and sleep forever
func newNode(ready chan struct{}) { func newNode(ready chan struct{}) {
// Create & start node // Create & start node
@ -52,19 +70,6 @@ func newNode(ready chan struct{}) {
<-ch <-ch
} }
// initialize config and create new node
func init() {
initConfig()
chainID = config.GetString("chain_id")
// TODO: change consensus/state.go timeouts to be shorter
// start a node
ready := make(chan struct{})
go newNode(ready)
<-ready
}
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
// Utilities for testing the websocket service // Utilities for testing the websocket service
@ -192,7 +197,7 @@ func unmarshalResponseNewBlock(b []byte) (*types.Block, error) {
func unmarshalValidateBlockchain(t *testing.T, con *websocket.Conn, eid string) { func unmarshalValidateBlockchain(t *testing.T, con *websocket.Conn, eid string) {
var initBlockN int var initBlockN int
for i := 0; i < 2; i++ {
for i := 0; i < 3; i++ {
waitForEvent(t, con, eid, true, func() {}, func(eid string, b []byte) error { waitForEvent(t, con, eid, true, func() {}, func(eid string, b []byte) error {
block, err := unmarshalResponseNewBlock(b) block, err := unmarshalResponseNewBlock(b)
if err != nil { if err != nil {


+ 0
- 71
rpc/types/types.go View File

@ -1,71 +0,0 @@
package rpctypes
import (
"github.com/tendermint/tendermint/events"
)
type RPCRequest struct {
JSONRPC string `json:"jsonrpc"`
ID string `json:"id"`
Method string `json:"method"`
Params []interface{} `json:"params"`
}
func NewRPCRequest(id string, method string, params []interface{}) RPCRequest {
return RPCRequest{
JSONRPC: "2.0",
ID: id,
Method: method,
Params: params,
}
}
//----------------------------------------
/*
Result is a generic interface.
Applications should register type-bytes like so:
var _ = wire.RegisterInterface(
struct{ Result }{},
wire.ConcreteType{&ResultGenesis{}, ResultTypeGenesis},
wire.ConcreteType{&ResultBlockchainInfo{}, ResultTypeBlockchainInfo},
...
)
*/
type Result interface {
}
//----------------------------------------
type RPCResponse struct {
JSONRPC string `json:"jsonrpc"`
ID string `json:"id"`
Result Result `json:"result"`
Error string `json:"error"`
}
func NewRPCResponse(id string, res Result, err string) RPCResponse {
return RPCResponse{
JSONRPC: "2.0",
ID: id,
Result: res,
Error: err,
}
}
//----------------------------------------
// *wsConnection implements this interface.
type WSRPCConnection interface {
GetRemoteAddr() string
GetEventSwitch() *events.EventSwitch
WriteRPCResponse(resp RPCResponse)
TryWriteRPCResponse(resp RPCResponse) bool
}
// websocket-only RPCFuncs take this as the first parameter.
type WSRPCContext struct {
Request RPCRequest
WSRPCConnection
}

+ 0
- 3
rpc/version.go View File

@ -1,3 +0,0 @@
package rpc
const Version = "0.4.0"

+ 1
- 0
state/state.go View File

@ -8,6 +8,7 @@ import (
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
dbm "github.com/tendermint/go-db" dbm "github.com/tendermint/go-db"
"github.com/tendermint/go-events"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )


+ 3
- 5
types/events.go View File

@ -1,6 +1,8 @@
package types package types
import ( import (
// for registering EventData
"github.com/tendermint/go-events"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
) )
@ -38,12 +40,8 @@ const (
EventDataTypeVote = byte(0x12) EventDataTypeVote = byte(0x12)
) )
type EventData interface {
AssertIsEventData()
}
var _ = wire.RegisterInterface( var _ = wire.RegisterInterface(
struct{ EventData }{},
struct{ events.EventData }{},
wire.ConcreteType{EventDataNewBlock{}, EventDataTypeNewBlock}, wire.ConcreteType{EventDataNewBlock{}, EventDataTypeNewBlock},
// wire.ConcreteType{EventDataFork{}, EventDataTypeFork }, // wire.ConcreteType{EventDataFork{}, EventDataTypeFork },
wire.ConcreteType{EventDataTx{}, EventDataTypeTx}, wire.ConcreteType{EventDataTx{}, EventDataTypeTx},


Loading…
Cancel
Save