Skip to content
This repository has been archived by the owner on Nov 17, 2018. It is now read-only.

fix: If the task is completed before subscription, we would not get … #59

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 52 additions & 13 deletions tcelery/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from tornado import gen
from tornadoredis import Client
from tornadoredis import ConnectionPool
from tornadoredis.exceptions import ResponseError
from tornadoredis.pubsub import BaseSubscriber

Expand Down Expand Up @@ -44,24 +45,39 @@ def _consume_bulk(self, tail, callback=None):
class RedisConsumer(object):
def __init__(self, producer):
self.producer = producer
backend = producer.app.backend
self.client = RedisClient(host=backend.connparams['host'],
port=backend.connparams['port'],
password=backend.connparams['password'],
selected_db=backend.connparams['db'],
io_loop=producer.conn_pool.io_loop)
self.client.connect()
self.subscriber = CelerySubscriber(self.client)
self.client = self.create_redis_client()
self.subscriber = CelerySubscriber(self.create_redis_client())

def wait_for(self, task_id, callback, expires=None, persistent=None):
key = self.producer.app.backend.get_key_for_task(task_id)
key = self.backend.get_key_for_task(task_id)
current_subscribed = [False]

def _set_subscribed(subscribed):
current_subscribed[0] = subscribed

def _on_timeout():
_set_subscribed(True)
self.on_timeout(key)

# Expiry time of the task should be used rather than the result ? ? ?
if expires:
timeout = self.producer.conn_pool.io_loop.add_timeout(
timedelta(microseconds=expires), self.on_timeout, key)
timeout = self.io_loop.add_timeout(timedelta(milliseconds=expires), _on_timeout)
else:
timeout = None
self.subscriber.subscribe(
key, partial(self.on_result, key, callback, timeout))
current_subscriber = partial(self.on_result, key, callback, timeout)
self.io_loop.add_future(gen.Task(self.subscriber.subscribe, key, current_subscriber),
lambda future: _set_subscribed(future.result()))

# If the task is completed before subscription,
# we would not get the result.So we try to check this case
# until the subscription is completed.
def _check_subscribed(future):
result = future.result()
if result:
current_subscriber(result)
elif not current_subscribed[0]:
self.io_loop.add_future(gen.Task(self.client.get, key), _check_subscribed)
self.io_loop.add_future(gen.Task(self.client.get, key), _check_subscribed)

def on_result(self, key, callback, timeout, result):
if timeout:
Expand All @@ -71,3 +87,26 @@ def on_result(self, key, callback, timeout, result):

def on_timeout(self, key):
self.subscriber.unsubscribe_channel(key)

def create_redis_client(self):
return RedisClient(password=self.backend.connparams['password'],
selected_db=self.backend.connparams['db'],
connection_pool=self.connection_pool,
io_loop=self.io_loop)

@property
def connection_pool(self):
if not hasattr(RedisConsumer, '_connection_pool'):
self._connection_pool = ConnectionPool(
host=self.backend.connparams['host'],
port=self.backend.connparams['port'],
io_loop=self.io_loop)
return self._connection_pool

@property
def io_loop(self):
return self.producer.conn_pool.io_loop

@property
def backend(self):
return self.producer.app.backend