Skip to content
This repository has been archived by the owner on Sep 4, 2024. It is now read-only.

Commit

Permalink
Various field renames
Browse files Browse the repository at this point in the history
  • Loading branch information
agis committed Apr 19, 2018
1 parent d2aee8b commit e86dc70
Showing 1 changed file with 67 additions and 65 deletions.
132 changes: 67 additions & 65 deletions cmd/mistryd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ type Server struct {
pq *ProjectQueue
cfg *Config

br *broker.Broker
// web-view related

// Queue used to track all open tailers by their id. Every tailer id
// matches a job id.
// The stored map type is [string]bool.
tq *sync.Map

br *broker.Broker
fs http.FileSystem
}

Expand Down Expand Up @@ -81,7 +81,6 @@ func NewServer(cfg *Config, logger *log.Logger) (*Server, error) {
s.pq = NewProjectQueue()
s.br = broker.NewBroker(s.Log)
s.tq = new(sync.Map)
go s.br.ListenForClients()
return s, nil
}

Expand Down Expand Up @@ -136,16 +135,16 @@ func (s *Server) HandleNewJob(w http.ResponseWriter, r *http.Request) {

// HandleIndex returns all available jobs.
func (s *Server) HandleIndex(w http.ResponseWriter, r *http.Request) {
var jobs []Job

if r.Method != "GET" {
http.Error(w, "Expected GET, got "+r.Method, http.StatusMethodNotAllowed)
return
}

var jobList []Job

projects, err := ioutil.ReadDir(s.cfg.BuildPath)
if err != nil {
s.Log.Print(err)
s.Log.Print("cannot scan projects; ", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
Expand All @@ -155,56 +154,53 @@ func (s *Server) HandleIndex(w http.ResponseWriter, r *http.Request) {
readyPath := filepath.Join(s.cfg.BuildPath, p.Name(), "ready")
pendingJobs, err := ioutil.ReadDir(pendingPath)
if err != nil {
s.Log.Print(err)
s.Log.Printf("cannot scan pending jobs of project %s; %s", p.Name(), err)
w.WriteHeader(http.StatusInternalServerError)
return
}
readyJobs, err := ioutil.ReadDir(readyPath)
if err != nil {
s.Log.Print(err)
s.Log.Printf("cannot scan ready jobs of project %s; %s", p.Name(), err)
w.WriteHeader(http.StatusInternalServerError)
return
}

for _, j := range pendingJobs {
buildLogPath := filepath.Join(pendingPath, j.Name(), BuildLogFname)
ji := Job{
jobs = append(jobs, Job{
ID: j.Name(),
Project: p.Name(),
StartedAt: j.ModTime(),
Output: buildLogPath,
State: "pending"}
jobList = append(jobList, ji)
Output: filepath.Join(pendingPath, j.Name(), BuildLogFname),
State: "pending"})
}

for _, j := range readyJobs {
buildLogPath := filepath.Join(readyPath, j.Name(), BuildLogFname)
ji := Job{
jobs = append(jobs, Job{
ID: j.Name(),
Project: p.Name(),
StartedAt: j.ModTime(),
Output: buildLogPath,
State: "ready"}
jobList = append(jobList, ji)
Output: filepath.Join(readyPath, j.Name(), BuildLogFname),
State: "ready"})
}
}

sort.Slice(jobList, func(i, j int) bool {
return jobList[i].StartedAt.Unix() > jobList[j].StartedAt.Unix()
sort.Slice(jobs, func(i, j int) bool {
return jobs[i].StartedAt.Unix() > jobs[j].StartedAt.Unix()
})

w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")

resp, err := json.Marshal(jobList)
resp, err := json.Marshal(jobs)
if err != nil {
s.Log.Print(err)
s.Log.Print("cannot marshal jobs '%#v'; %s", jobs, err)
w.WriteHeader(http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")

_, err = w.Write(resp)
if err != nil {
s.Log.Print(err)
s.Log.Print("cannot write response %s", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
Expand All @@ -213,14 +209,21 @@ func (s *Server) HandleIndex(w http.ResponseWriter, r *http.Request) {
// HandleShowJob receives requests for a job and produces the appropriate output
// based on the content type of the request.
func (s *Server) HandleShowJob(w http.ResponseWriter, r *http.Request) {
var buildLog []byte
var buildResult []byte

if r.Method != "GET" {
http.Error(w, "Expected GET, got "+r.Method, http.StatusMethodNotAllowed)
return
}

path := strings.Split(r.URL.Path, "/")
project := path[2]
id := path[3]
parts := strings.Split(r.URL.Path, "/")
if len(parts) != 4 {
w.WriteHeader(http.StatusBadRequest)
return
}
project := parts[2]
id := parts[3]

state, err := GetState(s.cfg.BuildPath, project, id)
if err != nil {
Expand All @@ -230,21 +233,18 @@ func (s *Server) HandleShowJob(w http.ResponseWriter, r *http.Request) {
}
jPath := filepath.Join(s.cfg.BuildPath, project, state, id)
buildLogPath := filepath.Join(jPath, BuildLogFname)
buildResultFilePath := filepath.Join(jPath, BuildResultFname)
var rawLog []byte
var rawResult []byte
buildResultPath := filepath.Join(jPath, BuildResultFname)

// Decide whether to tail the log file or print it immediately,
// based on the job state.
if state != "pending" {
rawResult, err = ioutil.ReadFile(buildResultFilePath)
if state == "ready" {
buildResult, err = ioutil.ReadFile(buildResultPath)
if err != nil {
s.Log.Print(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
}
rawLog, err = ioutil.ReadFile(buildLogPath)

buildLog, err = ioutil.ReadFile(buildLogPath)
if err != nil {
s.Log.Print(err)
w.WriteHeader(http.StatusInternalServerError)
Expand All @@ -257,26 +257,26 @@ func (s *Server) HandleShowJob(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
return
}
ji := Job{
Output: string(rawResult),
Log: template.HTML(strings.Replace(string(rawLog), "\n", "<br />", -1)),

j := Job{
Output: string(buildResult),
Log: template.HTML(strings.Replace(string(buildLog), "\n", "<br />", -1)),
ID: id,
Project: project,
State: state,
StartedAt: jDir[0].ModTime(),
}

ct := r.Header.Get("Content-type")
if ct == "application/json" {
jiData, err := json.Marshal(ji)
if r.Header.Get("Content-type") == "application/json" {
jData, err := json.Marshal(j)
if err != nil {
s.Log.Print(err)
w.WriteHeader(http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")
w.Write(jiData)
w.Write(jData)
return
}

Expand All @@ -287,7 +287,8 @@ func (s *Server) HandleShowJob(w http.ResponseWriter, r *http.Request) {
if !ok {
// Create a channel to communicate the closure of all connections
// for the job id to the spawned tailer goroutine.
if _, ok := s.br.CloseClientC[id]; !ok {
_, ok := s.br.CloseClientC[id]
if !ok {
s.br.CloseClientC[id] = make(chan struct{})
}

Expand All @@ -297,12 +298,17 @@ func (s *Server) HandleShowJob(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
return
}
err = tl.Close()
if err != nil {
panic(err)
}

// Mark the id to the tailers' queue to identify that a
// tail reader has been spawned.
s.tq.Store(id, true)

go func() {
s.Log.Printf("[tailer] Starting for: %s", id)
s.Log.Printf("[tailer] Starting for %s", id)

scanner := bufio.NewScanner(tl)
for scanner.Scan() {
Expand All @@ -311,11 +317,13 @@ func (s *Server) HandleShowJob(w http.ResponseWriter, r *http.Request) {
}()

go func() {
tickChan := time.NewTicker(time.Second * 3).C
tick := time.NewTicker(time.Second * 3)
defer tick.Stop()

TAIL_CLOSE_LOOP:
for {
select {
case <-tickChan:
case <-tick.C:
state, err := GetState(s.cfg.BuildPath, project, id)
if err != nil {
s.Log.Print(err)
Expand Down Expand Up @@ -358,19 +366,23 @@ func (s *Server) HandleShowJob(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
return
}
tmpl.Execute(w, ji)
tmpl.Execute(w, j)
}

// HandleServerPush handles the server push logic.
// HandleServerPush emits build logs as Server-SentEvents (SSE).
func (s *Server) HandleServerPush(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
http.Error(w, "Expected GET, got "+r.Method, http.StatusMethodNotAllowed)
return
}

path := strings.Split(r.URL.Path, "/")
project := path[2]
id := path[3]
parts := strings.Split(r.URL.Path, "/")
if len(parts) != 4 {
w.WriteHeader(http.StatusBadRequest)
return
}
project := parts[2]
id := parts[3]

state, err := GetState(s.cfg.BuildPath, project, id)
if err != nil {
Expand All @@ -392,36 +404,25 @@ func (s *Server) HandleServerPush(w http.ResponseWriter, r *http.Request) {
return
}

// Set the headers for browsers that support server sent events.
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")

// Each connection registers its own event channel with the
// broker's client connections registry s.br.Clients.
client := &broker.Client{ID: id, EventC: make(chan []byte)}

// Signal the broker that we have a new connection.
s.br.NewClients <- client

// Remove this client from the map of connected clients when the
// handler exits.
defer func() {
s.br.ClosingClients <- client
}()

// Listen to connection close and un-register the client.
notify := w.(http.CloseNotifier).CloseNotify()
go func() {
<-notify
<-w.(http.CloseNotifier).CloseNotify()
s.br.ClosingClients <- client
}()

for {
// Emit the message from the server.
fmt.Fprintf(w, "data: %s\n\n", <-client.EventC)
// Send any buffered content to the client immediately.
flusher.Flush()
}
}
Expand All @@ -431,5 +432,6 @@ func (s *Server) HandleServerPush(w http.ResponseWriter, r *http.Request) {
// non-nil error.
func (s *Server) ListenAndServe() error {
s.Log.Printf("Configuration: %#v", s.cfg)
go s.br.ListenForClients()
return s.srv.ListenAndServe()
}

0 comments on commit e86dc70

Please sign in to comment.