Browse Source

Begin writing RPC HTTP Server

pull/9/head
Jae Kwon 10 years ago
parent
commit
8e9c060e6d
12 changed files with 475 additions and 48 deletions
  1. +64
    -0
      alert/alert.go
  2. +178
    -0
      alert/email.go
  3. +15
    -0
      alert/log.go
  4. +8
    -2
      cmd/daemon.go
  5. +1
    -23
      cmd/log.go
  6. +36
    -16
      config/config.go
  7. +2
    -2
      p2p/addrbook.go
  8. +2
    -2
      p2p/switch.go
  9. +131
    -0
      rpc/handler.go
  10. +23
    -0
      rpc/http_server.go
  11. +15
    -0
      rpc/log.go
  12. +0
    -3
      rpc/rpc.go

+ 64
- 0
alert/alert.go View File

@ -0,0 +1,64 @@
package alert
import (
"fmt"
"github.com/sfreiberg/gotwilio"
"time"
. "github.com/tendermint/tendermint/config"
)
var last int64 = 0
var count int = 0
func Alert(message string) {
log.Error("<!> ALERT <!>\n" + message)
now := time.Now().Unix()
if now-last > int64(Config.Alert.MinInterval) {
message = fmt.Sprintf("%v:%v", Config.Network, message)
if count > 0 {
message = fmt.Sprintf("%v (+%v more since)", message, count)
count = 0
}
if len(Config.Alert.TwilioSid) > 0 {
go sendTwilio(message)
}
if len(Config.Alert.EmailRecipients) > 0 {
go sendEmail(message)
}
} else {
count++
}
}
func sendTwilio(message string) {
defer func() {
if err := recover(); err != nil {
log.Error("sendTwilio error: %v", err)
}
}()
if len(message) > 50 {
message = message[:50]
}
twilio := gotwilio.NewTwilioClient(Config.Alert.TwilioSid, Config.Alert.TwilioToken)
res, exp, err := twilio.SendSMS(Config.Alert.TwilioFrom, Config.Alert.TwilioTo, message, "", "")
if exp != nil || err != nil {
log.Error("sendTwilio error: %v %v %v", res, exp, err)
}
}
func sendEmail(message string) {
defer func() {
if err := recover(); err != nil {
log.Error("sendEmail error: %v", err)
}
}()
subject := message
if len(subject) > 80 {
subject = subject[:80]
}
err := SendEmail(subject, message, Config.Alert.EmailRecipients)
if err != nil {
log.Error("sendEmail error: %v\n%v", err, message)
}
}

+ 178
- 0
alert/email.go View File

