Skip to content
This repository has been archived by the owner on Nov 7, 2019. It is now read-only.

Commit

Permalink
Merge pull request #4 from datatogether/task_pipeline
Browse files Browse the repository at this point in the history
Task pipeline
  • Loading branch information
flyingzumwalt authored Jul 12, 2017
2 parents eac45aa + 200a878 commit 06280bc
Show file tree
Hide file tree
Showing 133 changed files with 19,283 additions and 1,504 deletions.
50 changes: 50 additions & 0 deletions Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

117 changes: 84 additions & 33 deletions accept_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,46 @@ package main
import (
"encoding/json"
"fmt"
"github.com/datatogether/task-mgmt/defs/ipfs"
"github.com/datatogether/task-mgmt/taskdefs/ipfs"
"github.com/datatogether/task-mgmt/taskdefs/kiwix"
"github.com/datatogether/task-mgmt/tasks"
"github.com/streadway/amqp"
"time"
)

// taskdefs is a map of all possible task names to their respective "New" funcs
var taskdefs = map[string]tasks.NewTaskFunc{
"ipfs.add": ipfs.NewTaskAdd,
func configureTasks() {
tasks.RegisterTaskdef("ipfs.add", ipfs.NewTaskAdd)
tasks.RegisterTaskdef("kiwix.updateSources", kiwix.NewTaskUpdateSources)

// Must set api server url to make ipfs tasks work
ipfs.IpfsApiServerUrl = cfg.IpfsApiUrl
}

// start accepting tasks, if setup doesn't error, it returns a stop channel
// writing to stop will teardown the func and stop accepting tasks
// start accepting tasks from the queue, if setup doesn't error,
// it returns a stop channel writing to stop will teardown the
// func and stop accepting tasks
func acceptTasks() (stop chan bool, err error) {
if cfg.AmqpUrl == "" {
return nil, fmt.Errorf("no amqp url specified")
}

stop = make(chan bool)
log.Printf("connecting to: %s", cfg.AmqpUrl)

conn, err := amqp.Dial(cfg.AmqpUrl)
if err != nil {
return nil, fmt.Errorf("Failed to connect to RabbitMQ: %s", err.Error())
var conn *amqp.Connection
for i := 0; i <= 1000; i++ {
conn, err = amqp.Dial(cfg.AmqpUrl)
if err != nil {
log.Infof("Failed to connect to amqp server: %s", err.Error())
time.Sleep(time.Second)
continue
}
break
}

// if the connection is still nil after 1000 tries, time to bail
if conn == nil {
return nil, fmt.Errorf("Failed to connect to amqp server")
}

ch, err := conn.Channel()
Expand Down Expand Up @@ -54,40 +76,69 @@ func acceptTasks() (stop chan bool, err error) {
}

go func() {
for d := range msgs {
newTask := taskdefs[d.Type]
if newTask == nil {
log.Errorf("unknown task type: %s", d.Type)
d.Nack(false, false)
for msg := range msgs {
// tasks.Tas
task, err := tasks.TaskFromDelivery(store, msg)
if err != nil {
log.Errorf("task error: %s", err.Error())
msg.Nack(false, false)
continue
}

task := newTask()
if err := json.Unmarshal(d.Body, task); err != nil {
log.Errorf("error decoding task body: %s", err.Error())
d.Nack(false, false)
continue
if err := task.Do(store); err != nil {
log.Errorf("task error: %s", err.Error())
msg.Nack(false, false)
} else {
log.Infof("completed task: %s, %s", msg.MessageId, msg.Type)
msg.Ack(false)
}

pc := make(chan tasks.Progress, 10)
task.Do(pc)
for p := range pc {
if p.Error != nil {
log.Errorf("task error: %s", err.Error())
d.Nack(false, false)
break
}
if p.Done {
log.Infof("completed task: %s, %s", d.MessageId, d.Type)
d.Ack(false)
break
}
}
}
// TODO - figure out a way to bail out of the above loop
// if stop is ever published to
<-stop
ch.Close()
conn.Close()
}()

return stop, nil
}

// DoTask performs the designated task
func DoTask(msg amqp.Delivery) error {
task, err := tasks.NewTaskable(msg.Type)
if err != nil {
return fmt.Errorf("unknown task type: %s", msg.Type)
}

if err := json.Unmarshal(msg.Body, task); err != nil {
return fmt.Errorf("error decoding task body json: %s", err.Error())
}

// If the task supports the DatastoreTask interface,
// pass in our host db connection
if dsT, ok := task.(tasks.DatastoreTaskable); ok {
dsT.SetDatastore(store)
}

// created buffered progress updates channel
pc := make(chan tasks.Progress, 10)

// execute the task in a goroutine
go task.Do(pc)

for p := range pc {
// TODO - log progress and pipe out of this func
// so others can listen in for updates
// log.Printf("")

if p.Error != nil {
return p.Error
}
if p.Done {
return nil
}
}

return nil
}
34 changes: 19 additions & 15 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ const (
//
// configuration is read at startup and cannot be alterd without restarting the server.
type config struct {
// site title
Title string

// port to listen on, will be read from PORT env variable if present.
Port string

// root url for service
// root url
UrlRoot string

// port to listen on for RPC calls
RpcPort string

// url of postgres app db
PostgresDbUrl string

Expand All @@ -58,17 +58,19 @@ type config struct {
// list of email addresses that should get notifications
EmailNotificationRecipients []string

// TODO - depricate github config & identity info now that task-mgmt
// is a backend service

// url to kick off github oauth process
GithubLoginUrl string
// GithubLoginUrl string
// owner of github repo. required
GithubRepoOwner string
// GithubRepoOwner string
// name of github repo. required.
GithubRepoName string

// GithubRepoName string
// location of identity server
IdentityServerUrl string
// IdentityServerUrl string
// cookie to check for user credentials to forward to identity server.
UserCookieKey string
// UserCookieKey string

// CertbotResponse is only for doing manual SSL certificate generation via LetsEncrypt.
CertbotResponse string
Expand All @@ -95,13 +97,15 @@ func initConfig(mode string) (cfg *config, err error) {
}

err = requireConfigStrings(map[string]string{
"PORT": cfg.Port,
"POSTGRES_DB_URL": cfg.PostgresDbUrl,
"GITHUB_REPO_OWNER": cfg.GithubRepoOwner,
"GITHUB_REPO_NAME": cfg.GithubRepoName,
"IDENTITY_SERVER_URL": cfg.IdentityServerUrl,
"PORT": cfg.Port,
"POSTGRES_DB_URL": cfg.PostgresDbUrl,
})

// output to stdout in dev mode
if mode == DEVELOP_MODE {
log.Out = os.Stdout
}

return
}

Expand Down
46 changes: 46 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
version: '2'
services:
task-mgmt:
build: .
volumes:
- .:/go/src/github.com/datatogether/task-mgmt
- ./sql:/sql
ports:
- 8080:8080
networks:
- back-tier
depends_on:
- ipfs
- rabbitmq
- postgres
environment:
- PORT=8080
- TLS=false
- GOLANG_ENV=develop
- POSTGRES_DB_URL=postgres://postgres@postgres/postgres?sslmode=disable
- PUBLIC_KEY=nothing_yet
- POSTMARK_KEY=POSTMARK_API_TEST
- [email protected]
- GITHUB_LOGIN_URL=http://localhost:3100/oauth/github
- AMQP_URL=amqp://guest:guest@rabbitmq:5672/
- IPFS_API_URL=http://ipfs:5001/api/v0
- RPC_PORT=4400
ipfs:
image: "ipfs/go-ipfs:latest"
networks:
- back-tier
ports:
- 5001:5001
rabbitmq:
image: "library/rabbitmq:latest"
networks:
- back-tier
ports:
- 5672:5672
postgres:
image: "postgres:9.6-alpine"
networks:
- back-tier

networks:
back-tier:
Loading

0 comments on commit 06280bc

Please sign in to comment.