You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

284 lines
7.1 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package main
  2. // A note on the origin of the name.
  3. // http://en.wikipedia.org/wiki/Barak
  4. // TODO: Nonrepudiable command log
  5. import (
  6. "errors"
  7. "flag"
  8. "fmt"
  9. "io"
  10. "io/ioutil"
  11. "net/http"
  12. "os"
  13. "reflect"
  14. "sync"
  15. "time"
  16. "github.com/tendermint/tendermint/binary"
  17. . "github.com/tendermint/tendermint/cmd/barak/types"
  18. . "github.com/tendermint/tendermint/common"
  19. pcm "github.com/tendermint/tendermint/process"
  20. "github.com/tendermint/tendermint/rpc"
  21. )
// Routes maps JSON-RPC method names to their handlers. main registers this
// table on the HTTP mux via rpc.RegisterRPCFuncs.
// The string slice passed to rpc.NewRPCFunc presumably lists the handler's
// parameter names — confirm against the rpc package.
var Routes = map[string]*rpc.RPCFunc{
	"status": rpc.NewRPCFunc(Status, []string{}),
	"run": rpc.NewRPCFunc(Run, []string{"auth_command"}),
	// NOTE: also, two special non-JSONRPC routes called "download" and "upload"
}
// Options is the startup configuration that main decodes from JSON, read
// either from the -options-file path or from stdin.
type Options struct {
	// Validators is copied into barak.validators and later passed to validate()
	// when checking command signatures.
	Validators []Validator
	// ListenAddress is handed to rpc.StartHTTPServer.
	ListenAddress string
	// StartNonce seeds barak.nonce; the next accepted command must carry
	// StartNonce+1.
	StartNonce uint64
}
  32. // Global instance
  33. var barak = struct {
  34. mtx sync.Mutex
  35. pid int
  36. nonce uint64
  37. processes map[string]*pcm.Process
  38. validators []Validator
  39. rootDir string
  40. }{
  41. mtx: sync.Mutex{},
  42. pid: os.Getpid(),
  43. nonce: 0,
  44. processes: make(map[string]*pcm.Process),
  45. validators: nil,
  46. rootDir: "",
  47. }
  48. func main() {
  49. fmt.Printf("New Barak Process (PID: %d)\n", os.Getpid())
  50. // read flags to change options file.
  51. var optionsBytes []byte
  52. var optionsFile string
  53. var err error
  54. flag.StringVar(&optionsFile, "options-file", "", "Read options from file instead of stdin")
  55. flag.Parse()
  56. if optionsFile != "" {
  57. optionsBytes, err = ioutil.ReadFile(optionsFile)
  58. } else {
  59. optionsBytes, err = ioutil.ReadAll(os.Stdin)
  60. }
  61. if err != nil {
  62. panic(Fmt("Error reading input: %v", err))
  63. }
  64. options := binary.ReadJSON(&Options{}, optionsBytes, &err).(*Options)
  65. if err != nil {
  66. panic(Fmt("Error parsing input: %v", err))
  67. }
  68. barak.nonce = options.StartNonce
  69. barak.validators = options.Validators
  70. barak.rootDir = os.Getenv("BRKROOT")
  71. if barak.rootDir == "" {
  72. barak.rootDir = os.Getenv("HOME") + "/.barak"
  73. }
  74. err = EnsureDir(barak.rootDir)
  75. if err != nil {
  76. panic(Fmt("Error creating barak rootDir: %v", err))
  77. }
  78. // Write pid to file.
  79. err = AtomicWriteFile(barak.rootDir+"/pidfile", []byte(Fmt("%v", barak.pid)))
  80. if err != nil {
  81. panic(Fmt("Error writing pidfile: %v", err))
  82. }
  83. // Debug.
  84. fmt.Printf("Options: %v\n", options)
  85. fmt.Printf("Barak: %v\n", barak)
  86. // Start rpc server.
  87. mux := http.NewServeMux()
  88. mux.HandleFunc("/download", ServeFile)
  89. // TODO: mux.HandleFunc("/upload", UploadFile)
  90. rpc.RegisterRPCFuncs(mux, Routes)
  91. rpc.StartHTTPServer(options.ListenAddress, mux)
  92. TrapSignal(func() {
  93. fmt.Println("Barak shutting down")
  94. })
  95. }
  96. //------------------------------------------------------------------------------
  97. // RPC functions
  98. func Status() (*ResponseStatus, error) {
  99. barak.mtx.Lock()
  100. pid := barak.pid
  101. nonce := barak.nonce
  102. validators := barak.validators
  103. barak.mtx.Unlock()
  104. return &ResponseStatus{
  105. Pid: pid,
  106. Nonce: nonce,
  107. Validators: validators,
  108. }, nil
  109. }
  110. func Run(authCommand AuthCommand) (interface{}, error) {
  111. command, err := parseValidateCommand(authCommand)
  112. if err != nil {
  113. return nil, err
  114. }
  115. log.Info(Fmt("Run() received command %v:\n%v", reflect.TypeOf(command), command))
  116. // Issue command
  117. switch c := command.(type) {
  118. case CommandRunProcess:
  119. return RunProcess(c.Wait, c.Label, c.ExecPath, c.Args, c.Input)
  120. case CommandStopProcess:
  121. return StopProcess(c.Label, c.Kill)
  122. case CommandListProcesses:
  123. return ListProcesses()
  124. default:
  125. return nil, errors.New("Invalid endpoint for command")
  126. }
  127. }
  128. func parseValidateCommandStr(authCommandStr string) (Command, error) {
  129. var err error
  130. authCommand := binary.ReadJSON(AuthCommand{}, []byte(authCommandStr), &err).(AuthCommand)
  131. if err != nil {
  132. fmt.Printf("Failed to parse auth_command")
  133. return nil, errors.New("AuthCommand parse error")
  134. }
  135. return parseValidateCommand(authCommand)
  136. }
  137. func parseValidateCommand(authCommand AuthCommand) (Command, error) {
  138. commandJSONStr := authCommand.CommandJSONStr
  139. signatures := authCommand.Signatures
  140. // Validate commandJSONStr
  141. if !validate([]byte(commandJSONStr), barak.validators, signatures) {
  142. fmt.Printf("Failed validation attempt")
  143. return nil, errors.New("Validation error")
  144. }
  145. // Parse command
  146. var err error
  147. command := binary.ReadJSON(NoncedCommand{}, []byte(commandJSONStr), &err).(NoncedCommand)
  148. if err != nil {
  149. fmt.Printf("Failed to parse command")
  150. return nil, errors.New("Command parse error")
  151. }
  152. // Prevent replays
  153. if barak.nonce+1 != command.Nonce {
  154. return nil, errors.New("Replay error")
  155. } else {
  156. barak.nonce += 1
  157. }
  158. return command.Command, nil
  159. }
  160. //------------------------------------------------------------------------------
  161. // RPC base commands
  162. // WARNING Not validated, do not export to routes.
  163. func RunProcess(wait bool, label string, execPath string, args []string, input string) (*ResponseRunProcess, error) {
  164. barak.mtx.Lock()
  165. // First, see if there already is a process labeled 'label'
  166. existing := barak.processes[label]
  167. if existing != nil && existing.EndTime.IsZero() {
  168. barak.mtx.Unlock()
  169. return nil, fmt.Errorf("Process already exists: %v", label)
  170. }
  171. // Otherwise, create one.
  172. err := EnsureDir(barak.rootDir + "/outputs")
  173. if err != nil {
  174. return nil, fmt.Errorf("Failed to create outputs dir: %v", err)
  175. }
  176. outPath := Fmt("%v/outputs/%v_%v.out", barak.rootDir, label, time.Now().Format("2006_01_02_15_04_05_MST"))
  177. proc, err := pcm.Create(pcm.ProcessModeDaemon, label, execPath, args, input, outPath)
  178. if err == nil {
  179. barak.processes[label] = proc
  180. }
  181. barak.mtx.Unlock()
  182. if err != nil {
  183. return nil, err
  184. }
  185. if wait {
  186. <-proc.WaitCh
  187. output := pcm.ReadOutput(proc)
  188. fmt.Println("Read output", output)
  189. if proc.ExitState == nil {
  190. return &ResponseRunProcess{
  191. Success: true,
  192. Output: output,
  193. }, nil
  194. } else {
  195. return &ResponseRunProcess{
  196. Success: proc.ExitState.Success(), // Would be always false?
  197. Output: output,
  198. }, nil
  199. }
  200. } else {
  201. return &ResponseRunProcess{
  202. Success: true,
  203. Output: "",
  204. }, nil
  205. }
  206. }
  207. func StopProcess(label string, kill bool) (*ResponseStopProcess, error) {
  208. barak.mtx.Lock()
  209. proc := barak.processes[label]
  210. barak.mtx.Unlock()
  211. if proc == nil {
  212. return nil, fmt.Errorf("Process does not exist: %v", label)
  213. }
  214. err := pcm.Stop(proc, kill)
  215. return &ResponseStopProcess{}, err
  216. }
  217. func ListProcesses() (*ResponseListProcesses, error) {
  218. var procs = []*pcm.Process{}
  219. barak.mtx.Lock()
  220. fmt.Println("Processes: %v", barak.processes)
  221. for _, proc := range barak.processes {
  222. procs = append(procs, proc)
  223. }
  224. barak.mtx.Unlock()
  225. return &ResponseListProcesses{
  226. Processes: procs,
  227. }, nil
  228. }
  229. func ServeFile(w http.ResponseWriter, req *http.Request) {
  230. authCommandStr := req.FormValue("auth_command")
  231. command, err := parseValidateCommandStr(authCommandStr)
  232. if err != nil {
  233. http.Error(w, Fmt("Invalid command: %v", err), 400)
  234. }
  235. serveCommand, ok := command.(CommandServeFile)
  236. if !ok {
  237. http.Error(w, "Invalid command", 400)
  238. }
  239. path := serveCommand.Path
  240. if path == "" {
  241. http.Error(w, "Must specify path", 400)
  242. return
  243. }
  244. file, err := os.Open(path)
  245. if err != nil {
  246. http.Error(w, Fmt("Error opening file: %v. %v", path, err), 400)
  247. return
  248. }
  249. _, err = io.Copy(w, file)
  250. if err != nil {
  251. fmt.Fprintf(os.Stderr, Fmt("Error serving file: %v. %v", path, err))
  252. return
  253. }
  254. }