@ -0,0 +1,178 @@
// Forked from github.com/SlyMarbo/gmail
package alert
import (
"bytes"
"crypto/tls"
"encoding/base64"
"errors"
"fmt"
"io/ioutil"
"net/smtp"
"path/filepath"
"regexp"
"strings"
. "github.com/tendermint/tendermint/config"
)
// Convenience function
func SendEmail(subject, body string, tos []string) error {
email := Compose(subject, body)
email.From = Config.SMTP.User
email.ContentType = "text/html; charset=utf-8"
email.AddRecipients(tos...)
err := email.Send()
return err
}
// Email represents a single message, which may contain
// attachments.
type Email struct {
From string
To []string
Subject string
ContentType string
Body string
Attachments map[string][]byte
}
// Compose begins a new email, filling the subject and body,
// and allocating memory for the list of recipients and the
// attachments.
func Compose(Subject, Body string) *Email {
out := new(Email)
out.To = make([]string, 0, 1)
out.Subject = Subject
out.Body = Body
out.Attachments = make(map[string][]byte)
return out
}
// Attach takes a filename and adds this to the message.
// Note that since only the filename is stored (and not
// its path, for privacy reasons), multiple files in
// different directories but with the same filename and
// extension cannot be sent.
func (e *Email) Attach(Filename string) error {
b, err := ioutil.ReadFile(Filename)
if err != nil {
return err
}
_, fname := filepath.Split(Filename)
e.Attachments[fname] = b
return nil
}
// AddRecipient adds a single recipient.
func (e *Email) AddRecipient(Recipient string) {
e.To = append(e.To, Recipient)
}
// AddRecipients adds one or more recipients.
func (e *Email) AddRecipients(Recipients ...string) {
e.To = append(e.To, Recipients...)
}
// Send sends the email, returning any error encountered.
func (e *Email) Send() error {
if e.From == "" {
return errors.New("Error: No sender specified. Please set the Email.From field.")
}
if e.To == nil || len(e.To) == 0 {
return errors.New("Error: No recipient specified. Please set the Email.To field.")
}
auth := smtp.PlainAuth(
"",
Config.SMTP.User,
Config.SMTP.Password,
Config.SMTP.Host,
)
conn, err := smtp.Dial(fmt.Sprintf("%v:%v", Config.SMTP.Host, Config.SMTP.Port))
if err != nil {
return err
}
err = conn.StartTLS(&tls.Config{})
if err != nil {
return err
}
err = conn.Auth(auth)
if err != nil {
return err
}
err = conn.Mail(e.From)
if err != nil {
if strings.Contains(err.Error(), "530 5.5.1") {
return errors.New("Error: Authentication failure. Your username or password is incorrect.")
}
return err
}
for _, recipient := range e.To {
err = conn.Rcpt(recipient)
if err != nil {
return err
}
}
wc, err := conn.Data()
if err != nil {
return err
}
defer wc.Close()
_, err = wc.Write(e.Bytes())
if err != nil {
return err
}
return nil
}
func (e *Email) Bytes() []byte {
buf := bytes.NewBuffer(nil)
var subject = e.Subject
subject = regexp.MustCompile("\n+").ReplaceAllString(subject, " ")
subject = regexp.MustCompile(" +").ReplaceAllString(subject, " ")
buf.WriteString("Subject: " + subject + "\n")
buf.WriteString("To: <" + strings.Join(e.To, ">,<") + ">\n")
buf.WriteString("MIME-Version: 1.0\n")
// Boundary is used by MIME to separate files.
boundary := "f46d043c813270fc6b04c2d223da"
if len(e.Attachments) > 0 {
buf.WriteString("Content-Type: multipart/mixed; boundary=" + boundary + "\n")
buf.WriteString("--" + boundary + "\n")
}
if e.ContentType == "" {
e.ContentType = "text/plain; charset=utf-8"
}
buf.WriteString(fmt.Sprintf("Content-Type: %s\n\n", e.ContentType))
buf.WriteString(e.Body)
if len(e.Attachments) > 0 {
for k, v := range e.Attachments {
buf.WriteString("\n\n--" + boundary + "\n")
buf.WriteString("Content-Type: application/octet-stream\n")
buf.WriteString("Content-Transfer-Encoding: base64\n")
buf.WriteString("Content-Disposition: attachment; filename=\"" + k + "\"\n\n")
b := make([]byte, base64.StdEncoding.EncodedLen(len(v)))
base64.StdEncoding.Encode(b, v)
buf.Write(b)
buf.WriteString("\n--" + boundary)
}
buf.WriteString("--")
}
return buf.Bytes()
}

+ 15
- 0
alert/log.go View File

@ -0,0 +1,15 @@
package alert
import (
"github.com/op/go-logging"
)
var log = logging.MustGetLogger("alert")
func init() {
logging.SetFormatter(logging.MustStringFormatter("[%{level:.1s}] %{message}"))
}
func SetAlertLogger(l *logging.Logger) {
log = l
}

+ 8
- 2
cmd/daemon.go View File

