You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

261 lines
7.1 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package main
  2. // A note on the origin of the name.
  3. // http://en.wikipedia.org/wiki/Barak
  4. // TODO: Nonrepudiable command log
  5. import (
  6. "errors"
  7. "flag"
  8. "fmt"
  9. "io"
  10. "net/http"
  11. "os"
  12. "reflect"
  13. "time"
  14. "github.com/tendermint/tendermint/binary"
  15. . "github.com/tendermint/tendermint/cmd/barak/types"
  16. . "github.com/tendermint/tendermint/common"
  17. cfg "github.com/tendermint/tendermint/config"
  18. pcm "github.com/tendermint/tendermint/process"
  19. "github.com/tendermint/tendermint/rpc/server"
  20. )
  21. var Routes map[string]*rpcserver.RPCFunc
  22. func init() {
  23. Routes = map[string]*rpcserver.RPCFunc{
  24. "status": rpcserver.NewRPCFunc(Status, []string{}),
  25. "run": rpcserver.NewRPCFunc(Run, []string{"auth_command"}),
  26. // NOTE: also, two special non-JSONRPC routes called "download" and "upload"
  27. }
  28. }
  29. // Global instance
  30. var barak_ *Barak
  31. // Parse command-line options
  32. func parseFlags() (optionsFile string) {
  33. flag.StringVar(&optionsFile, "options-file", "", "Read options from file instead of stdin")
  34. flag.Parse()
  35. return
  36. }
  37. func main() {
  38. fmt.Printf("New Barak Process (PID: %d)\n", os.Getpid())
  39. // Apply bare tendermint/* configuration.
  40. cfg.ApplyConfig(cfg.MapConfig(map[string]interface{}{"log_level": "info"}))
  41. // Read options
  42. optionsFile := parseFlags()
  43. options := ReadBarakOptions(optionsFile)
  44. // Init barak
  45. barak_ = NewBarakFromOptions(options)
  46. barak_.StartRegisterRoutine()
  47. barak_.WritePidFile() // This should be last, before TrapSignal().
  48. TrapSignal(func() {
  49. fmt.Println("Barak shutting down")
  50. })
  51. }
  52. //------------------------------------------------------------------------------
  53. // RPC functions
  54. func Status() (*ResponseStatus, error) {
  55. barak_.mtx.Lock()
  56. pid := barak_.pid
  57. nonce := barak_.nonce
  58. validators := barak_.validators
  59. barak_.mtx.Unlock()
  60. return &ResponseStatus{
  61. Pid: pid,
  62. Nonce: nonce,
  63. Validators: validators,
  64. }, nil
  65. }
  66. func Run(authCommand AuthCommand) (interface{}, error) {
  67. command, err := parseValidateCommand(authCommand)
  68. if err != nil {
  69. return nil, err
  70. }
  71. log.Info(Fmt("Run() received command %v:\n%v", reflect.TypeOf(command), command))
  72. // Issue command
  73. switch c := command.(type) {
  74. case CommandStartProcess:
  75. return StartProcess(c.Wait, c.Label, c.ExecPath, c.Args, c.Input)
  76. case CommandStopProcess:
  77. return StopProcess(c.Label, c.Kill)
  78. case CommandListProcesses:
  79. return ListProcesses()
  80. case CommandOpenListener:
  81. return OpenListener(c.Addr)
  82. case CommandCloseListener:
  83. return CloseListener(c.Addr)
  84. default:
  85. return nil, errors.New("Invalid endpoint for command")
  86. }
  87. }
  88. func parseValidateCommandStr(authCommandStr string) (Command, error) {
  89. var err error
  90. authCommand := binary.ReadJSON(AuthCommand{}, []byte(authCommandStr), &err).(AuthCommand)
  91. if err != nil {
  92. fmt.Printf("Failed to parse auth_command")
  93. return nil, errors.New("AuthCommand parse error")
  94. }
  95. return parseValidateCommand(authCommand)
  96. }
  97. func parseValidateCommand(authCommand AuthCommand) (Command, error) {
  98. commandJSONStr := authCommand.CommandJSONStr
  99. signatures := authCommand.Signatures
  100. // Validate commandJSONStr
  101. if !validate([]byte(commandJSONStr), barak_.ListValidators(), signatures) {
  102. fmt.Printf("Failed validation attempt")
  103. return nil, errors.New("Validation error")
  104. }
  105. // Parse command
  106. var err error
  107. command := binary.ReadJSON(NoncedCommand{}, []byte(commandJSONStr), &err).(NoncedCommand)
  108. if err != nil {
  109. fmt.Printf("Failed to parse command")
  110. return nil, errors.New("Command parse error")
  111. }
  112. // Prevent replays
  113. barak_.CheckIncrNonce(command.Nonce)
  114. return command.Command, nil
  115. }
  116. //------------------------------------------------------------------------------
  117. // RPC base commands
  118. // WARNING Not validated, do not export to routes.
  119. func StartProcess(wait bool, label string, execPath string, args []string, input string) (*ResponseStartProcess, error) {
  120. // First, see if there already is a process labeled 'label'
  121. existing := barak_.GetProcess(label)
  122. if existing != nil && existing.EndTime.IsZero() {
  123. return nil, fmt.Errorf("Process already exists: %v", label)
  124. }
  125. // Otherwise, create one.
  126. err := EnsureDir(barak_.RootDir() + "/outputs")
  127. if err != nil {
  128. return nil, fmt.Errorf("Failed to create outputs dir: %v", err)
  129. }
  130. outPath := Fmt("%v/outputs/%v_%v.out", barak_.RootDir(), label, time.Now().Format("2006_01_02_15_04_05_MST"))
  131. proc, err := pcm.Create(pcm.ProcessModeDaemon, label, execPath, args, input, outPath)
  132. if err != nil {
  133. return nil, err
  134. }
  135. barak_.AddProcess(label, proc)
  136. if wait {
  137. <-proc.WaitCh
  138. output := pcm.ReadOutput(proc)
  139. fmt.Println("Read output", output)
  140. if proc.ExitState == nil {
  141. return &ResponseStartProcess{
  142. Success: true,
  143. Output: output,
  144. }, nil
  145. } else {
  146. return &ResponseStartProcess{
  147. Success: proc.ExitState.Success(), // Would be always false?
  148. Output: output,
  149. }, nil
  150. }
  151. } else {
  152. return &ResponseStartProcess{
  153. Success: true,
  154. Output: "",
  155. }, nil
  156. }
  157. }
  158. func StopProcess(label string, kill bool) (*ResponseStopProcess, error) {
  159. err := barak_.StopProcess(label, kill)
  160. return &ResponseStopProcess{}, err
  161. }
  162. func ListProcesses() (*ResponseListProcesses, error) {
  163. procs := barak_.ListProcesses()
  164. return &ResponseListProcesses{
  165. Processes: procs,
  166. }, nil
  167. }
  168. func OpenListener(addr string) (*ResponseOpenListener, error) {
  169. listener, err := barak_.OpenListener(addr)
  170. if err != nil {
  171. return nil, err
  172. }
  173. return &ResponseOpenListener{
  174. Addr: listener.Addr().String(),
  175. }, nil
  176. }
  177. func CloseListener(addr string) (*ResponseCloseListener, error) {
  178. barak_.CloseListener(addr)
  179. return &ResponseCloseListener{}, nil
  180. }
  181. //--------------------------------------------------------------------------------
  182. // Another barak instance registering its external
  183. // address to a remote barak.
  184. func RegisterHandler(w http.ResponseWriter, req *http.Request) {
  185. registry, err := os.OpenFile(barak_.RootDir()+"/registry.log", os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
  186. if err != nil {
  187. http.Error(w, "Could not open registry file. Please contact the administrator", 500)
  188. return
  189. }
  190. // TODO: Also check the X-FORWARDED-FOR or whatever it's called.
  191. registry.Write([]byte(Fmt("++ %v\n", req.RemoteAddr)))
  192. registry.Close()
  193. w.Header().Set("Content-Type", "application/json")
  194. w.WriteHeader(200)
  195. w.Write([]byte("Noted!"))
  196. }
  197. func ServeFileHandler(w http.ResponseWriter, req *http.Request) {
  198. authCommandStr := req.FormValue("auth_command")
  199. command, err := parseValidateCommandStr(authCommandStr)
  200. if err != nil {
  201. http.Error(w, Fmt("Invalid command: %v", err), 400)
  202. }
  203. serveCommand, ok := command.(CommandServeFile)
  204. if !ok {
  205. http.Error(w, "Invalid command", 400)
  206. }
  207. path := serveCommand.Path
  208. if path == "" {
  209. http.Error(w, "Must specify path", 400)
  210. return
  211. }
  212. if path[0] == '.' {
  213. // local paths must be explicitly local, e.g. "./xyz"
  214. } else if path[0] != '/' {
  215. // If not an absolute path, then is label
  216. proc := barak_.GetProcess(path)
  217. if proc == nil {
  218. http.Error(w, Fmt("Unknown process label: %v", path), 400)
  219. return
  220. }
  221. path = proc.OutputPath
  222. }
  223. file, err := os.Open(path)
  224. if err != nil {
  225. http.Error(w, Fmt("Error opening file: %v. %v", path, err), 400)
  226. return
  227. }
  228. _, err = io.Copy(w, file)
  229. if err != nil {
  230. fmt.Fprintf(os.Stderr, Fmt("Error serving file: %v. %v", path, err))
  231. return
  232. }
  233. }