Browse Source

Refactoring barak

pull/73/merge
Jae Kwon 9 years ago
parent
commit
c297f41a9f
7 changed files with 342 additions and 149 deletions
  1. +247
    -0
      cmd/barak/barak.go
  2. +61
    -135
      cmd/barak/main.go
  3. +15
    -3
      cmd/barak/types/command.go
  4. +8
    -1
      cmd/barak/types/responses.go
  5. +1
    -1
      cmd/debora/commands.go
  6. +4
    -4
      cmd/debora/main.go
  7. +6
    -5
      rpc/server/http_server.go

+ 247
- 0
cmd/barak/barak.go View File

@ -0,0 +1,247 @@
package main
import (
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"sync"
"time"
"github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/cmd/barak/types"
. "github.com/tendermint/tendermint/common"
pcm "github.com/tendermint/tendermint/process"
"github.com/tendermint/tendermint/rpc/server"
)
type BarakOptions struct {
Validators []Validator
ListenAddress string
StartNonce uint64
Registries []string
}
// Read options from a file, or stdin if optionsFile is ""
func ReadBarakOptions(optFile string) *BarakOptions {
var optBytes []byte
var err error
if optFile != "" {
optBytes, err = ioutil.ReadFile(optFile)
} else {
optBytes, err = ioutil.ReadAll(os.Stdin)
}
if err != nil {
panic(Fmt("Error reading input: %v", err))
}
opt := binary.ReadJSON(&BarakOptions{}, optBytes, &err).(*BarakOptions)
if err != nil {
panic(Fmt("Error parsing input: %v", err))
}
return opt
}
func ensureRootDir() (rootDir string) {
rootDir = os.Getenv("BRKROOT")
if rootDir == "" {
rootDir = os.Getenv("HOME") + "/.barak"
}
err := EnsureDir(rootDir)
if err != nil {
panic(Fmt("Error creating barak rootDir: %v", err))
}
return
}
func NewBarakFromOptions(opt *BarakOptions) *Barak {
rootDir := ensureRootDir()
barak := NewBarak(rootDir, opt.StartNonce, opt.Validators)
for _, registry := range opt.Registries {
barak.AddRegistry(registry)
}
barak.OpenListener(opt.ListenAddress)
// Debug.
fmt.Printf("Options: %v\n", opt)
fmt.Printf("Barak: %v\n", barak)
return barak
}
//--------------------------------------------------------------------------------
type Barak struct {
mtx sync.Mutex
pid int
nonce uint64
processes map[string]*pcm.Process
validators []Validator
listeners []net.Listener
rootDir string
registries []string
}
func NewBarak(rootDir string, nonce uint64, validators []Validator) *Barak {
return &Barak{
pid: os.Getpid(),
nonce: nonce,
processes: make(map[string]*pcm.Process),
validators: validators,
listeners: nil,
rootDir: rootDir,
registries: nil,
}
}
func (brk *Barak) RootDir() string {
brk.mtx.Lock()
defer brk.mtx.Unlock()
return brk.rootDir
}
func (brk *Barak) ListProcesses() []*pcm.Process {
brk.mtx.Lock()
defer brk.mtx.Unlock()
processes := []*pcm.Process{}
for _, process := range processes {
processes = append(processes, process)
}
return processes
}
func (brk *Barak) GetProcess(label string) *pcm.Process {
brk.mtx.Lock()
defer brk.mtx.Unlock()
return brk.processes[label]
}
func (brk *Barak) AddProcess(label string, process *pcm.Process) error {
brk.mtx.Lock()
defer brk.mtx.Unlock()
existing := brk.processes[label]
if existing != nil && existing.EndTime.IsZero() {
return fmt.Errorf("Process already exists: %v", label)
}
brk.processes[label] = process
return nil
}
func (brk *Barak) StopProcess(label string, kill bool) error {
barak.mtx.Lock()
proc := barak.processes[label]
barak.mtx.Unlock()
if proc == nil {
return fmt.Errorf("Process does not exist: %v", label)
}
err := pcm.Stop(proc, kill)
return err
}
func (brk *Barak) ListValidators() []Validator {
brk.mtx.Lock()
defer brk.mtx.Unlock()
return brk.validators
}
func (brk *Barak) ListListeners() []net.Listener {
brk.mtx.Lock()
defer brk.mtx.Unlock()
return brk.listeners
}
func (brk *Barak) OpenListener(addr string) net.Listener {
brk.mtx.Lock()
defer brk.mtx.Unlock()
// Start rpc server.
mux := http.NewServeMux()
mux.HandleFunc("/download", ServeFileHandler)
mux.HandleFunc("/register", RegisterHandler)
// TODO: mux.HandleFunc("/upload", UploadFile)
rpcserver.RegisterRPCFuncs(mux, Routes)
listener := rpcserver.StartHTTPServer(addr, mux)
brk.listeners = append(brk.listeners, listener)
return listener
}
func (brk *Barak) CloseListener(addr string) {
brk.mtx.Lock()
defer brk.mtx.Unlock()
filtered := []net.Listener{}
for _, listener := range brk.listeners {
if listener.Addr().String() == addr {
continue
}
filtered = append(filtered, listener)
}
brk.listeners = filtered
}
func (brk *Barak) GetRegistries() []string {
brk.mtx.Lock()
defer brk.mtx.Unlock()
return brk.registries
}
func (brk *Barak) AddRegistry(registry string) {
brk.mtx.Lock()
defer brk.mtx.Unlock()
brk.registries = append(brk.registries, registry)
}
func (brk *Barak) RemoveRegistry(registry string) {
brk.mtx.Lock()
defer brk.mtx.Unlock()
filtered := []string{}
for _, reg := range brk.registries {
if registry == reg {
continue
}
filtered = append(filtered, reg)
}
brk.registries = filtered
}
func (brk *Barak) StartRegisterRoutine() {
// Register this barak with central listener
go func() {
// Workaround around issues when registries register on themselves upon startup.
time.Sleep(3 * time.Second)
for {
// Every hour, register with the registries.
for _, registry := range brk.registries {
resp, err := http.Get(registry + "/register")
if err != nil {
fmt.Printf("Error registering to registry %v:\n %v\n", registry, err)
} else if resp.StatusCode != 200 {
body, _ := ioutil.ReadAll(resp.Body)
fmt.Printf("Error registering to registry %v:\n %v\n", registry, string(body))
} else {
body, _ := ioutil.ReadAll(resp.Body)
fmt.Printf("Successfully registered with registry %v\n %v\n", registry, string(body))
}
}
time.Sleep(1 * time.Hour)
}
}()
}
// Write pid to file.
func (brk *Barak) WritePidFile() {
err := WriteFileAtomic(brk.rootDir+"/pidfile", []byte(Fmt("%v", brk.pid)))
if err != nil {
panic(Fmt("Error writing pidfile: %v", err))
}
}
func (brk *Barak) CheckIncrNonce(newNonce uint64) error {
brk.mtx.Lock()
defer brk.mtx.Unlock()
if brk.nonce+1 != newNonce {
return errors.New("Replay error")
}
brk.nonce += 1
return nil
}

