Browse Source

time encoding in binary/reflect

pull/9/head
Jae Kwon 10 years ago
parent
commit
325b88b083
11 changed files with 233 additions and 116 deletions
  1. +102
    -52
      binary/reflect.go
  2. +6
    -0
      binary/reflect_test.go
  3. +4
    -1
      cmd/daemon.go
  4. +17
    -4
      config/config.go
  5. +1
    -1
      consensus/state.go
  6. +29
    -5
      rpc/blocks.go
  7. +43
    -37
      rpc/http_handler.go
  8. +9
    -0
      rpc/http_params.go
  9. +4
    -8
      rpc/http_server.go
  10. +8
    -4
      rpc/mempool.go
  11. +10
    -4
      rpc/rpc.go

+ 102
- 52
binary/reflect.go View File

@ -7,6 +7,7 @@ import (
"io"
"reflect"
"sync"
"time"
. "github.com/tendermint/tendermint/common"
)
@ -15,8 +16,9 @@ type TypeInfo struct {
Type reflect.Type // The type
// Custom encoder/decoder
Encoder Encoder
Decoder Decoder
// NOTE: Not used.
BinaryEncoder Encoder
BinaryDecoder Decoder
// If Type is kind reflect.Interface
ConcreteTypes map[byte]reflect.Type
@ -44,6 +46,15 @@ func GetTypeByteFromStruct(o interface{}) (hasTypeByte bool, typeByte byte) {
}
}
// Predeclaration of common types
var (
timeType = GetTypeFromStructDeclaration(struct{ time.Time }{})
)
const (
rfc2822 = "Mon Jan 02 15:04:05 -0700 2006"
)
// If a type implements TypeByte, the byte is included
// as the first byte for encoding and decoding.
// This is primarily used to encode interfaces types.
@ -155,8 +166,8 @@ func readReflect(rv reflect.Value, rt reflect.Type, r Unreader, n *int64, err *e
typeInfo := GetTypeInfo(rt)
// Custom decoder
if typeInfo.Decoder != nil {
decoded := typeInfo.Decoder(r, n, err)
if typeInfo.BinaryDecoder != nil {
decoded := typeInfo.BinaryDecoder(r, n, err)
rv.Set(reflect.ValueOf(decoded))
return
}
@ -219,14 +230,21 @@ func readReflect(rv reflect.Value, rt reflect.Type, r Unreader, n *int64, err *e
}
case reflect.Struct:
numFields := rt.NumField()
for i := 0; i < numFields; i++ {
field := rt.Field(i)
if field.PkgPath != "" {
continue
if rt == timeType {
// Special case: time.Time
num := ReadInt64(r, n, err)
log.Debug(Fmt("Read time: %v", num))
rv.Set(reflect.ValueOf(time.Unix(num, 0)))
} else {
numFields := rt.NumField()
for i := 0; i < numFields; i++ {
field := rt.Field(i)
if field.PkgPath != "" {
continue
}
fieldRv := rv.Field(i)
readReflect(fieldRv, field.Type, r, n, err)
}
fieldRv := rv.Field(i)
readReflect(fieldRv, field.Type, r, n, err)
}
case reflect.String:
@ -295,8 +313,8 @@ func writeReflect(rv reflect.Value, rt reflect.Type, w io.Writer, n *int64, err
typeInfo := GetTypeInfo(rt)
// Custom encoder, say for an interface type rt.
if typeInfo.Encoder != nil {
typeInfo.Encoder(rv.Interface(), w, n, err)
if typeInfo.BinaryEncoder != nil {
typeInfo.BinaryEncoder(rv.Interface(), w, n, err)
return
}
@ -338,14 +356,19 @@ func writeReflect(rv reflect.Value, rt reflect.Type, w io.Writer, n *int64, err
}
case reflect.Struct:
numFields := rt.NumField()
for i := 0; i < numFields; i++ {
field := rt.Field(i)
if field.PkgPath != "" {
continue
if rt == timeType {
// Special case: time.Time
WriteInt64(rv.Interface().(time.Time).Unix(), w, n, err)
} else {
numFields := rt.NumField()
for i := 0; i < numFields; i++ {
field := rt.Field(i)
if field.PkgPath != "" {
continue
}
fieldRv := rv.Field(i)
writeReflect(fieldRv, field.Type, w, n, err)
}
fieldRv := rv.Field(i)
writeReflect(fieldRv, field.Type, w, n, err)
}
case reflect.String:
@ -488,25 +511,41 @@ func readReflectJSON(rv reflect.Value, rt reflect.Type, o interface{}, err *erro
}
case reflect.Struct:
oMap, ok := o.(map[string]interface{})
if !ok {
*err = errors.New(Fmt("Expected map but got type %v", reflect.TypeOf(o)))
return
}
// TODO: ensure that all fields are set?
for name, value := range oMap {
field, ok := rt.FieldByName(name)
if rt == timeType {
// Special case: time.Time
str, ok := o.(string)
if !ok {
*err = errors.New(Fmt("Attempt to set unknown field %v", field.Name))
*err = errors.New(Fmt("Expected string but got type %v", reflect.TypeOf(o)))
return
}
log.Debug(Fmt("Read time: %v", str))
t, err_ := time.Parse(rfc2822, str)
if err_ != nil {
*err = err_
return
}
// JAE: I don't think golang reflect lets us set unexported fields, but just in case:
if field.PkgPath != "" {
*err = errors.New(Fmt("Attempt to set unexported field %v", field.Name))
rv.Set(reflect.ValueOf(t))
} else {
oMap, ok := o.(map[string]interface{})
if !ok {
*err = errors.New(Fmt("Expected map but got type %v", reflect.TypeOf(o)))
return
}
fieldRv := rv.FieldByName(name)
readReflectJSON(fieldRv, field.Type, value, err)
// TODO: ensure that all fields are set?
for name, value := range oMap {
field, ok := rt.FieldByName(name)
if !ok {
*err = errors.New(Fmt("Attempt to set unknown field %v", field.Name))
return
}
// JAE: I don't think golang reflect lets us set unexported fields, but just in case:
if field.PkgPath != "" {
*err = errors.New(Fmt("Attempt to set unexported field %v", field.Name))
return
}
fieldRv := rv.FieldByName(name)
readReflectJSON(fieldRv, field.Type, value, err)
}
}
case reflect.String:
@ -550,12 +589,6 @@ func writeReflectJSON(rv reflect.Value, rt reflect.Type, w io.Writer, n *int64,
// Get typeInfo
typeInfo := GetTypeInfo(rt)
// Custom encoder, say for an interface type rt.
if typeInfo.Encoder != nil {
typeInfo.Encoder(rv.Interface(), w, n, err)
return
}
// Dereference interface
if rt.Kind() == reflect.Interface {
rv = rv.Elem()
@ -598,21 +631,36 @@ func writeReflectJSON(rv reflect.Value, rt reflect.Type, w io.Writer, n *int64,
}
case reflect.Struct:
WriteTo([]byte("{"), w, n, err)
numFields := rt.NumField()
for i := 0; i < numFields; i++ {
field := rt.Field(i)
if field.PkgPath != "" {
continue
if rt == timeType {
// Special case: time.Time
t := rv.Interface().(time.Time)
str := t.Format(rfc2822)
jsonBytes, err_ := json.Marshal(str)
if err_ != nil {
*err = err_
return
}
fieldRv := rv.Field(i)
WriteTo([]byte(Fmt("\"%v\":", field.Name)), w, n, err)
writeReflectJSON(fieldRv, field.Type, w, n, err)
if i < numFields-1 {
WriteTo([]byte(","), w, n, err)
WriteTo(jsonBytes, w, n, err)
} else {
WriteTo([]byte("{"), w, n, err)
numFields := rt.NumField()
wroteField := false
for i := 0; i < numFields; i++ {
field := rt.Field(i)
if field.PkgPath != "" {
continue
}
fieldRv := rv.Field(i)
if wroteField {
WriteTo([]byte(","), w, n, err)
} else {
wroteField = true
}
WriteTo([]byte(Fmt("\"%v\":", field.Name)), w, n, err)
writeReflectJSON(fieldRv, field.Type, w, n, err)
}
WriteTo([]byte("}"), w, n, err)
}
WriteTo([]byte("}"), w, n, err)
case reflect.String:
fallthrough
@ -635,3 +683,5 @@ func writeReflectJSON(rv reflect.Value, rt reflect.Type, w io.Writer, n *int64,
WriteTo([]byte("]"), w, n, err)
}
}
//-----------------------------------------------------------------------------

+ 6
- 0
binary/reflect_test.go View File

@ -4,11 +4,13 @@ import (
"bytes"
"reflect"
"testing"
"time"
)
type SimpleStruct struct {
String string
Bytes []byte
Time time.Time
}
//-------------------------------------
@ -75,6 +77,7 @@ func constructBasic() interface{} {
SimpleStruct{
String: "String",
Bytes: []byte("Bytes"),
Time: time.Unix(123, 0),
},
}
return cat
@ -92,6 +95,9 @@ func validateBasic(o interface{}, t *testing.T) {
if string(cat.Bytes) != "Bytes" {
t.Errorf("Expected cat2.Bytes == 'Bytes', got %X", cat.Bytes)
}
if cat.Time.Unix() != 123 {
t.Errorf("Expected cat2.Time == 'Unix(123)', got %v", cat.Time)
}
}
//-------------------------------------


+ 4
- 1
cmd/daemon.go View File

@ -20,6 +20,7 @@ type Node struct {
sw *p2p.Switch
book *p2p.AddrBook
pexReactor *p2p.PEXReactor
blockStore *block.BlockStore
mempoolReactor *mempool_.MempoolReactor
consensusReactor *consensus.ConsensusReactor
state *state_.State
@ -65,6 +66,7 @@ func NewNode() *Node {
sw: sw,
book: book,
pexReactor: pexReactor,
blockStore: blockStore,
mempoolReactor: mempoolReactor,
consensusReactor: consensusReactor,
state: state,
@ -137,7 +139,8 @@ func daemon() {
}
// Run the RPC server.
if config.Config.RPC.HTTPPort != 0 {
if config.Config.RPC.HTTPLAddr != "" {
rpc.SetRPCBlockStore(n.blockStore)
rpc.SetRPCState(n.state)
rpc.SetRPCMempoolReactor(n.mempoolReactor)
rpc.StartHTTPServer()


+ 17
- 4
config/config.go View File

@ -6,6 +6,7 @@ import (
"flag"
"fmt"
"io/ioutil"
"net"
"os"
"path/filepath"
"strings"
@ -50,7 +51,7 @@ type SMTPConfig struct {
}
type RPCConfig struct {
HTTPPort uint
HTTPLAddr string
}
func (cfg *ConfigType) validate() error {
@ -66,6 +67,17 @@ func (cfg *ConfigType) validate() error {
if cfg.DB.Backend == "" {
return errors.New("DB.Backend must be set")
}
if cfg.RPC.HTTPLAddr == "" {
fmt.Println("Set RPC.HTTPLAddr to \"0.0.0.0:8888\" in your config.json to enable the RPC API server.")
} else {
_, port, err := net.SplitHostPort(cfg.RPC.HTTPLAddr)
if err != nil {
return errors.New(Fmt("RPC.HTTPLAddr is invalid. %v", err))
}
if port == "" || port == "0" {
return errors.New("RPC.HTTPLAddr is invalid. Port number must be defined")
}
}
return nil
}
@ -114,7 +126,7 @@ func init() {
Alert: AlertConfig{},
SMTP: SMTPConfig{},
RPC: RPCConfig{
HTTPPort: 8888,
HTTPLAddr: "0.0.0.0:0",
},
}
}
@ -132,6 +144,7 @@ func parseFlags(flags *flag.FlagSet, args []string) (printHelp bool) {
flags.BoolVar(&printHelp, "help", false, "Print this help message.")
flags.StringVar(&Config.LAddr, "laddr", Config.LAddr, "Listen address. (0.0.0.0:0 means any interface, any port)")
flags.StringVar(&Config.SeedNode, "seed", Config.SeedNode, "Address of seed node")
flags.StringVar(&Config.RPC.HTTPLAddr, "rpc_http_laddr", Config.RPC.HTTPLAddr, "RPC listen address. (0.0.0.0:0 means any interface, any port)")
flags.Parse(args)
return
}
@ -152,11 +165,11 @@ func ParseFlags(args []string) {
Config = ConfigType{}
err = json.Unmarshal(configBytes, &Config)
if err != nil {
Exit(Fmt("Invalid configuration file %s: %v", configFile, err))
Exit(Fmt("Invalid configuration file %s:\n%v\n", configFile, err))
}
err = Config.validate()
if err != nil {
Exit(Fmt("Invalid configuration file %s: %v", configFile, err))
Exit(Fmt("Invalid configuration file %s:\n%v\n", configFile, err))
}
// try to parse arg flags, which can override file configuration.


+ 1
- 1
consensus/state.go View File

@ -820,7 +820,7 @@ func (cs *ConsensusState) TryFinalizeCommit(height uint) bool {
}
hash, header, _ := cs.Commits.TwoThirdsMajority()
if !cs.ProposalBlock.HashesTo(hash) {
panic(Fmt("Expected ProposalBlock to hash to commit hash"))
panic(Fmt("Expected ProposalBlock to hash to commit hash. Expected %X, got %X", hash, cs.ProposalBlock.Hash()))
}
if !cs.ProposalBlockParts.HasHeader(header) {
panic(Fmt("Expected ProposalBlockParts header to be commit header"))


+ 29
- 5
rpc/blocks.go View File

@ -2,12 +2,36 @@ package rpc
import (
"net/http"
//. "github.com/tendermint/tendermint/block"
. "github.com/tendermint/tendermint/block"
. "github.com/tendermint/tendermint/common"
)
func BlockHandler(w http.ResponseWriter, r *http.Request) {
//height, _ := GetParamUint64Safe(r, "height")
//count, _ := GetParamUint64Safe(r, "count")
type BlockchainInfoResponse struct {
LastHeight uint
BlockMetas []*BlockMeta
}
func BlockchainInfoHandler(w http.ResponseWriter, r *http.Request) {
minHeight, _ := GetParamUint(r, "min_height")
maxHeight, _ := GetParamUint(r, "max_height")
if maxHeight == 0 {
maxHeight = blockStore.Height()
}
if minHeight == 0 {
minHeight = MaxUint(0, maxHeight-20)
}
blockMetas := []*BlockMeta{}
for height := minHeight; height <= maxHeight; height++ {
blockMetas = append(blockMetas, blockStore.LoadBlockMeta(height))
}
res := BlockchainInfoResponse{
LastHeight: blockStore.Height(),
BlockMetas: blockMetas,
}
ReturnJSON(API_OK, "hello")
WriteAPIResponse(w, API_OK, res)
return
}

+ 43
- 37
rpc/http_handler.go View File

@ -2,16 +2,14 @@
package rpc
import (
"encoding/json"
"bytes"
"fmt"
"net/http"
"net/url"
"runtime/debug"
"strings"
"time"
"github.com/tendermint/tendermint/alert"
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/binary"
)
type APIStatus string
@ -33,12 +31,33 @@ func (res APIResponse) Error() string {
return fmt.Sprintf("Status(%v) %v", res.Status, res.Data)
}
// Throws a panic which the RecoverAndLogHandler catches.
func ReturnJSON(status APIStatus, data interface{}) {
func WriteAPIResponse(w http.ResponseWriter, status APIStatus, data interface{}) {
res := APIResponse{}
res.Status = status
res.Data = data
panic(res)
buf, n, err := new(bytes.Buffer), new(int64), new(error)
binary.WriteJSON(res, w, n, err)
if *err != nil {
log.Warn("Failed to write JSON APIResponse", "error", err)
}
w.Header().Set("Content-Type", "application/json")
switch res.Status {
case API_OK:
w.WriteHeader(200)
case API_ERROR:
w.WriteHeader(400)
case API_UNAUTHORIZED:
w.WriteHeader(401)
case API_INVALID_PARAM:
w.WriteHeader(420)
case API_REDIRECT:
w.WriteHeader(430)
default:
w.WriteHeader(440)
}
w.Write(buf.Bytes())
}
// Wraps an HTTP handler, adding error logging.
@ -52,16 +71,18 @@ func RecoverAndLogHandler(handler http.Handler) http.Handler {
begin := time.Now()
// Common headers
origin := r.Header.Get("Origin")
originUrl, err := url.Parse(origin)
if err == nil {
originHost := strings.Split(originUrl.Host, ":")[0]
if strings.HasSuffix(originHost, ".ftnox.com") {
rww.Header().Set("Access-Control-Allow-Origin", origin)
rww.Header().Set("Access-Control-Allow-Credentials", "true")
rww.Header().Set("Access-Control-Expose-Headers", "X-Server-Time")
/*
origin := r.Header.Get("Origin")
originUrl, err := url.Parse(origin)
if err == nil {
originHost := strings.Split(originUrl.Host, ":")[0]
if strings.HasSuffix(originHost, ".tendermint.com") {
rww.Header().Set("Access-Control-Allow-Origin", origin)
rww.Header().Set("Access-Control-Allow-Credentials", "true")
rww.Header().Set("Access-Control-Expose-Headers", "X-Server-Time")
}
}
}
*/
rww.Header().Set("X-Server-Time", fmt.Sprintf("%v", begin.Unix()))
defer func() {
@ -72,26 +93,7 @@ func RecoverAndLogHandler(handler http.Handler) http.Handler {
// If APIResponse,
if res, ok := e.(APIResponse); ok {
resJSON, err := json.Marshal(res)
if err != nil {
panic(err)
}
rww.Header().Set("Content-Type", "application/json")
switch res.Status {
case API_OK:
rww.WriteHeader(200)
case API_ERROR:
rww.WriteHeader(400)
case API_UNAUTHORIZED:
rww.WriteHeader(401)
case API_INVALID_PARAM:
rww.WriteHeader(420)
case API_REDIRECT:
rww.WriteHeader(430)
default:
rww.WriteHeader(440)
}
rww.Write(resJSON)
WriteAPIResponse(rww, res.Status, res.Data)
} else {
// For the rest,
rww.WriteHeader(http.StatusInternalServerError)
@ -105,7 +107,11 @@ func RecoverAndLogHandler(handler http.Handler) http.Handler {
if rww.Status == -1 {
rww.Status = 200
}
log.Debug(Fmt("%s %s %v %v %s", r.RemoteAddr, r.Method, rww.Status, durationMS, r.URL))
log.Debug("Served HTTP response",
"method", r.Method, "url", r.URL,
"status", rww.Status, "duration", durationMS,
"remoteAddr", r.RemoteAddr,
)
}()
handler.ServeHTTP(rww, r)


+ 9
- 0
rpc/http_params.go View File

@ -67,6 +67,15 @@ func GetParamUint64(r *http.Request, param string) (uint64, error) {
return i, nil
}
func GetParamUint(r *http.Request, param string) (uint, error) {
s := GetParam(r, param)
i, err := strconv.ParseUint(s, 10, 64)
if err != nil {
return 0, Errorf(param, err.Error())
}
return uint(i), nil
}
func GetParamRegexp(r *http.Request, param string, re *regexp.Regexp) (string, error) {
s := GetParam(r, param)
if !re.MatchString(s) {


+ 4
- 8
rpc/http_server.go View File

@ -1,24 +1,20 @@
package rpc
import (
"fmt"
"net/http"
. "github.com/tendermint/tendermint/config"
. "github.com/tendermint/tendermint/common"
. "github.com/tendermint/tendermint/config"
)
func StartHTTPServer() {
http.HandleFunc("/block", BlockHandler)
http.HandleFunc("/block", BlockchainInfoHandler)
http.HandleFunc("/mempool", MempoolHandler)
// Serve HTTP on localhost only.
// Let something like Nginx handle HTTPS connections.
address := fmt.Sprintf("127.0.0.1:%v", Config.RPC.HTTPPort)
log.Info(Fmt("Starting RPC HTTP server on http://%s", address))
log.Info(Fmt("Starting RPC HTTP server on %s", Config.RPC.HTTPLAddr))
go func() {
log.Crit("%v", http.ListenAndServe(address, RecoverAndLogHandler(http.DefaultServeMux)))
log.Crit("RPC HTTPServer stopped", "result", http.ListenAndServe(Config.RPC.HTTPLAddr, RecoverAndLogHandler(http.DefaultServeMux)))
}()
}

+ 8
- 4
rpc/mempool.go View File

@ -15,25 +15,29 @@ func MempoolHandler(w http.ResponseWriter, r *http.Request) {
//count, _ := GetParamUint64Safe(r, "count")
txBytes, err := GetParamByteSlice(r, "tx_bytes")
if err != nil {
ReturnJSON(API_INVALID_PARAM, Fmt("Invalid tx_bytes: %v", err))
WriteAPIResponse(w, API_INVALID_PARAM, Fmt("Invalid tx_bytes: %v", err))
return
}
reader, n := bytes.NewReader(txBytes), new(int64)
tx_ := ReadBinary(struct{ Tx }{}, reader, n, &err).(struct{ Tx })
if err != nil {
ReturnJSON(API_INVALID_PARAM, Fmt("Invalid tx_bytes: %v", err))
WriteAPIResponse(w, API_INVALID_PARAM, Fmt("Invalid tx_bytes: %v", err))
return
}
tx := tx_.Tx
err = mempoolReactor.BroadcastTx(tx)
if err != nil {
ReturnJSON(API_ERROR, Fmt("Error broadcasting transaction: %v", err))
WriteAPIResponse(w, API_ERROR, Fmt("Error broadcasting transaction: %v", err))
return
}
jsonBytes := JSONBytes(tx)
fmt.Println(">>", string(jsonBytes))
ReturnJSON(API_OK, Fmt("Broadcasted tx: %X", tx))
WriteAPIResponse(w, API_OK, Fmt("Broadcasted tx: %X", tx))
return
}
/*


+ 10
- 4
rpc/rpc.go View File

@ -1,17 +1,23 @@
package rpc
import (
block_ "github.com/tendermint/tendermint/block"
mempool_ "github.com/tendermint/tendermint/mempool"
state_ "github.com/tendermint/tendermint/state"
)
var blockStore *block_.BlockStore
var state *state_.State
var mempoolReactor *mempool_.MempoolReactor
func SetRPCState(state__ *state_.State) {
state = state__
func SetRPCBlockStore(bs *block_.BlockStore) {
blockStore = bs
}
func SetRPCMempoolReactor(mempoolReactor_ *mempool_.MempoolReactor) {
mempoolReactor = mempoolReactor_
func SetRPCState(s *state_.State) {
state = s
}
func SetRPCMempoolReactor(mr *mempool_.MempoolReactor) {
mempoolReactor = mr
}

Loading…
Cancel
Save