You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

275 lines
7.4 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package main
  2. // A note on the origin of the name.
  3. // http://en.wikipedia.org/wiki/Barak
  4. // TODO: Nonrepudiable command log
  5. import (
  6. "errors"
  7. "flag"
  8. "fmt"
  9. "io"
  10. "net/http"
  11. "os"
  12. "reflect"
  13. "time"
  14. "github.com/tendermint/tendermint/binary"
  15. . "github.com/tendermint/tendermint/cmd/barak/types"
  16. . "github.com/tendermint/tendermint/common"
  17. cfg "github.com/tendermint/tendermint/config"
  18. pcm "github.com/tendermint/tendermint/process"
  19. "github.com/tendermint/tendermint/rpc/server"
  20. )
  21. const BarakVersion = "0.0.1"
  22. var Routes map[string]*rpcserver.RPCFunc
  23. func init() {
  24. Routes = map[string]*rpcserver.RPCFunc{
  25. "status": rpcserver.NewRPCFunc(Status, []string{}),
  26. "run": rpcserver.NewRPCFunc(Run, []string{"auth_command"}),
  27. // NOTE: also, two special non-JSONRPC routes called "download" and "upload"
  28. }
  29. }
  30. // Global instance
  31. var barak_ *Barak
  32. // Parse command-line options
  33. func parseFlags() (optionsFile string) {
  34. flag.StringVar(&optionsFile, "config", "", "Read config from file instead of stdin")
  35. flag.Parse()
  36. return
  37. }
  38. func main() {
  39. fmt.Printf("New Barak Process (PID: %d)\n", os.Getpid())
  40. // Apply bare tendermint/* configuration.
  41. cfg.ApplyConfig(cfg.MapConfig(map[string]interface{}{"log_level": "info"}))
  42. // Read options
  43. optionsFile := parseFlags()
  44. options := ReadBarakOptions(optionsFile)
  45. // Init barak
  46. barak_ = NewBarakFromOptions(options)
  47. barak_.StartRegisterRoutine()
  48. barak_.WritePidFile() // This should be last, before TrapSignal().
  49. TrapSignal(func() {
  50. fmt.Println("Barak shutting down")
  51. })
  52. }
  53. //------------------------------------------------------------------------------
  54. // RPC functions
  55. func Status() (*ResponseStatus, error) {
  56. barak_.mtx.Lock()
  57. pid := barak_.pid
  58. nonce := barak_.nonce
  59. validators := barak_.validators
  60. barak_.mtx.Unlock()
  61. return &ResponseStatus{
  62. Version: BarakVersion,
  63. Pid: pid,
  64. Nonce: nonce,
  65. Validators: validators,
  66. }, nil
  67. }
  68. func Run(authCommand AuthCommand) (interface{}, error) {
  69. command, err := parseValidateCommand(authCommand)
  70. if err != nil {
  71. return nil, err
  72. }
  73. log.Info(Fmt("Run() received command %v:%v", reflect.TypeOf(command), command))
  74. // Issue command
  75. switch c := command.(type) {
  76. case CommandStartProcess:
  77. return StartProcess(c.Wait, c.Label, c.ExecPath, c.Args, c.Input)
  78. case CommandStopProcess:
  79. return StopProcess(c.Label, c.Kill)
  80. case CommandListProcesses:
  81. return ListProcesses()
  82. case CommandOpenListener:
  83. return OpenListener(c.Addr)
  84. case CommandCloseListener:
  85. return CloseListener(c.Addr)
  86. case CommandQuit:
  87. return Quit()
  88. default:
  89. return nil, errors.New("Invalid endpoint for command")
  90. }
  91. }
  92. func parseValidateCommandStr(authCommandStr string) (Command, error) {
  93. var err error
  94. authCommand := binary.ReadJSON(AuthCommand{}, []byte(authCommandStr), &err).(AuthCommand)
  95. if err != nil {
  96. fmt.Printf("Failed to parse auth_command")
  97. return nil, errors.New("AuthCommand parse error")
  98. }
  99. return parseValidateCommand(authCommand)
  100. }
  101. func parseValidateCommand(authCommand AuthCommand) (Command, error) {
  102. commandJSONStr := authCommand.CommandJSONStr
  103. signatures := authCommand.Signatures
  104. // Validate commandJSONStr
  105. if !validate([]byte(commandJSONStr), barak_.ListValidators(), signatures) {
  106. fmt.Printf("Failed validation attempt")
  107. return nil, errors.New("Validation error")
  108. }
  109. // Parse command
  110. var err error
  111. command := binary.ReadJSON(NoncedCommand{}, []byte(commandJSONStr), &err).(NoncedCommand)
  112. if err != nil {
  113. fmt.Printf("Failed to parse command")
  114. return nil, errors.New("Command parse error")
  115. }
  116. // Prevent replays
  117. barak_.CheckIncrNonce(command.Nonce)
  118. return command.Command, nil
  119. }
  120. //------------------------------------------------------------------------------
  121. // RPC base commands
  122. // WARNING Not validated, do not export to routes.
  123. func StartProcess(wait bool, label string, execPath string, args []string, input string) (*ResponseStartProcess, error) {
  124. // First, see if there already is a process labeled 'label'
  125. existing := barak_.GetProcess(label)
  126. if existing != nil && existing.EndTime.IsZero() {
  127. return nil, fmt.Errorf("Process already exists: %v", label)
  128. }
  129. // Otherwise, create one.
  130. err := EnsureDir(barak_.RootDir() + "/outputs")
  131. if err != nil {
  132. return nil, fmt.Errorf("Failed to create outputs dir: %v", err)
  133. }
  134. outPath := Fmt("%v/outputs/%v_%v.out", barak_.RootDir(), label, time.Now().Format("2006_01_02_15_04_05_MST"))
  135. proc, err := pcm.Create(pcm.ProcessModeDaemon, label, execPath, args, input, outPath)
  136. if err != nil {
  137. return nil, err
  138. }
  139. barak_.AddProcess(label, proc)
  140. if wait {
  141. <-proc.WaitCh
  142. output := pcm.ReadOutput(proc)
  143. // fmt.Println("Read output", output)
  144. if proc.ExitState == nil {
  145. return &ResponseStartProcess{
  146. Success: true,
  147. Output: output,
  148. }, nil
  149. } else {
  150. return &ResponseStartProcess{
  151. Success: proc.ExitState.Success(), // Would be always false?
  152. Output: output,
  153. }, nil
  154. }
  155. } else {
  156. return &ResponseStartProcess{
  157. Success: true,
  158. Output: "",
  159. }, nil
  160. }
  161. }
  162. func StopProcess(label string, kill bool) (*ResponseStopProcess, error) {
  163. err := barak_.StopProcess(label, kill)
  164. return &ResponseStopProcess{}, err
  165. }
  166. func ListProcesses() (*ResponseListProcesses, error) {
  167. procs := barak_.ListProcesses()
  168. return &ResponseListProcesses{
  169. Processes: procs,
  170. }, nil
  171. }
  172. func OpenListener(addr string) (*ResponseOpenListener, error) {
  173. listener, err := barak_.OpenListener(addr)
  174. if err != nil {
  175. return nil, err
  176. }
  177. return &ResponseOpenListener{
  178. Addr: listener.Addr().String(),
  179. }, nil
  180. }
  181. func CloseListener(addr string) (*ResponseCloseListener, error) {
  182. barak_.CloseListener(addr)
  183. return &ResponseCloseListener{}, nil
  184. }
  185. func Quit() (*ResponseQuit, error) {
  186. fmt.Println("Barak shutting down due to Quit()")
  187. go func() {
  188. time.Sleep(time.Second)
  189. os.Exit(0)
  190. }()
  191. return &ResponseQuit{}, nil
  192. }
  193. //--------------------------------------------------------------------------------
  194. // Another barak instance registering its external
  195. // address to a remote barak.
  196. func RegisterHandler(w http.ResponseWriter, req *http.Request) {
  197. registry, err := os.OpenFile(barak_.RootDir()+"/registry.log", os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
  198. if err != nil {
  199. http.Error(w, "Could not open registry file. Please contact the administrator", 500)
  200. return
  201. }
  202. // TODO: Also check the X-FORWARDED-FOR or whatever it's called.
  203. registry.Write([]byte(Fmt("++ %v\n", req.RemoteAddr)))
  204. registry.Close()
  205. w.Header().Set("Content-Type", "application/json")
  206. w.WriteHeader(200)
  207. w.Write([]byte("Noted!"))
  208. }
  209. func ServeFileHandler(w http.ResponseWriter, req *http.Request) {
  210. authCommandStr := req.FormValue("auth_command")
  211. command, err := parseValidateCommandStr(authCommandStr)
  212. if err != nil {
  213. http.Error(w, Fmt("Invalid command: %v", err), 400)
  214. }
  215. serveCommand, ok := command.(CommandServeFile)
  216. if !ok {
  217. http.Error(w, "Invalid command", 400)
  218. }
  219. path := serveCommand.Path
  220. if path == "" {
  221. http.Error(w, "Must specify path", 400)
  222. return
  223. }
  224. if path[0] == '.' {
  225. // local paths must be explicitly local, e.g. "./xyz"
  226. } else if path[0] != '/' {
  227. // If not an absolute path, then is label
  228. proc := barak_.GetProcess(path)
  229. if proc == nil {
  230. http.Error(w, Fmt("Unknown process label: %v", path), 400)
  231. return
  232. }
  233. path = proc.OutputPath
  234. }
  235. file, err := os.Open(path)
  236. if err != nil {
  237. http.Error(w, Fmt("Error opening file: %v. %v", path, err), 400)
  238. return
  239. }
  240. _, err = io.Copy(w, file)
  241. if err != nil {
  242. fmt.Fprintf(os.Stderr, Fmt("Error serving file: %v. %v", path, err))
  243. return
  244. }
  245. }