Browse Source

Added a registry to Barak.

pull/61/head
Jae Kwon 10 years ago
parent
commit
66ff985cd2
6 changed files with 56 additions and 8 deletions
  1. +37
    -2
      cmd/barak/main.go
  2. +3
    -0
      cmd/barak/types/responses.go
  3. +2
    -1
      node/node.go
  4. +10
    -1
      p2p/listener.go
  5. +0
    -1
      rpc/client.go
  6. +4
    -3
      rpc/http_server.go

+ 37
- 2
cmd/barak/main.go View File

@ -19,6 +19,7 @@ import (
"github.com/tendermint/tendermint/binary" "github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/cmd/barak/types" . "github.com/tendermint/tendermint/cmd/barak/types"
. "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/p2p"
pcm "github.com/tendermint/tendermint/process" pcm "github.com/tendermint/tendermint/process"
"github.com/tendermint/tendermint/rpc" "github.com/tendermint/tendermint/rpc"
) )
@ -33,6 +34,7 @@ type Options struct {
Validators []Validator Validators []Validator
ListenAddress string ListenAddress string
StartNonce uint64 StartNonce uint64
Registries []string
} }
// Global instance // Global instance
@ -43,6 +45,7 @@ var barak = struct {
processes map[string]*pcm.Process processes map[string]*pcm.Process
validators []Validator validators []Validator
rootDir string rootDir string
registries []string
}{ }{
mtx: sync.Mutex{}, mtx: sync.Mutex{},
pid: os.Getpid(), pid: os.Getpid(),
@ -50,6 +53,7 @@ var barak = struct {
processes: make(map[string]*pcm.Process), processes: make(map[string]*pcm.Process),
validators: nil, validators: nil,
rootDir: "", rootDir: "",
registries: nil,
} }
func main() { func main() {
@ -95,11 +99,27 @@ func main() {
fmt.Printf("Barak: %v\n", barak) fmt.Printf("Barak: %v\n", barak)
// Start rpc server. // Start rpc server.
listener := p2p.NewDefaultListener("tcp", options.ListenAddress, false)
mux := http.NewServeMux() mux := http.NewServeMux()
mux.HandleFunc("/download", ServeFile) mux.HandleFunc("/download", ServeFile)
mux.HandleFunc("/register", Register)
// TODO: mux.HandleFunc("/upload", UploadFile) // TODO: mux.HandleFunc("/upload", UploadFile)
rpc.RegisterRPCFuncs(mux, Routes) rpc.RegisterRPCFuncs(mux, Routes)
rpc.StartHTTPServer(options.ListenAddress, mux)
rpc.StartHTTPServer(listener, mux)
// Register this barak with central listener
extAddress := listener.ExternalAddress().String()
for _, registry := range barak.registries {
go func(registry string) {
var response ResponseRegister
_, err = rpc.Call(registry, "register", Arr(extAddress), &response)
if err != nil {
fmt.Printf("Error registering to registry %v:\n %v\n", registry, err)
} else {
fmt.Printf("Successfully registered with registry %v\n", registry)
}
}(registry)
}
TrapSignal(func() { TrapSignal(func() {
fmt.Println("Barak shutting down") fmt.Println("Barak shutting down")
@ -255,8 +275,23 @@ func ListProcesses() (*ResponseListProcesses, error) {
}, nil }, nil
} }
func ServeFile(w http.ResponseWriter, req *http.Request) {
// Another barak instance registering its external
// address to a remote barak.
func Register(w http.ResponseWriter, req *http.Request) {
registry, err := os.OpenFile(barak.rootDir+"/registry.log", os.O_RDWR|os.O_APPEND, 0x600)
if err != nil {
http.Error(w, "Could not open registry file. Please contact the administrator", 500)
return
}
// TODO: Also check the X-FORWARDED-FOR or whatever it's called.
registry.Write([]byte(Fmt("++ %v\n", req.RemoteAddr)))
registry.Close()
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(200)
w.Write([]byte("Noted!"))
}
func ServeFile(w http.ResponseWriter, req *http.Request) {
authCommandStr := req.FormValue("auth_command") authCommandStr := req.FormValue("auth_command")
command, err := parseValidateCommandStr(authCommandStr) command, err := parseValidateCommandStr(authCommandStr)
if err != nil { if err != nil {


+ 3
- 0
cmd/barak/types/responses.go View File

@ -10,6 +10,9 @@ type ResponseStatus struct {
Validators []Validator Validators []Validator
} }
type ResponseRegister struct {
}
type ResponseRunProcess struct { type ResponseRunProcess struct {
Success bool Success bool
Output string Output string


+ 2
- 1
node/node.go View File

@ -162,10 +162,11 @@ func (n *Node) StartRPC() {
core.SetSwitch(n.sw) core.SetSwitch(n.sw)
listenAddr := config.App().GetString("RPC.HTTP.ListenAddr") listenAddr := config.App().GetString("RPC.HTTP.ListenAddr")
listener := p2p.NewDefaultListener("tcp", listenAddr, false)
mux := http.NewServeMux() mux := http.NewServeMux()
rpc.RegisterEventsHandler(mux, n.evsw) rpc.RegisterEventsHandler(mux, n.evsw)
rpc.RegisterRPCFuncs(mux, core.Routes) rpc.RegisterRPCFuncs(mux, core.Routes)
rpc.StartHTTPServer(listenAddr, mux)
rpc.StartHTTPServer(listener, mux)
} }
func (n *Node) Switch() *p2p.Switch { func (n *Node) Switch() *p2p.Switch {


+ 10
- 1
p2p/listener.go View File

@ -12,6 +12,7 @@ import (
type Listener interface { type Listener interface {
Connections() <-chan net.Conn Connections() <-chan net.Conn
InternalAddress() *NetAddress
ExternalAddress() *NetAddress ExternalAddress() *NetAddress
String() string String() string
Stop() Stop()
@ -20,6 +21,7 @@ type Listener interface {
// Implements Listener // Implements Listener
type DefaultListener struct { type DefaultListener struct {
listener net.Listener listener net.Listener
intAddr *NetAddress
extAddr *NetAddress extAddr *NetAddress
connections chan net.Conn connections chan net.Conn
stopped uint32 stopped uint32
@ -55,9 +57,11 @@ func NewDefaultListener(protocol string, lAddr string, requireUPNPHairpin bool)
listenerIP, listenerPort := splitHostPort(listener.Addr().String()) listenerIP, listenerPort := splitHostPort(listener.Addr().String())
log.Debug("Local listener", "ip", listenerIP, "port", listenerPort) log.Debug("Local listener", "ip", listenerIP, "port", listenerPort)
// Determine internal address...
var intAddr *NetAddress = NewNetAddressString(lAddr)
// Determine external address... // Determine external address...
var extAddr *NetAddress var extAddr *NetAddress
// If the lAddrIP is INADDR_ANY, try UPnP // If the lAddrIP is INADDR_ANY, try UPnP
if lAddrIP == "" || lAddrIP == "0.0.0.0" { if lAddrIP == "" || lAddrIP == "0.0.0.0" {
if requireUPNPHairpin { if requireUPNPHairpin {
@ -84,6 +88,7 @@ SKIP_UPNP:
dl := &DefaultListener{ dl := &DefaultListener{
listener: listener, listener: listener,
intAddr: intAddr,
extAddr: extAddr, extAddr: extAddr,
connections: make(chan net.Conn, numBufferedConnections), connections: make(chan net.Conn, numBufferedConnections),
} }
@ -124,6 +129,10 @@ func (l *DefaultListener) Connections() <-chan net.Conn {
return l.connections return l.connections
} }
func (l *DefaultListener) InternalAddress() *NetAddress {
return l.intAddr
}
func (l *DefaultListener) ExternalAddress() *NetAddress { func (l *DefaultListener) ExternalAddress() *NetAddress {
return l.extAddr return l.extAddr
} }


+ 0
- 1
rpc/client.go View File

@ -31,7 +31,6 @@ func Call(remote string, method string, params []interface{}, dest interface{})
if err != nil { if err != nil {
return dest, err return dest, err
} }
log.Debug(Fmt("RPC response: %v", string(responseBytes))) log.Debug(Fmt("RPC response: %v", string(responseBytes)))
// Parse response into JSONResponse // Parse response into JSONResponse


+ 4
- 3
rpc/http_server.go View File

@ -16,10 +16,11 @@ import (
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
) )
func StartHTTPServer(listenAddr string, handler http.Handler) {
log.Info(Fmt("Starting RPC HTTP server on %s", listenAddr))
func StartHTTPServer(listener p2p.Listener, handler http.Handler) {
log.Info(Fmt("Starting RPC HTTP server on ext:%v int:%v",
listener.ExternalAddress(),
listener.InternalAddress()))
go func() { go func() {
listener := p2p.NewDefaultListener("tcp", listenAddr, false)
netListener := listener.(*p2p.DefaultListener).NetListener() netListener := listener.(*p2p.DefaultListener).NetListener()
res := http.Serve( res := http.Serve(
netListener, netListener,


Loading…
Cancel
Save