You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

254 lines
5.9 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package main
  2. import (
  3. "errors"
  4. "fmt"
  5. "io/ioutil"
  6. "net"
  7. "net/http"
  8. "os"
  9. "sync"
  10. "time"
  11. "github.com/tendermint/tendermint/binary"
  12. . "github.com/tendermint/tendermint/cmd/barak/types"
  13. . "github.com/tendermint/tendermint/common"
  14. pcm "github.com/tendermint/tendermint/process"
  15. "github.com/tendermint/tendermint/rpc/server"
  16. )
  17. type BarakOptions struct {
  18. Validators []Validator
  19. ListenAddress string
  20. StartNonce uint64
  21. Registries []string
  22. }
  23. // Read options from a file, or stdin if optionsFile is ""
  24. func ReadBarakOptions(optFile string) *BarakOptions {
  25. var optBytes []byte
  26. var err error
  27. if optFile != "" {
  28. optBytes, err = ioutil.ReadFile(optFile)
  29. } else {
  30. optBytes, err = ioutil.ReadAll(os.Stdin)
  31. }
  32. if err != nil {
  33. panic(Fmt("Error reading input: %v", err))
  34. }
  35. opt := binary.ReadJSON(&BarakOptions{}, optBytes, &err).(*BarakOptions)
  36. if err != nil {
  37. panic(Fmt("Error parsing input: %v", err))
  38. }
  39. return opt
  40. }
  41. func ensureRootDir() (rootDir string) {
  42. rootDir = os.Getenv("BRKROOT")
  43. if rootDir == "" {
  44. rootDir = os.Getenv("HOME") + "/.barak"
  45. }
  46. err := EnsureDir(rootDir)
  47. if err != nil {
  48. panic(Fmt("Error creating barak rootDir: %v", err))
  49. }
  50. return
  51. }
  52. func NewBarakFromOptions(opt *BarakOptions) *Barak {
  53. rootDir := ensureRootDir()
  54. barak := NewBarak(rootDir, opt.StartNonce, opt.Validators)
  55. for _, registry := range opt.Registries {
  56. barak.AddRegistry(registry)
  57. }
  58. barak.OpenListener(opt.ListenAddress)
  59. // Debug.
  60. fmt.Printf("Options: %v\n", opt)
  61. fmt.Printf("Barak: %v\n", barak)
  62. return barak
  63. }
  64. //--------------------------------------------------------------------------------
  65. type Barak struct {
  66. mtx sync.Mutex
  67. pid int
  68. nonce uint64
  69. processes map[string]*pcm.Process
  70. validators []Validator
  71. listeners []net.Listener
  72. rootDir string
  73. registries []string
  74. }
  75. func NewBarak(rootDir string, nonce uint64, validators []Validator) *Barak {
  76. return &Barak{
  77. pid: os.Getpid(),
  78. nonce: nonce,
  79. processes: make(map[string]*pcm.Process),
  80. validators: validators,
  81. listeners: nil,
  82. rootDir: rootDir,
  83. registries: nil,
  84. }
  85. }
  86. func (brk *Barak) RootDir() string {
  87. brk.mtx.Lock()
  88. defer brk.mtx.Unlock()
  89. return brk.rootDir
  90. }
  91. func (brk *Barak) ListProcesses() []*pcm.Process {
  92. brk.mtx.Lock()
  93. defer brk.mtx.Unlock()
  94. processes := []*pcm.Process{}
  95. for _, process := range brk.processes {
  96. processes = append(processes, process)
  97. }
  98. return processes
  99. }
  100. func (brk *Barak) GetProcess(label string) *pcm.Process {
  101. brk.mtx.Lock()
  102. defer brk.mtx.Unlock()
  103. return brk.processes[label]
  104. }
  105. func (brk *Barak) AddProcess(label string, process *pcm.Process) error {
  106. brk.mtx.Lock()
  107. defer brk.mtx.Unlock()
  108. existing := brk.processes[label]
  109. if existing != nil && existing.EndTime.IsZero() {
  110. return fmt.Errorf("Process already exists: %v", label)
  111. }
  112. brk.processes[label] = process
  113. return nil
  114. }
  115. func (brk *Barak) StopProcess(label string, kill bool) error {
  116. brk.mtx.Lock()
  117. proc := brk.processes[label]
  118. brk.mtx.Unlock()
  119. if proc == nil {
  120. return fmt.Errorf("Process does not exist: %v", label)
  121. }
  122. err := pcm.Stop(proc, kill)
  123. return err
  124. }
  125. func (brk *Barak) ListValidators() []Validator {
  126. brk.mtx.Lock()
  127. defer brk.mtx.Unlock()
  128. return brk.validators
  129. }
  130. func (brk *Barak) ListListeners() []net.Listener {
  131. brk.mtx.Lock()
  132. defer brk.mtx.Unlock()
  133. return brk.listeners
  134. }
  135. func (brk *Barak) OpenListener(addr string) (net.Listener, error) {
  136. brk.mtx.Lock()
  137. defer brk.mtx.Unlock()
  138. // Start rpc server.
  139. mux := http.NewServeMux()
  140. mux.HandleFunc("/download", ServeFileHandler)
  141. mux.HandleFunc("/register", RegisterHandler)
  142. // TODO: mux.HandleFunc("/upload", UploadFile)
  143. rpcserver.RegisterRPCFuncs(mux, Routes)
  144. listener, err := rpcserver.StartHTTPServer(addr, mux)
  145. if err != nil {
  146. return nil, err
  147. }
  148. brk.listeners = append(brk.listeners, listener)
  149. return listener, nil
  150. }
  151. func (brk *Barak) CloseListener(addr string) {
  152. brk.mtx.Lock()
  153. defer brk.mtx.Unlock()
  154. filtered := []net.Listener{}
  155. for _, listener := range brk.listeners {
  156. if listener.Addr().String() == addr {
  157. err := listener.Close()
  158. if err != nil {
  159. fmt.Printf("Error closing listener: %v\n", err)
  160. }
  161. continue
  162. }
  163. filtered = append(filtered, listener)
  164. }
  165. brk.listeners = filtered
  166. }
  167. func (brk *Barak) GetRegistries() []string {
  168. brk.mtx.Lock()
  169. defer brk.mtx.Unlock()
  170. return brk.registries
  171. }
  172. func (brk *Barak) AddRegistry(registry string) {
  173. brk.mtx.Lock()
  174. defer brk.mtx.Unlock()
  175. brk.registries = append(brk.registries, registry)
  176. }
  177. func (brk *Barak) RemoveRegistry(registry string) {
  178. brk.mtx.Lock()
  179. defer brk.mtx.Unlock()
  180. filtered := []string{}
  181. for _, reg := range brk.registries {
  182. if registry == reg {
  183. continue
  184. }
  185. filtered = append(filtered, reg)
  186. }
  187. brk.registries = filtered
  188. }
  189. func (brk *Barak) StartRegisterRoutine() {
  190. // Register this barak with central listener
  191. go func() {
  192. // Workaround around issues when registries register on themselves upon startup.
  193. time.Sleep(3 * time.Second)
  194. for {
  195. // Every hour, register with the registries.
  196. for _, registry := range brk.registries {
  197. resp, err := http.Get(registry + "/register")
  198. if err != nil {
  199. fmt.Printf("Error registering to registry %v:\n %v\n", registry, err)
  200. } else if resp.StatusCode != 200 {
  201. body, _ := ioutil.ReadAll(resp.Body)
  202. fmt.Printf("Error registering to registry %v:\n %v\n", registry, string(body))
  203. } else {
  204. body, _ := ioutil.ReadAll(resp.Body)
  205. fmt.Printf("Successfully registered with registry %v\n %v\n", registry, string(body))
  206. }
  207. }
  208. time.Sleep(1 * time.Hour)
  209. }
  210. }()
  211. }
  212. // Write pid to file.
  213. func (brk *Barak) WritePidFile() {
  214. err := WriteFileAtomic(brk.rootDir+"/pidfile", []byte(Fmt("%v", brk.pid)))
  215. if err != nil {
  216. panic(Fmt("Error writing pidfile: %v", err))
  217. }
  218. }
  219. func (brk *Barak) CheckIncrNonce(newNonce uint64) error {
  220. brk.mtx.Lock()
  221. defer brk.mtx.Unlock()
  222. if brk.nonce+1 != newNonce {
  223. return errors.New("Replay error")
  224. }
  225. brk.nonce += 1
  226. return nil
  227. }