Skip to content
This repository has been archived by the owner on Nov 17, 2018. It is now read-only.

Add callbacks for sent task ack, sent task and reshape API as retrieving result from AsyncResult.get() (fix #38) #43

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

jimhorng
Copy link
Contributor

It's kind of large commit, since the small commits are hard to resolve merge conflicts while creating this PR.

Give user 3 options to listen for callbacks for async send task operations

  1. After task sent (fix #38 )
  2. After task sent and ack-ed (fix #38 )
  3. To fit original celery behavior that task.apply_async() to get the AsyncResult first, then AsyncResult.get() to get actual task result in tornado asynchronous fashion

Usage

Calling Celery tasks(has return value) from Tornado RequestHandler: ::

from tornado import gen, web
import tcelery, tasks

tcelery.setup_nonblocking_producer()

class AsyncHandler(web.RequestHandler):
    @web.asynchronous
    def get(self):
        tasks.echo.apply_async(args=['Hello world!'], callback=self.on_async_result)

    def on_async_result(self, async_result):
        async_result.get(callback=self.on_actual_result)

    def on_actual_result(self, result):
        self.write(str(result))
        self.finish()

with generator-based interface: ::

class GenAsyncHandler(web.RequestHandler):
    @web.asynchronous
    @gen.coroutine
    def get(self):
        async_result = yield gen.Task(tasks.sleep.apply_async, args=[3])
        result = yield gen.Task(async_result.get)
        self.write(str(result))
        self.finish()

Calling Celery tasks(no return value) from Tornado RequestHandler: ::

@web.asynchronous
def get(self):
    tasks.echo.apply_async(args=['Hello world!'], callback=self.on_async_result)

def on_async_result(self, async_result):
    self.write("task sent") # ack-ed if BROKER_TRANSPORT_OPTIONS: {'confirm_publish': True}
    self.finish()

with generator-based interface: ::

@web.asynchronous
@gen.coroutine
def get(self):
    yield gen.Task(tasks.sleep.apply_async, args=[3])
    self.write("task sent") # ack-ed if BROKER_TRANSPORT_OPTIONS: {'confirm_publish': True}
    self.finish()

See updated README.rst for api usage details.

Functional quality

  • All function tests are passed, including amqp, redis backend
  • Works in connection pool where connections >= 2
  • Works in re-connect scenario, including features as wait for publish ack...etc.
  • If no backend is configured, behaves the same as original celery that uses DisabledBackend

add callbacks for sent task ack and sent task

Conflicts:
	tcelery/producer.py
@jimhorng jimhorng changed the title Add callbacks for publish acked, published and reshape API as retrieving result from AsyncResult.get() (fix #38) Add callbacks for sent task ack, sent task and reshape API as retrieving result from AsyncResult.get() (fix #38) Oct 22, 2014
@jimhorng
Copy link
Contributor Author

@mher any review comment on this PR? thanks :)

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant