You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

328 lines
8.6 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package main
  2. // A note on the origin of the name.
  3. // http://en.wikipedia.org/wiki/Barak
  4. // TODO: Nonrepudiable command log
  5. import (
  6. "errors"
  7. "flag"
  8. "fmt"
  9. "io"
  10. "io/ioutil"
  11. "net/http"
  12. "os"
  13. "reflect"
  14. "sync"
  15. "time"
  16. "github.com/tendermint/tendermint/binary"
  17. . "github.com/tendermint/tendermint/cmd/barak/types"
  18. . "github.com/tendermint/tendermint/common"
  19. pcm "github.com/tendermint/tendermint/process"
  20. "github.com/tendermint/tendermint/rpc"
  21. )
  22. var Routes = map[string]*rpc.RPCFunc{
  23. "status": rpc.NewRPCFunc(Status, []string{}),
  24. "run": rpc.NewRPCFunc(Run, []string{"auth_command"}),
  25. // NOTE: also, two special non-JSONRPC routes called "download" and "upload"
  26. }
  27. type Options struct {
  28. Validators []Validator
  29. ListenAddress string
  30. StartNonce uint64
  31. Registries []string
  32. }
  33. // Global instance
  34. var barak = struct {
  35. mtx sync.Mutex
  36. pid int
  37. nonce uint64
  38. processes map[string]*pcm.Process
  39. validators []Validator
  40. rootDir string
  41. registries []string
  42. }{
  43. mtx: sync.Mutex{},
  44. pid: os.Getpid(),
  45. nonce: 0,
  46. processes: make(map[string]*pcm.Process),
  47. validators: nil,
  48. rootDir: "",
  49. registries: nil,
  50. }
  51. func main() {
  52. fmt.Printf("New Barak Process (PID: %d)\n", os.Getpid())
  53. // read flags to change options file.
  54. var optionsBytes []byte
  55. var optionsFile string
  56. var err error
  57. flag.StringVar(&optionsFile, "options-file", "", "Read options from file instead of stdin")
  58. flag.Parse()
  59. if optionsFile != "" {
  60. optionsBytes, err = ioutil.ReadFile(optionsFile)
  61. } else {
  62. optionsBytes, err = ioutil.ReadAll(os.Stdin)
  63. }
  64. if err != nil {
  65. panic(Fmt("Error reading input: %v", err))
  66. }
  67. options := binary.ReadJSON(&Options{}, optionsBytes, &err).(*Options)
  68. if err != nil {
  69. panic(Fmt("Error parsing input: %v", err))
  70. }
  71. barak.nonce = options.StartNonce
  72. barak.validators = options.Validators
  73. barak.rootDir = os.Getenv("BRKROOT")
  74. if barak.rootDir == "" {
  75. barak.rootDir = os.Getenv("HOME") + "/.barak"
  76. }
  77. err = EnsureDir(barak.rootDir)
  78. if err != nil {
  79. panic(Fmt("Error creating barak rootDir: %v", err))
  80. }
  81. barak.registries = options.Registries
  82. // Debug.
  83. fmt.Printf("Options: %v\n", options)
  84. fmt.Printf("Barak: %v\n", barak)
  85. // Start rpc server.
  86. mux := http.NewServeMux()
  87. mux.HandleFunc("/download", ServeFile)
  88. mux.HandleFunc("/register", Register)
  89. // TODO: mux.HandleFunc("/upload", UploadFile)
  90. rpc.RegisterRPCFuncs(mux, Routes)
  91. rpc.StartHTTPServer(options.ListenAddress, mux)
  92. // Register this barak with central listener
  93. for _, registry := range barak.registries {
  94. go func(registry string) {
  95. for {
  96. resp, err := http.Get(registry + "/register")
  97. if err != nil {
  98. fmt.Printf("Error registering to registry %v:\n %v\n", registry, err)
  99. time.Sleep(1 * time.Hour)
  100. continue
  101. }
  102. body, _ := ioutil.ReadAll(resp.Body)
  103. fmt.Printf("Successfully registered with registry %v\n %v\n", registry, string(body))
  104. return
  105. }
  106. }(registry)
  107. }
  108. // Write pid to file. This should be the last thing before TrapSignal.
  109. err = AtomicWriteFile(barak.rootDir+"/pidfile", []byte(Fmt("%v", barak.pid)))
  110. if err != nil {
  111. panic(Fmt("Error writing pidfile: %v", err))
  112. }
  113. TrapSignal(func() {
  114. fmt.Println("Barak shutting down")
  115. })
  116. }
  117. //------------------------------------------------------------------------------
  118. // RPC functions
  119. func Status() (*ResponseStatus, error) {
  120. barak.mtx.Lock()
  121. pid := barak.pid
  122. nonce := barak.nonce
  123. validators := barak.validators
  124. barak.mtx.Unlock()
  125. return &ResponseStatus{
  126. Pid: pid,
  127. Nonce: nonce,
  128. Validators: validators,
  129. }, nil
  130. }
  131. func Run(authCommand AuthCommand) (interface{}, error) {
  132. command, err := parseValidateCommand(authCommand)
  133. if err != nil {
  134. return nil, err
  135. }
  136. log.Info(Fmt("Run() received command %v:\n%v", reflect.TypeOf(command), command))
  137. // Issue command
  138. switch c := command.(type) {
  139. case CommandRunProcess:
  140. return RunProcess(c.Wait, c.Label, c.ExecPath, c.Args, c.Input)
  141. case CommandStopProcess:
  142. return StopProcess(c.Label, c.Kill)
  143. case CommandListProcesses:
  144. return ListProcesses()
  145. default:
  146. return nil, errors.New("Invalid endpoint for command")
  147. }
  148. }
  149. func parseValidateCommandStr(authCommandStr string) (Command, error) {
  150. var err error
  151. authCommand := binary.ReadJSON(AuthCommand{}, []byte(authCommandStr), &err).(AuthCommand)
  152. if err != nil {
  153. fmt.Printf("Failed to parse auth_command")
  154. return nil, errors.New("AuthCommand parse error")
  155. }
  156. return parseValidateCommand(authCommand)
  157. }
  158. func parseValidateCommand(authCommand AuthCommand) (Command, error) {
  159. commandJSONStr := authCommand.CommandJSONStr
  160. signatures := authCommand.Signatures
  161. // Validate commandJSONStr
  162. if !validate([]byte(commandJSONStr), barak.validators, signatures) {
  163. fmt.Printf("Failed validation attempt")
  164. return nil, errors.New("Validation error")
  165. }
  166. // Parse command
  167. var err error
  168. command := binary.ReadJSON(NoncedCommand{}, []byte(commandJSONStr), &err).(NoncedCommand)
  169. if err != nil {
  170. fmt.Printf("Failed to parse command")
  171. return nil, errors.New("Command parse error")
  172. }
  173. // Prevent replays
  174. if barak.nonce+1 != command.Nonce {
  175. return nil, errors.New("Replay error")
  176. } else {
  177. barak.nonce += 1
  178. }
  179. return command.Command, nil
  180. }
  181. //------------------------------------------------------------------------------
  182. // RPC base commands
  183. // WARNING Not validated, do not export to routes.
  184. func RunProcess(wait bool, label string, execPath string, args []string, input string) (*ResponseRunProcess, error) {
  185. barak.mtx.Lock()
  186. // First, see if there already is a process labeled 'label'
  187. existing := barak.processes[label]
  188. if existing != nil && existing.EndTime.IsZero() {
  189. barak.mtx.Unlock()
  190. return nil, fmt.Errorf("Process already exists: %v", label)
  191. }
  192. // Otherwise, create one.
  193. err := EnsureDir(barak.rootDir + "/outputs")
  194. if err != nil {
  195. return nil, fmt.Errorf("Failed to create outputs dir: %v", err)
  196. }
  197. outPath := Fmt("%v/outputs/%v_%v.out", barak.rootDir, label, time.Now().Format("2006_01_02_15_04_05_MST"))
  198. proc, err := pcm.Create(pcm.ProcessModeDaemon, label, execPath, args, input, outPath)
  199. if err == nil {
  200. barak.processes[label] = proc
  201. }
  202. barak.mtx.Unlock()
  203. if err != nil {
  204. return nil, err
  205. }
  206. if wait {
  207. <-proc.WaitCh
  208. output := pcm.ReadOutput(proc)
  209. fmt.Println("Read output", output)
  210. if proc.ExitState == nil {
  211. return &ResponseRunProcess{
  212. Success: true,
  213. Output: output,
  214. }, nil
  215. } else {
  216. return &ResponseRunProcess{
  217. Success: proc.ExitState.Success(), // Would be always false?
  218. Output: output,
  219. }, nil
  220. }
  221. } else {
  222. return &ResponseRunProcess{
  223. Success: true,
  224. Output: "",
  225. }, nil
  226. }
  227. }
  228. func StopProcess(label string, kill bool) (*ResponseStopProcess, error) {
  229. barak.mtx.Lock()
  230. proc := barak.processes[label]
  231. barak.mtx.Unlock()
  232. if proc == nil {
  233. return nil, fmt.Errorf("Process does not exist: %v", label)
  234. }
  235. err := pcm.Stop(proc, kill)
  236. return &ResponseStopProcess{}, err
  237. }
  238. func ListProcesses() (*ResponseListProcesses, error) {
  239. var procs = []*pcm.Process{}
  240. barak.mtx.Lock()
  241. fmt.Println("Processes: %v", barak.processes)
  242. for _, proc := range barak.processes {
  243. procs = append(procs, proc)
  244. }
  245. barak.mtx.Unlock()
  246. return &ResponseListProcesses{
  247. Processes: procs,
  248. }, nil
  249. }
  250. // Another barak instance registering its external
  251. // address to a remote barak.
  252. func Register(w http.ResponseWriter, req *http.Request) {
  253. registry, err := os.OpenFile(barak.rootDir+"/registry.log", os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
  254. if err != nil {
  255. http.Error(w, "Could not open registry file. Please contact the administrator", 500)
  256. return
  257. }
  258. // TODO: Also check the X-FORWARDED-FOR or whatever it's called.
  259. registry.Write([]byte(Fmt("++ %v\n", req.RemoteAddr)))
  260. registry.Close()
  261. w.Header().Set("Content-Type", "application/json")
  262. w.WriteHeader(200)
  263. w.Write([]byte("Noted!"))
  264. }
  265. func ServeFile(w http.ResponseWriter, req *http.Request) {
  266. authCommandStr := req.FormValue("auth_command")
  267. command, err := parseValidateCommandStr(authCommandStr)
  268. if err != nil {
  269. http.Error(w, Fmt("Invalid command: %v", err), 400)
  270. }
  271. serveCommand, ok := command.(CommandServeFile)
  272. if !ok {
  273. http.Error(w, "Invalid command", 400)
  274. }
  275. path := serveCommand.Path
  276. if path == "" {
  277. http.Error(w, "Must specify path", 400)
  278. return
  279. }
  280. if path[0] == '.' {
  281. // local paths must be explicitly local, e.g. "./xyz"
  282. } else if path[0] != '/' {
  283. // If not an absolute path, then is label
  284. proc := barak.processes[path]
  285. path = proc.OutputPath
  286. }
  287. file, err := os.Open(path)
  288. if err != nil {
  289. http.Error(w, Fmt("Error opening file: %v. %v", path, err), 400)
  290. return
  291. }
  292. _, err = io.Copy(w, file)
  293. if err != nil {
  294. fmt.Fprintf(os.Stderr, Fmt("Error serving file: %v. %v", path, err))
  295. return
  296. }
  297. }