+ 61
- 135
cmd/barak/main.go View File

@ -9,11 +9,9 @@ import (
"flag"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"reflect"
"sync"
"time"
"github.com/tendermint/tendermint/binary"
@ -24,36 +22,24 @@ import (
"github.com/tendermint/tendermint/rpc/server"
)
var Routes = map[string]*rpcserver.RPCFunc{
"status": rpcserver.NewRPCFunc(Status, []string{}),
"run": rpcserver.NewRPCFunc(Run, []string{"auth_command"}),
// NOTE: also, two special non-JSONRPC routes called "download" and "upload"
}
var Routes map[string]*rpcserver.RPCFunc
type Options struct {
Validators []Validator
ListenAddress string
StartNonce uint64
Registries []string
func init() {
Routes = map[string]*rpcserver.RPCFunc{
"status": rpcserver.NewRPCFunc(Status, []string{}),
"run": rpcserver.NewRPCFunc(Run, []string{"auth_command"}),
// NOTE: also, two special non-JSONRPC routes called "download" and "upload"
}
}
// Global instance
var barak = struct {
mtx sync.Mutex
pid int
nonce uint64
processes map[string]*pcm.Process
validators []Validator
rootDir string
registries []string
}{
mtx: sync.Mutex{},
pid: os.Getpid(),
nonce: 0,
processes: make(map[string]*pcm.Process),
validators: nil,
rootDir: "",
registries: nil,
var barak *Barak
// Parse command-line options
func parseFlags() (optionsFile string) {
flag.StringVar(&optionsFile, "options-file", "", "Read options from file instead of stdin")
flag.Parse()
return
}
func main() {
@ -62,71 +48,14 @@ func main() {
// Apply bare tendermint/* configuration.
cfg.ApplyConfig(cfg.MapConfig(map[string]interface{}{"log_level": "info"}))
// read flags to change options file.
var optionsBytes []byte
var optionsFile string
var err error
flag.StringVar(&optionsFile, "options-file", "", "Read options from file instead of stdin")
flag.Parse()
if optionsFile != "" {
optionsBytes, err = ioutil.ReadFile(optionsFile)
} else {
optionsBytes, err = ioutil.ReadAll(os.Stdin)
}
if err != nil {
panic(Fmt("Error reading input: %v", err))
}
options := binary.ReadJSON(&Options{}, optionsBytes, &err).(*Options)
if err != nil {
panic(Fmt("Error parsing input: %v", err))
}
barak.nonce = options.StartNonce
barak.validators = options.Validators
barak.rootDir = os.Getenv("BRKROOT")
if barak.rootDir == "" {
barak.rootDir = os.Getenv("HOME") + "/.barak"
}
err = EnsureDir(barak.rootDir)
if err != nil {
panic(Fmt("Error creating barak rootDir: %v", err))
}
barak.registries = options.Registries
// Debug.
fmt.Printf("Options: %v\n", options)
fmt.Printf("Barak: %v\n", barak)
// Start rpc server.
mux := http.NewServeMux()
mux.HandleFunc("/download", ServeFile)
mux.HandleFunc("/register", Register)
// TODO: mux.HandleFunc("/upload", UploadFile)
rpcserver.RegisterRPCFuncs(mux, Routes)
rpcserver.StartHTTPServer(options.ListenAddress, mux)
// Register this barak with central listener
for _, registry := range barak.registries {
go func(registry string) {
for {
resp, err := http.Get(registry + "/register")
if err != nil {
fmt.Printf("Error registering to registry %v:\n %v\n", registry, err)
time.Sleep(1 * time.Hour)
continue
}
body, _ := ioutil.ReadAll(resp.Body)
fmt.Printf("Successfully registered with registry %v\n %v\n", registry, string(body))
return
}
}(registry)
}
// Write pid to file. This should be the last thing before TrapSignal.
err = WriteFileAtomic(barak.rootDir+"/pidfile", []byte(Fmt("%v", barak.pid)))
if err != nil {
panic(Fmt("Error writing pidfile: %v", err))
}
// Read options
optionsFile := parseFlags()
options := ReadBarakOptions(optionsFile)
// Init barak
barak = NewBarakFromOptions(options)
barak.StartRegisterRoutine()
barak.WritePidFile() // This should be last, before TrapSignal().
TrapSignal(func() {
fmt.Println("Barak shutting down")
})
@ -157,12 +86,16 @@ func Run(authCommand AuthCommand) (interface{}, error) {
log.Info(Fmt("Run() received command %v:\n%v", reflect.TypeOf(command), command))
// Issue command
switch c := command.(type) {
case CommandRunProcess:
return RunProcess(c.Wait, c.Label, c.ExecPath, c.Args, c.Input)
case CommandStartProcess:
return StartProcess(c.Wait, c.Label, c.ExecPath, c.Args, c.Input)
case CommandStopProcess:
return StopProcess(c.Label, c.Kill)
case CommandListProcesses:
return ListProcesses()
case CommandOpenListener:
return OpenListener(c.Addr)
case CommandCloseListener:
return CloseListener(c.Addr)
default:
return nil, errors.New("Invalid endpoint for command")
}
@ -182,7 +115,7 @@ func parseValidateCommand(authCommand AuthCommand) (Command, error) {
commandJSONStr := authCommand.CommandJSONStr
signatures := authCommand.Signatures
// Validate commandJSONStr
if !validate([]byte(commandJSONStr), barak.validators, signatures) {
if !validate([]byte(commandJSONStr), barak.ListValidators(), signatures) {
fmt.Printf("Failed validation attempt")
return nil, errors.New("Validation error")
}
@ -194,11 +127,7 @@ func parseValidateCommand(authCommand AuthCommand) (Command, error) {
return nil, errors.New("Command parse error")
}
// Prevent replays
if barak.nonce+1 != command.Nonce {
return nil, errors.New("Replay error")
} else {
barak.nonce += 1
}
barak.CheckIncrNonce(command.Nonce)
return command.Command, nil
}
@ -206,48 +135,42 @@ func parseValidateCommand(authCommand AuthCommand) (Command, error) {
// RPC base commands
// WARNING Not validated, do not export to routes.
func RunProcess(wait bool, label string, execPath string, args []string, input string) (*ResponseRunProcess, error) {
barak.mtx.Lock()
func StartProcess(wait bool, label string, execPath string, args []string, input string) (*ResponseStartProcess, error) {
// First, see if there already is a process labeled 'label'
existing := barak.processes[label]
existing := barak.GetProcess(label)
if existing != nil && existing.EndTime.IsZero() {
barak.mtx.Unlock()
return nil, fmt.Errorf("Process already exists: %v", label)
}
// Otherwise, create one.
err := EnsureDir(barak.rootDir + "/outputs")
err := EnsureDir(barak.RootDir() + "/outputs")
if err != nil {
return nil, fmt.Errorf("Failed to create outputs dir: %v", err)
}
outPath := Fmt("%v/outputs/%v_%v.out", barak.rootDir, label, time.Now().Format("2006_01_02_15_04_05_MST"))
outPath := Fmt("%v/outputs/%v_%v.out", barak.RootDir(), label, time.Now().Format("2006_01_02_15_04_05_MST"))
proc, err := pcm.Create(pcm.ProcessModeDaemon, label, execPath, args, input, outPath)
if err == nil {
barak.processes[label] = proc
}
barak.mtx.Unlock()
if err != nil {
return nil, err
}
barak.AddProcess(label, proc)
if wait {
<-proc.WaitCh
output := pcm.ReadOutput(proc)
fmt.Println("Read output", output)
if proc.ExitState == nil {
return &ResponseRunProcess{
return &ResponseStartProcess{
Success: true,
Output: output,
}, nil
} else {
return &ResponseRunProcess{
return &ResponseStartProcess{
Success: proc.ExitState.Success(), // Would be always false?
Output: output,
}, nil
}
} else {
return &ResponseRunProcess{
return &ResponseStartProcess{
Success: true,
Output: "",
}, nil
@ -255,36 +178,35 @@ func RunProcess(wait bool, label string, execPath string, args []string, input s
}
func StopProcess(label string, kill bool) (*ResponseStopProcess, error) {
barak.mtx.Lock()
proc := barak.processes[label]
barak.mtx.Unlock()
if proc == nil {
return nil, fmt.Errorf("Process does not exist: %v", label)
}
err := pcm.Stop(proc, kill)
err := barak.StopProcess(label, kill)
return &ResponseStopProcess{}, err
}
func ListProcesses() (*ResponseListProcesses, error) {
var procs = []*pcm.Process{}
barak.mtx.Lock()
fmt.Println("Processes: %v", barak.processes)
for _, proc := range barak.processes {
procs = append(procs, proc)
}
barak.mtx.Unlock()
procs := barak.ListProcesses()
return &ResponseListProcesses{
Processes: procs,
}, nil
}
func OpenListener(addr string) (*ResponseOpenListener, error) {
listener := barak.OpenListener(addr)
return &ResponseOpenListener{
Addr: listener.Addr().String(),
}, nil
}
func CloseListener(addr string) (*ResponseCloseListener, error) {
barak.CloseListener(addr)
return &ResponseCloseListener{}, nil
}
//--------------------------------------------------------------------------------
// Another barak instance registering its external
// address to a remote barak.
func Register(w http.ResponseWriter, req *http.Request) {
registry, err := os.OpenFile(barak.rootDir+"/registry.log", os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
func RegisterHandler(w http.ResponseWriter, req *http.Request) {
registry, err := os.OpenFile(barak.RootDir()+"/registry.log", os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
if err != nil {
http.Error(w, "Could not open registry file. Please contact the administrator", 500)
return
@ -297,7 +219,7 @@ func Register(w http.ResponseWriter, req *http.Request) {
w.Write([]byte("Noted!"))
}
func ServeFile(w http.ResponseWriter, req *http.Request) {
func ServeFileHandler(w http.ResponseWriter, req *http.Request) {
authCommandStr := req.FormValue("auth_command")
command, err := parseValidateCommandStr(authCommandStr)
if err != nil {
@ -316,7 +238,11 @@ func ServeFile(w http.ResponseWriter, req *http.Request) {
// local paths must be explicitly local, e.g. "./xyz"
} else if path[0] != '/' {
// If not an absolute path, then is label
proc := barak.processes[path]
proc := barak.GetProcess(path)
if proc == nil {
http.Error(w, Fmt("Unknown process label: %v", path), 400)
return
}
path = proc.OutputPath
}
file, err := os.Open(path)


+ 15
- 3
cmd/barak/types/command.go View File

@ -18,22 +18,26 @@ type NoncedCommand struct {
type Command interface{}
const (
commandTypeRunProcess = 0x01
commandTypeStartProcess = 0x01
commandTypeStopProcess = 0x02
commandTypeListProcesses = 0x03
commandTypeServeFile = 0x04
commandTypeOpenListener = 0x05
commandTypeCloseListener = 0x06
)
// for binary.readReflect
var _ = binary.RegisterInterface(
struct{ Command }{},
binary.ConcreteType{CommandRunProcess{}, commandTypeRunProcess},
binary.ConcreteType{CommandStartProcess{}, commandTypeStartProcess},
binary.ConcreteType{CommandStopProcess{}, commandTypeStopProcess},
binary.ConcreteType{CommandListProcesses{}, commandTypeListProcesses},
binary.ConcreteType{CommandServeFile{}, commandTypeServeFile},
binary.ConcreteType{CommandOpenListener{}, commandTypeOpenListener},
binary.ConcreteType{CommandCloseListener{}, commandTypeCloseListener},
)
type CommandRunProcess struct {
type CommandStartProcess struct {
Wait bool
Label string
ExecPath string
@ -51,3 +55,11 @@ type CommandListProcesses struct{}
type CommandServeFile struct {
Path string
}
type CommandOpenListener struct {
Addr string
}
type CommandCloseListener struct {
Addr string
}

+ 8
- 1
cmd/barak/types/responses.go View File

@ -13,7 +13,7 @@ type ResponseStatus struct {
type ResponseRegister struct {
}
type ResponseRunProcess struct {
type ResponseStartProcess struct {
Success bool
Output string
}
@ -24,3 +24,10 @@ type ResponseStopProcess struct {
type ResponseListProcesses struct {
Processes []*pcm.Process
}
type ResponseOpenListener struct {
Addr string
}
type ResponseCloseListener struct {
}

+ 1
- 1
cmd/debora/commands.go View File

@ -19,7 +19,7 @@ import (
// (First the command(s) are signed by all validators,
// and then it is broadcast).
func RunProcess(privKey acm.PrivKey, remote string, command btypes.CommandRunProcess) (response btypes.ResponseRunProcess, err error) {
func StartProcess(privKey acm.PrivKey, remote string, command btypes.CommandStartProcess) (response btypes.ResponseStartProcess, err error) {
nonce, err := GetNonce(remote)
if err != nil {
return response, err


+ 4
- 4
cmd/debora/main.go View File

@ -87,7 +87,7 @@ func main() {
cli.Command{
Name: "run",
Usage: "run process",
Action: cliRunProcess,
Action: cliStartProcess,
Flags: []cli.Flag{
labelFlag,
bgFlag,
@ -145,14 +145,14 @@ func cliGetStatus(c *cli.Context) {
wg.Wait()
}
func cliRunProcess(c *cli.Context) {
func cliStartProcess(c *cli.Context) {
args := c.Args()
if len(args) < 1 {
Exit("Must specify <execPath> <args...>")
}
execPath := args[0]
args = args[1:]
command := btypes.CommandRunProcess{
command := btypes.CommandStartProcess{
Wait: !c.Bool("bg"),
Label: c.String("label"),
ExecPath: execPath,
@ -164,7 +164,7 @@ func cliRunProcess(c *cli.Context) {
wg.Add(1)
go func(remote string) {
defer wg.Done()
response, err := RunProcess(Config.PrivKey, remote, command)
response, err := StartProcess(Config.PrivKey, remote, command)
if err != nil {
fmt.Printf("%v failure. %v\n", remote, err)
} else {


+ 6
- 5
rpc/server/http_server.go View File

@ -16,19 +16,20 @@ import (
. "github.com/tendermint/tendermint/rpc/types"
)
func StartHTTPServer(listenAddr string, handler http.Handler) {
func StartHTTPServer(listenAddr string, handler http.Handler) net.Listener {
log.Info(Fmt("Starting RPC HTTP server on %v", listenAddr))
listener, err := net.Listen("tcp", listenAddr)
if err != nil {
Exit(Fmt("Failed to listen to %v", listenAddr))
}
go func() {
listener, err := net.Listen("tcp", listenAddr)
if err != nil {
Exit(Fmt("Failed to listen to %v", listenAddr))
}
res := http.Serve(
listener,
RecoverAndLogHandler(handler),
)
log.Crit("RPC HTTP server stopped", "result", res)
}()
return listener
}
func WriteRPCResponse(w http.ResponseWriter, res RPCResponse) {


Loading…
Cancel
Save