diff --git a/cmd/mistryd/server.go b/cmd/mistryd/server.go index c9c6945..ca5f04a 100644 --- a/cmd/mistryd/server.go +++ b/cmd/mistryd/server.go @@ -36,13 +36,13 @@ type Server struct { pq *ProjectQueue cfg *Config - br *broker.Broker + // web-view related // Queue used to track all open tailers by their id. Every tailer id // matches a job id. // The stored map type is [string]bool. tq *sync.Map - + br *broker.Broker fs http.FileSystem } @@ -81,7 +81,6 @@ func NewServer(cfg *Config, logger *log.Logger) (*Server, error) { s.pq = NewProjectQueue() s.br = broker.NewBroker(s.Log) s.tq = new(sync.Map) - go s.br.ListenForClients() return s, nil } @@ -136,16 +135,16 @@ func (s *Server) HandleNewJob(w http.ResponseWriter, r *http.Request) { // HandleIndex returns all available jobs. func (s *Server) HandleIndex(w http.ResponseWriter, r *http.Request) { + var jobs []Job + if r.Method != "GET" { http.Error(w, "Expected GET, got "+r.Method, http.StatusMethodNotAllowed) return } - var jobList []Job - projects, err := ioutil.ReadDir(s.cfg.BuildPath) if err != nil { - s.Log.Print(err) + s.Log.Print("cannot scan projects; ", err) w.WriteHeader(http.StatusInternalServerError) return } @@ -155,56 +154,53 @@ func (s *Server) HandleIndex(w http.ResponseWriter, r *http.Request) { readyPath := filepath.Join(s.cfg.BuildPath, p.Name(), "ready") pendingJobs, err := ioutil.ReadDir(pendingPath) if err != nil { - s.Log.Print(err) + s.Log.Printf("cannot scan pending jobs of project %s; %s", p.Name(), err) w.WriteHeader(http.StatusInternalServerError) return } readyJobs, err := ioutil.ReadDir(readyPath) if err != nil { - s.Log.Print(err) + s.Log.Printf("cannot scan ready jobs of project %s; %s", p.Name(), err) w.WriteHeader(http.StatusInternalServerError) return } for _, j := range pendingJobs { - buildLogPath := filepath.Join(pendingPath, j.Name(), BuildLogFname) - ji := Job{ + jobs = append(jobs, Job{ ID: j.Name(), Project: p.Name(), StartedAt: j.ModTime(), - Output: buildLogPath, - State: "pending"} - jobList = append(jobList, ji) + Output: filepath.Join(pendingPath, j.Name(), BuildLogFname), + State: "pending"}) } for _, j := range readyJobs { - buildLogPath := filepath.Join(readyPath, j.Name(), BuildLogFname) - ji := Job{ + jobs = append(jobs, Job{ ID: j.Name(), Project: p.Name(), StartedAt: j.ModTime(), - Output: buildLogPath, - State: "ready"} - jobList = append(jobList, ji) + Output: filepath.Join(readyPath, j.Name(), BuildLogFname), + State: "ready"}) } } - sort.Slice(jobList, func(i, j int) bool { - return jobList[i].StartedAt.Unix() > jobList[j].StartedAt.Unix() + sort.Slice(jobs, func(i, j int) bool { + return jobs[i].StartedAt.Unix() > jobs[j].StartedAt.Unix() }) - w.WriteHeader(http.StatusOK) - w.Header().Set("Content-Type", "application/json") - - resp, err := json.Marshal(jobList) + resp, err := json.Marshal(jobs) if err != nil { - s.Log.Print(err) + s.Log.Print("cannot marshal jobs '%#v'; %s", jobs, err) w.WriteHeader(http.StatusInternalServerError) return } + + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + _, err = w.Write(resp) if err != nil { - s.Log.Print(err) + s.Log.Print("cannot write response %s", err) w.WriteHeader(http.StatusInternalServerError) return } @@ -213,14 +209,21 @@ func (s *Server) HandleIndex(w http.ResponseWriter, r *http.Request) { // HandleShowJob receives requests for a job and produces the appropriate output // based on the content type of the request. func (s *Server) HandleShowJob(w http.ResponseWriter, r *http.Request) { + var buildLog []byte + var buildResult []byte + if r.Method != "GET" { http.Error(w, "Expected GET, got "+r.Method, http.StatusMethodNotAllowed) return } - path := strings.Split(r.URL.Path, "/") - project := path[2] - id := path[3] + parts := strings.Split(r.URL.Path, "/") + if len(parts) != 4 { + w.WriteHeader(http.StatusBadRequest) + return + } + project := parts[2] + id := parts[3] state, err := GetState(s.cfg.BuildPath, project, id) if err != nil { @@ -230,21 +233,18 @@ func (s *Server) HandleShowJob(w http.ResponseWriter, r *http.Request) { } jPath := filepath.Join(s.cfg.BuildPath, project, state, id) buildLogPath := filepath.Join(jPath, BuildLogFname) - buildResultFilePath := filepath.Join(jPath, BuildResultFname) - var rawLog []byte - var rawResult []byte + buildResultPath := filepath.Join(jPath, BuildResultFname) - // Decide whether to tail the log file or print it immediately, - // based on the job state. - if state != "pending" { - rawResult, err = ioutil.ReadFile(buildResultFilePath) + if state == "ready" { + buildResult, err = ioutil.ReadFile(buildResultPath) if err != nil { s.Log.Print(err) w.WriteHeader(http.StatusInternalServerError) return } } - rawLog, err = ioutil.ReadFile(buildLogPath) + + buildLog, err = ioutil.ReadFile(buildLogPath) if err != nil { s.Log.Print(err) w.WriteHeader(http.StatusInternalServerError) @@ -257,18 +257,18 @@ func (s *Server) HandleShowJob(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) return } - ji := Job{ - Output: string(rawResult), - Log: template.HTML(strings.Replace(string(rawLog), "\n", "
", -1)), + + j := Job{ + Output: string(buildResult), + Log: template.HTML(strings.Replace(string(buildLog), "\n", "
", -1)), ID: id, Project: project, State: state, StartedAt: jDir[0].ModTime(), } - ct := r.Header.Get("Content-type") - if ct == "application/json" { - jiData, err := json.Marshal(ji) + if r.Header.Get("Content-type") == "application/json" { + jData, err := json.Marshal(j) if err != nil { s.Log.Print(err) w.WriteHeader(http.StatusInternalServerError) @@ -276,7 +276,7 @@ func (s *Server) HandleShowJob(w http.ResponseWriter, r *http.Request) { } w.Header().Set("Content-Type", "application/json") - w.Write(jiData) + w.Write(jData) return } @@ -287,7 +287,8 @@ func (s *Server) HandleShowJob(w http.ResponseWriter, r *http.Request) { if !ok { // Create a channel to communicate the closure of all connections // for the job id to the spawned tailer goroutine. - if _, ok := s.br.CloseClientC[id]; !ok { + _, ok := s.br.CloseClientC[id] + if !ok { s.br.CloseClientC[id] = make(chan struct{}) } @@ -297,12 +298,13 @@ func (s *Server) HandleShowJob(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) return } + // Mark the id to the tailers' queue to identify that a // tail reader has been spawned. s.tq.Store(id, true) go func() { - s.Log.Printf("[tailer] Starting for: %s", id) + s.Log.Printf("[tailer] Starting for %s", id) scanner := bufio.NewScanner(tl) for scanner.Scan() { @@ -311,11 +313,13 @@ func (s *Server) HandleShowJob(w http.ResponseWriter, r *http.Request) { }() go func() { - tickChan := time.NewTicker(time.Second * 3).C + tick := time.NewTicker(time.Second * 3) + defer tick.Stop() + TAIL_CLOSE_LOOP: for { select { - case <-tickChan: + case <-tick.C: state, err := GetState(s.cfg.BuildPath, project, id) if err != nil { s.Log.Print(err) @@ -358,19 +362,23 @@ func (s *Server) HandleShowJob(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) return } - tmpl.Execute(w, ji) + tmpl.Execute(w, j) } -// HandleServerPush handles the server push logic. +// HandleServerPush emits build logs as Server-SentEvents (SSE). func (s *Server) HandleServerPush(w http.ResponseWriter, r *http.Request) { if r.Method != "GET" { http.Error(w, "Expected GET, got "+r.Method, http.StatusMethodNotAllowed) return } - path := strings.Split(r.URL.Path, "/") - project := path[2] - id := path[3] + parts := strings.Split(r.URL.Path, "/") + if len(parts) != 4 { + w.WriteHeader(http.StatusBadRequest) + return + } + project := parts[2] + id := parts[3] state, err := GetState(s.cfg.BuildPath, project, id) if err != nil { @@ -392,36 +400,25 @@ func (s *Server) HandleServerPush(w http.ResponseWriter, r *http.Request) { return } - // Set the headers for browsers that support server sent events. w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("Access-Control-Allow-Origin", "*") - // Each connection registers its own event channel with the - // broker's client connections registry s.br.Clients. client := &broker.Client{ID: id, EventC: make(chan []byte)} - - // Signal the broker that we have a new connection. s.br.NewClients <- client - // Remove this client from the map of connected clients when the - // handler exits. defer func() { s.br.ClosingClients <- client }() - // Listen to connection close and un-register the client. - notify := w.(http.CloseNotifier).CloseNotify() go func() { - <-notify + <-w.(http.CloseNotifier).CloseNotify() s.br.ClosingClients <- client }() for { - // Emit the message from the server. fmt.Fprintf(w, "data: %s\n\n", <-client.EventC) - // Send any buffered content to the client immediately. flusher.Flush() } } @@ -431,5 +428,6 @@ func (s *Server) HandleServerPush(w http.ResponseWriter, r *http.Request) { // non-nil error. func (s *Server) ListenAndServe() error { s.Log.Printf("Configuration: %#v", s.cfg) + go s.br.ListenForClients() return s.srv.ListenAndServe() }