@ -10,6 +10,7 @@ import (
db_ "github.com/tendermint/tendermint/db"
mempool_ "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/rpc"
state_ "github.com/tendermint/tendermint/state"
)
@ -71,7 +72,7 @@ func NewNode() *Node {
}
func (n *Node) Start() {
log.Info("Starting node")
log.Info("Starting Node")
for _, l := range n.lz {
go n.inboundConnectionRoutine(l)
}
@ -80,7 +81,7 @@ func (n *Node) Start() {
}
func (n *Node) Stop() {
log.Info("Stopping node")
log.Info("Stopping Node")
// TODO: gracefully disconnect from peers.
n.sw.Stop()
n.book.Stop()
@ -134,6 +135,11 @@ func daemon() {
}
}
// Run the RPC server.
if config.Config.RPC.HTTPPort != 0 {
rpc.StartHTTPServer()
}
// Sleep forever and then...
trapSignal(func() {
n.Stop()


+ 1
- 23
cmd/log.go View File

@ -1,14 +1,8 @@
package main
import (
"os"
"github.com/op/go-logging"
"github.com/tendermint/tendermint/blocks"
"github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/state"
"os"
)
var log = logging.MustGetLogger("main")
@ -16,23 +10,7 @@ var log = logging.MustGetLogger("main")
func init() {
// Customize the output format
logging.SetFormatter(logging.MustStringFormatter("[%{level:.4s}] %{time:2006-01-02T15:04:05} %{shortfile:-20s} %{message}"))
logBackend := logging.NewLogBackend(os.Stderr, "", 0)
logBackend.Color = true
logging.SetBackend(logBackend)
// Test
/*
Log.Debug("debug")
Log.Info("info")
Log.Notice("notice")
Log.Warning("warning")
Log.Error("error")
*/
blocks.SetBlocksLogger(log)
consensus.SetConsensusLogger(log)
mempool.SetMempoolLogger(log)
p2p.SetP2PLogger(log)
state.SetStateLogger(log)
}

+ 36
- 16
config/config.go View File

@ -10,12 +10,8 @@ import (
"os"
"path/filepath"
"strings"
//"crypto/rand"
//"encoding/hex"
)
/* Global & initialization */
var RootDir string
var Config Config_
@ -62,7 +58,8 @@ func ParseFlags() {
}
}
/* Default configuration */
//-----------------------------------------------------------------------------j
// Default configuration
var defaultConfig = Config_{
Network: "tendermint_testnet0",
@ -72,25 +69,24 @@ var defaultConfig = Config_{
Type: "level",
Dir: RootDir + "/data",
},
Twilio: TwilioConfig{},
Alert: AlertConfig{},
SMTP: SMTPConfig{},
RPC: RPCConfig{
HTTPPort: 8888,
},
}
/* Configuration types */
//-----------------------------------------------------------------------------j
// Configuration types
type Config_ struct {
Network string
LAddr string
SeedNode string
Db DbConfig
Twilio TwilioConfig
}
type TwilioConfig struct {
Sid string
Token string
From string
To string
MinInterval int
Alert AlertConfig
SMTP SMTPConfig
RPC RPCConfig
}
type DbConfig struct {
@ -98,6 +94,30 @@ type DbConfig struct {
Dir string
}
type AlertConfig struct {
MinInterval int
TwilioSid string
TwilioToken string
TwilioFrom string
TwilioTo string
EmailRecipients []string
}
type SMTPConfig struct {
User string
Password string
Host string
Port uint
}
type RPCConfig struct {
HTTPPort uint
}
//-----------------------------------------------------------------------------j
func (cfg *Config_) validate() error {
if cfg.Network == "" {
cfg.Network = defaultConfig.Network


+ 2
- 2
p2p/addrbook.go View File

@ -127,7 +127,7 @@ func (a *AddrBook) init() {
func (a *AddrBook) Start() {
if atomic.CompareAndSwapUint32(&a.started, 0, 1) {
log.Info("Starting address manager")
log.Info("Starting AddrBook")
a.loadFromFile(a.filePath)
a.wg.Add(1)
go a.saveRoutine()
@ -136,7 +136,7 @@ func (a *AddrBook) Start() {
func (a *AddrBook) Stop() {
if atomic.CompareAndSwapUint32(&a.stopped, 0, 1) {
log.Info("Stopping address manager")
log.Info("Stopping AddrBook")
close(a.quit)
a.wg.Wait()
}


+ 2
- 2
p2p/switch.go View File

@ -90,7 +90,7 @@ func NewSwitch(reactors []Reactor) *Switch {
func (sw *Switch) Start() {
if atomic.CompareAndSwapUint32(&sw.started, 0, 1) {
log.Info("Starting switch")
log.Info("Starting Switch")
for _, reactor := range sw.reactors {
reactor.Start(sw)
}
@ -99,7 +99,7 @@ func (sw *Switch) Start() {
func (sw *Switch) Stop() {
if atomic.CompareAndSwapUint32(&sw.stopped, 0, 1) {
log.Info("Stopping switch")
log.Info("Stopping Switch")
close(sw.quit)
// Stop each peer.
for _, peer := range sw.peers.List() {


+ 131
- 0
rpc/handler.go View File

@ -0,0 +1,131 @@
package rpc
import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"runtime/debug"
"strings"
"time"
"github.com/tendermint/tendermint/alert"
)
type APIStatus string
const (
API_OK APIStatus = "OK"
API_ERROR APIStatus = "ERROR"
API_INVALID_PARAM APIStatus = "INVALID_PARAM"
API_UNAUTHORIZED APIStatus = "UNAUTHORIZED"
API_REDIRECT APIStatus = "REDIRECT"
)
type APIResponse struct {
Status APIStatus `json:"status"`
Data interface{} `json:"data"`
}
func (res APIResponse) Error() string {
return fmt.Sprintf("Status(%v) %v", res.Status, res.Data)
}
// Throws a panic which the RecoverAndLogHandler catches.
func ReturnJSON(status APIStatus, data interface{}) {
res := APIResponse{}
res.Status = status
res.Data = data
panic(res)
}
// Wraps an HTTP handler, adding error logging.
//
// If the inner function panics, the outer function recovers, logs, sends an
// HTTP 500 error response.
func RecoverAndLogHandler(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Wrap the ResponseWriter to remember the status
rww := &ResponseWriterWrapper{-1, w}
begin := time.Now()
// Common headers
origin := r.Header.Get("Origin")
originUrl, err := url.Parse(origin)
if err == nil {
originHost := strings.Split(originUrl.Host, ":")[0]
if strings.HasSuffix(originHost, ".ftnox.com") {
rww.Header().Set("Access-Control-Allow-Origin", origin)
rww.Header().Set("Access-Control-Allow-Credentials", "true")
rww.Header().Set("Access-Control-Expose-Headers", "X-Server-Time")
}
}
rww.Header().Set("X-Server-Time", fmt.Sprintf("%v", begin.Unix()))
defer func() {
// Send a 500 error if a panic happens during a handler.
// Without this, Chrome & Firefox were retrying aborted ajax requests,
// at least to my localhost.
if e := recover(); e != nil {
// If APIResponse,
if res, ok := e.(APIResponse); ok {
resJSON, err := json.Marshal(res)
if err != nil {
panic(err)
}
rww.Header().Set("Content-Type", "application/json")
switch res.Status {
case API_OK:
rww.WriteHeader(200)
case API_ERROR:
rww.WriteHeader(400)
case API_UNAUTHORIZED:
rww.WriteHeader(401)
case API_INVALID_PARAM:
rww.WriteHeader(420)
case API_REDIRECT:
rww.WriteHeader(430)
default:
rww.WriteHeader(440)
}
rww.Write(resJSON)
} else {
// For the rest,
rww.WriteHeader(http.StatusInternalServerError)
rww.Write([]byte("Internal Server Error"))
log.Error("%s: %s", e, debug.Stack())
}
}
// Finally, log.
durationMS := time.Since(begin).Nanoseconds() / 1000000
if rww.Status == -1 {
rww.Status = 200
}
log.Debug("%s %s %v %v %s", r.RemoteAddr, r.Method, rww.Status, durationMS, r.URL)
}()
handler.ServeHTTP(rww, r)
})
}
// Remember the status for logging
type ResponseWriterWrapper struct {
Status int
http.ResponseWriter
}
func (w *ResponseWriterWrapper) WriteHeader(status int) {
w.Status = status
w.ResponseWriter.WriteHeader(status)
}
// Stick it as a deferred statement in gouroutines to prevent the program from crashing.
func Recover(daemonName string) {
if e := recover(); e != nil {
stack := string(debug.Stack())
errorString := fmt.Sprintf("[%s] %s\n%s", daemonName, e, stack)
alert.Alert(errorString)
}
}

+ 23
- 0
rpc/http_server.go View File

@ -0,0 +1,23 @@
package rpc
import (
"fmt"
"net/http"
. "github.com/tendermint/tendermint/config"
)
func StartHTTPServer() {
//http.HandleFunc("/path", handler)
//http.HandleFunc("/path", handler)
// Serve HTTP on localhost only.
// Let something like Nginx handle HTTPS connections.
address := fmt.Sprintf("127.0.0.1:%v", Config.RPC.HTTPPort)
log.Info("Starting RPC HTTP server on http://%s", address)
go func() {
log.Fatal(http.ListenAndServe(address, RecoverAndLogHandler(http.DefaultServeMux)))
}()
}

+ 15
- 0
rpc/log.go View File

@ -0,0 +1,15 @@
package rpc
import (
"github.com/op/go-logging"
)
var log = logging.MustGetLogger("rpc")
func init() {
logging.SetFormatter(logging.MustStringFormatter("[%{level:.1s}] %{message}"))
}
func SetRPCLogger(l *logging.Logger) {
log = l
}

+ 0
- 3
rpc/rpc.go View File

@ -1,3 +0,0 @@
package rpc
// TODO

Loading…
Cancel
Save