Browse Source

Added new Flush request/response type

pull/1780/head
Jae Kwon 9 years ago
parent
commit
ef93c95853
2 changed files with 42 additions and 17 deletions
  1. +16
    -3
      server/server.go
  2. +26
    -14
      types/messages.go

+ 16
- 3
server/server.go View File

@ -1,6 +1,7 @@
package server package server
import ( import (
"bufio"
"fmt" "fmt"
"net" "net"
"reflect" "reflect"
@ -51,11 +52,12 @@ func StartListener(protoAddr string, app types.Application) (net.Listener, error
// Read requests from conn and deal with them // Read requests from conn and deal with them
func handleRequests(app types.Application, connClosed chan struct{}, conn net.Conn, responses chan<- types.Response) { func handleRequests(app types.Application, connClosed chan struct{}, conn net.Conn, responses chan<- types.Response) {
var count int var count int
var bufReader = bufio.NewReader(conn)
for { for {
var n int64 var n int64
var err error var err error
var req types.Request var req types.Request
wire.ReadBinaryPtr(&req, conn, &n, &err)
wire.ReadBinaryPtr(&req, bufReader, &n, &err)
if err != nil { if err != nil {
fmt.Println(err.Error()) fmt.Println(err.Error())
@ -77,6 +79,8 @@ func handleRequest(app types.Application, req types.Request, responses chan<- ty
case types.RequestEcho: case types.RequestEcho:
retCode, msg := app.Echo(req.Message) retCode, msg := app.Echo(req.Message)
responses <- types.ResponseEcho{retCode, msg} responses <- types.ResponseEcho{retCode, msg}
case types.RequestFlush:
responses <- types.ResponseFlush{}
case types.RequestAppendTx: case types.RequestAppendTx:
retCode := app.AppendTx(req.TxBytes) retCode := app.AppendTx(req.TxBytes)
responses <- types.ResponseAppendTx{retCode} responses <- types.ResponseAppendTx{retCode}
@ -116,18 +120,27 @@ func handleRequest(app types.Application, req types.Request, responses chan<- ty
// Pull responses from 'responses' and write them to conn. // Pull responses from 'responses' and write them to conn.
func handleResponses(connClosed chan struct{}, responses <-chan types.Response, conn net.Conn) { func handleResponses(connClosed chan struct{}, responses <-chan types.Response, conn net.Conn) {
var count int var count int
var bufWriter = bufio.NewWriter(conn)
for { for {
var res = <-responses var res = <-responses
var n int64 var n int64
var err error var err error
wire.WriteBinary(res, conn, &n, &err)
wire.WriteBinary(res, bufWriter, &n, &err)
if err != nil { if err != nil {
fmt.Println(err.Error()) fmt.Println(err.Error())
connClosed <- struct{}{} connClosed <- struct{}{}
return return
} }
if _, ok := res.(types.ResponseFlush); ok {
err = bufWriter.Flush()
if err != nil {
fmt.Println(err.Error())
connClosed <- struct{}{}
return
}
}
count++ count++
if count%1000 == 0 { if count%1000 == 0 {
fmt.Println("Sent response", reflect.TypeOf(res), res, n, err, count) fmt.Println("Sent response", reflect.TypeOf(res), res, n, err, count)


+ 26
- 14
types/messages.go View File

@ -4,22 +4,24 @@ import "github.com/tendermint/go-wire"
const ( const (
requestTypeEcho = byte(0x01) requestTypeEcho = byte(0x01)
requestTypeAppendTx = byte(0x02)
requestTypeGetHash = byte(0x03)
requestTypeCommit = byte(0x04)
requestTypeRollback = byte(0x05)
requestTypeSetEventsMode = byte(0x06)
requestTypeAddListener = byte(0x07)
requestTypeRemListener = byte(0x08)
requestTypeFlush = byte(0x02)
requestTypeAppendTx = byte(0x03)
requestTypeGetHash = byte(0x04)
requestTypeCommit = byte(0x05)
requestTypeRollback = byte(0x06)
requestTypeSetEventsMode = byte(0x07)
requestTypeAddListener = byte(0x08)
requestTypeRemListener = byte(0x09)
responseTypeEcho = byte(0x11) responseTypeEcho = byte(0x11)
responseTypeAppendTx = byte(0x12)
responseTypeGetHash = byte(0x13)
responseTypeCommit = byte(0x14)
responseTypeRollback = byte(0x15)
responseTypeSetEventsMode = byte(0x16)
responseTypeAddListener = byte(0x17)
responseTypeRemListener = byte(0x18)
responseTypeFlush = byte(0x12)
responseTypeAppendTx = byte(0x13)
responseTypeGetHash = byte(0x14)
responseTypeCommit = byte(0x15)
responseTypeRollback = byte(0x16)
responseTypeSetEventsMode = byte(0x17)
responseTypeAddListener = byte(0x18)
responseTypeRemListener = byte(0x19)
responseTypeException = byte(0x20) responseTypeException = byte(0x20)
responseTypeEvent = byte(0x21) responseTypeEvent = byte(0x21)
@ -31,6 +33,9 @@ type RequestEcho struct {
Message string Message string
} }
type RequestFlush struct {
}
type RequestAppendTx struct { type RequestAppendTx struct {
TxBytes []byte TxBytes []byte
} }
@ -61,6 +66,7 @@ type Request interface {
} }
func (_ RequestEcho) AssertRequestType() {} func (_ RequestEcho) AssertRequestType() {}
func (_ RequestFlush) AssertRequestType() {}
func (_ RequestAppendTx) AssertRequestType() {} func (_ RequestAppendTx) AssertRequestType() {}
func (_ RequestGetHash) AssertRequestType() {} func (_ RequestGetHash) AssertRequestType() {}
func (_ RequestCommit) AssertRequestType() {} func (_ RequestCommit) AssertRequestType() {}
@ -72,6 +78,7 @@ func (_ RequestRemListener) AssertRequestType() {}
var _ = wire.RegisterInterface( var _ = wire.RegisterInterface(
struct{ Request }{}, struct{ Request }{},
wire.ConcreteType{RequestEcho{}, requestTypeEcho}, wire.ConcreteType{RequestEcho{}, requestTypeEcho},
wire.ConcreteType{RequestFlush{}, requestTypeFlush},
wire.ConcreteType{RequestAppendTx{}, requestTypeAppendTx}, wire.ConcreteType{RequestAppendTx{}, requestTypeAppendTx},
wire.ConcreteType{RequestGetHash{}, requestTypeGetHash}, wire.ConcreteType{RequestGetHash{}, requestTypeGetHash},
wire.ConcreteType{RequestCommit{}, requestTypeCommit}, wire.ConcreteType{RequestCommit{}, requestTypeCommit},
@ -88,6 +95,9 @@ type ResponseEcho struct {
Message string Message string
} }
type ResponseFlush struct {
}
type ResponseAppendTx struct { type ResponseAppendTx struct {
RetCode RetCode
} }
@ -130,6 +140,7 @@ type Response interface {
} }
func (_ ResponseEcho) AssertResponseType() {} func (_ ResponseEcho) AssertResponseType() {}
func (_ ResponseFlush) AssertResponseType() {}
func (_ ResponseAppendTx) AssertResponseType() {} func (_ ResponseAppendTx) AssertResponseType() {}
func (_ ResponseGetHash) AssertResponseType() {} func (_ ResponseGetHash) AssertResponseType() {}
func (_ ResponseCommit) AssertResponseType() {} func (_ ResponseCommit) AssertResponseType() {}
@ -143,6 +154,7 @@ func (_ ResponseEvent) AssertResponseType() {}
var _ = wire.RegisterInterface( var _ = wire.RegisterInterface(
struct{ Response }{}, struct{ Response }{},
wire.ConcreteType{ResponseEcho{}, responseTypeEcho}, wire.ConcreteType{ResponseEcho{}, responseTypeEcho},
wire.ConcreteType{ResponseFlush{}, responseTypeFlush},
wire.ConcreteType{ResponseAppendTx{}, responseTypeAppendTx}, wire.ConcreteType{ResponseAppendTx{}, responseTypeAppendTx},
wire.ConcreteType{ResponseGetHash{}, responseTypeGetHash}, wire.ConcreteType{ResponseGetHash{}, responseTypeGetHash},
wire.ConcreteType{ResponseCommit{}, responseTypeCommit}, wire.ConcreteType{ResponseCommit{}, responseTypeCommit},


Loading…
Cancel
Save