Skip to content
This repository has been archived by the owner on Nov 7, 2019. It is now read-only.

Task pipeline #4

Merged
merged 13 commits into from
Jul 12, 2017
89 changes: 61 additions & 28 deletions accept_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,21 @@ package main
import (
"encoding/json"
"fmt"
"github.com/datatogether/task-mgmt/defs/ipfs"
"github.com/datatogether/task-mgmt/taskdefs/ipfs"
"github.com/datatogether/task-mgmt/taskdefs/kiwix"
"github.com/datatogether/task-mgmt/tasks"
"github.com/streadway/amqp"
)

// taskdefs is a map of all possible task names to their respective "New" funcs
var taskdefs = map[string]tasks.NewTaskFunc{
"ipfs.add": ipfs.NewTaskAdd,
"ipfs.add": ipfs.NewTaskAdd,
"kiwix.updateSources": kiwix.NewTaskUpdateSources,
}

// start accepting tasks, if setup doesn't error, it returns a stop channel
// writing to stop will teardown the func and stop accepting tasks
// start accepting tasks from the queue, if setup doesn't error,
// it returns a stop channel writing to stop will teardown the
// func and stop accepting tasks
func acceptTasks() (stop chan bool, err error) {
stop = make(chan bool)

Expand Down Expand Up @@ -54,40 +57,70 @@ func acceptTasks() (stop chan bool, err error) {
}

go func() {
for d := range msgs {
newTask := taskdefs[d.Type]
if newTask == nil {
log.Errorf("unknown task type: %s", d.Type)
d.Nack(false, false)
for msg := range msgs {
// tasks.Tas
task, err := tasks.TaskFromDelivery(store, msg)
if err != nil {
log.Errorf("task error: %s", err.Error())
msg.Nack(false, false)
continue
}

task := newTask()
if err := json.Unmarshal(d.Body, task); err != nil {
log.Errorf("error decoding task body: %s", err.Error())
d.Nack(false, false)
continue
if err := task.Do(store); err != nil {
log.Errorf("task error: %s", err.Error())
msg.Nack(false, false)
} else {
log.Infof("completed task: %s, %s", msg.MessageId, msg.Type)
msg.Ack(false)
}

pc := make(chan tasks.Progress, 10)
task.Do(pc)
for p := range pc {
if p.Error != nil {
log.Errorf("task error: %s", err.Error())
d.Nack(false, false)
break
}
if p.Done {
log.Infof("completed task: %s, %s", d.MessageId, d.Type)
d.Ack(false)
break
}
}
}
// TODO - figure out a way to bail out of the above loop
// if stop is ever published to
<-stop
ch.Close()
conn.Close()
}()

return stop, nil
}

// DoTask performs the designated task
func DoTask(msg amqp.Delivery) error {
newTask := taskdefs[msg.Type]
if newTask == nil {
return fmt.Errorf("unknown task type: %s", msg.Type)
}

task := newTask()
if err := json.Unmarshal(msg.Body, task); err != nil {
return fmt.Errorf("error decoding task body json: %s", err.Error())
}

// If the task supports the DatastoreTask interface,
// pass in our host db connection
if dsT, ok := task.(tasks.DatastoreTaskable); ok {
dsT.SetDatastore(store)
}

// created buffered progress updates channel
pc := make(chan tasks.Progress, 10)

// execute the task in a goroutine
go task.Do(pc)

for p := range pc {
// TODO - log progress and pipe out of this func
// so others can listen in for updates
// log.Printf("")

if p.Error != nil {
return p.Error
}
if p.Done {
return nil
}
}

return nil
}
3 changes: 3 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type config struct {
// root url for service
UrlRoot string

// port to listen on for RPC calls
RpcPort string

// url of postgres app db
PostgresDbUrl string

Expand Down
147 changes: 74 additions & 73 deletions email.go
Original file line number Diff line number Diff line change
@@ -1,87 +1,88 @@
// transactional email handled by postmark
package main

import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
)
// import (
// "encoding/json"
// "fmt"
// "github.com/datatogether/task-mgmt/tasks"
// "io"
// "io/ioutil"
// "net/http"
// "strings"
// )

// SendTaskRequestEmail sends an email to cfg.EmailNotificationRecipients
// with details for a newly requested task
func SendTaskRequestEmail(t *Task) error {
if len(cfg.EmailNotificationRecipients) == 0 {
return fmt.Errorf("no recipients are set to send email to")
}
// // SendTaskRequestEmail sends an email to cfg.EmailNotificationRecipients
// // with details for a newly requested task
// func SendTaskRequestEmail(t *tasks.Task) error {
// if len(cfg.EmailNotificationRecipients) == 0 {
// return fmt.Errorf("no recipients are set to send email to")
// }

body := fmt.Sprintf(`{
"From" : "[email protected]",
"To" : "%s",
"Tag" : "feedback",
"Subject" : "Injest Request: %s",
"TextBody" : "requested: %s\nsource url: %s\n"
}`,
strings.Join(cfg.EmailNotificationRecipients, ","),
t.Title,
t.Request,
t.SourceUrl,
)
// body := fmt.Sprintf(`{
// "From" : "[email protected]",
// "To" : "%s",
// "Tag" : "feedback",
// "Subject" : "Injest Request: %s",
// "TextBody" : "requested: %s\nsource url: %s\n"
// }`,
// strings.Join(cfg.EmailNotificationRecipients, ","),
// t.Title,
// t.Request,
// t.SourceUrl,
// )

return sendEmail(strings.NewReader(body))
}
// return sendEmail(strings.NewReader(body))
// }

// SendTaskRequestEmail sends an email to cfg.EmailNotificationRecipients
// notifying them of a cancelled request
func SendTaskCancelEmail(t *Task) error {
if len(cfg.EmailNotificationRecipients) == 0 {
return fmt.Errorf("no recipients are set to send email to")
}
// // SendTaskRequestEmail sends an email to cfg.EmailNotificationRecipients
// // notifying them of a cancelled request
// func SendTaskCancelEmail(t *tasks.Task) error {
// if len(cfg.EmailNotificationRecipients) == 0 {
// return fmt.Errorf("no recipients are set to send email to")
// }

body := fmt.Sprintf(`{
"From" : "[email protected]",
"To" : "%s",
"Tag" : "feedback",
"Subject" : "Request Cancelled: %s",
"TextBody" : "requested: %s\nsource url: %s\ncancelled: %s"
}`,
strings.Join(cfg.EmailNotificationRecipients, ","),
t.Title,
t.Request,
t.SourceUrl,
t.Fail,
)
// body := fmt.Sprintf(`{
// "From" : "[email protected]",
// "To" : "%s",
// "Tag" : "feedback",
// "Subject" : "Request Cancelled: %s",
// "TextBody" : "requested: %s\nsource url: %s\ncancelled: %s"
// }`,
// strings.Join(cfg.EmailNotificationRecipients, ","),
// t.Title,
// t.Request,
// t.SourceUrl,
// t.Fail,
// )

return sendEmail(strings.NewReader(body))
}
// return sendEmail(strings.NewReader(body))
// }

// send an email using postmark transactional email service
// postmarkapp.com
func sendEmail(jsonBody io.Reader) error {
if cfg.PostmarkKey == "" {
return fmt.Errorf("missing postmark key for sending email")
}
// // send an email using postmark transactional email service
// // postmarkapp.com
// func sendEmail(jsonBody io.Reader) error {
// if cfg.PostmarkKey == "" {
// return fmt.Errorf("missing postmark key for sending email")
// }

url := "https://api.postmarkapp.com/email/"
// url := "https://api.postmarkapp.com/email/"

req, err := http.NewRequest("POST", url, jsonBody)
if err != nil {
return err
}
req.Header.Add("X-Postmark-Server-Token", cfg.PostmarkKey)
req.Header.Add("Accept", "application/json")
req.Header.Add("Content-Type", "application/json")
req.Body = ioutil.NopCloser(jsonBody)
// req, err := http.NewRequest("POST", url, jsonBody)
// if err != nil {
// return err
// }
// req.Header.Add("X-Postmark-Server-Token", cfg.PostmarkKey)
// req.Header.Add("Accept", "application/json")
// req.Header.Add("Content-Type", "application/json")
// req.Body = ioutil.NopCloser(jsonBody)

res, err := http.DefaultClient.Do(req)
// if the server responds with an error, process & log out
if res.StatusCode == 422 {
responseBody := map[string]interface{}{}
json.NewDecoder(res.Body).Decode(&responseBody)
log.Info(responseBody)
}
// res, err := http.DefaultClient.Do(req)
// // if the server responds with an error, process & log out
// if res.StatusCode == 422 {
// responseBody := map[string]interface{}{}
// json.NewDecoder(res.Body).Decode(&responseBody)
// log.Info(responseBody)
// }

return err
}
// return err
// }
Loading