-
Notifications
You must be signed in to change notification settings - Fork 23
/
flask_sse.py
66 lines (51 loc) · 1.86 KB
/
flask_sse.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from sse import Sse as PySse
from redis import StrictRedis
from redis import ConnectionPool as RedisConnectionPool
from flask import json, current_app, Blueprint, request
class ConnectionPool(object):
    """Class-level registry of Redis connection pools.

    Pools are cached in ``pool`` under a string key derived from the
    arguments used to create them, so callers asking for the same Redis
    settings get the same ``RedisConnectionPool`` back.
    """

    # Maps the string produced by ``key`` to the pool built from those args.
    pool = {}

    @classmethod
    def key(cls, *args, **kwargs):
        """Return the cache key for a pool created with these arguments.

        Positional arguments are rendered with ``%s`` in the order given;
        keyword arguments follow as ``name=value`` in sorted name order.
        All parts are joined with ``:``.

        Sorting the keyword names makes the key independent of the order
        in which the caller spelled them, and formatting every part (rather
        than joining the raw values) lets non-string arguments such as an
        integer port be used without raising ``TypeError``.
        """
        # ``(arg,)`` keeps tuple-valued arguments from being unpacked by ``%``.
        parts = ['%s' % (arg,) for arg in args]
        parts.extend(
            '%s=%s' % (name, kwargs[name]) for name in sorted(kwargs)
        )
        return ':'.join(parts)

    @classmethod
    def lookup_pool(cls, *args, **kwargs):
        """Return the cached pool for these arguments, creating it if needed.

        The arguments are forwarded unchanged to ``RedisConnectionPool``
        the first time a given key is seen.
        """
        key = cls.key(*args, **kwargs)
        if key not in cls.pool:
            cls.pool[key] = RedisConnectionPool(*args, **kwargs)
        return cls.pool[key]

    @classmethod
    def get_connection(cls):
        """Return a ``StrictRedis`` client configured from the current app.

        Reads ``SSE_REDIS_HOST``, ``SSE_REDIS_PORT`` and ``SSE_REDIS_DB``
        from ``current_app.config``, falling back to ``'localhost'``,
        ``6379`` and ``0`` respectively.
        """
        pool = cls.lookup_pool(
            host=current_app.config.get('SSE_REDIS_HOST', 'localhost'),
            port=current_app.config.get('SSE_REDIS_PORT', 6379),
            db=current_app.config.get('SSE_REDIS_DB', 0),
        )
        return StrictRedis(connection_pool=pool)
class SseStream(object):
    """Iterable that turns messages from a Redis pub/sub channel into SSE output."""

    def __init__(self, conn, channel):
        """Create a pub/sub handle from ``conn`` and subscribe it to ``channel``."""
        self.pubsub = conn.pubsub()
        self.pubsub.subscribe(channel)

    def __iter__(self):
        """Yield UTF-8 encoded chunks produced by a ``PySse`` formatter.

        Whatever the formatter holds right after construction is emitted
        first. After that, each pub/sub entry of type ``'message'`` is
        decoded as a JSON ``[event, data]`` pair, handed to the formatter,
        and the formatter's output is emitted. Entries of any other type
        are skipped.
        """
        formatter = PySse()

        # Emit anything the formatter produces before the first message.
        for chunk in formatter:
            yield chunk.encode('u8')

        for incoming in self.pubsub.listen():
            if incoming['type'] != 'message':
                continue
            event, data = json.loads(incoming['data'])
            formatter.add_message(event, data)
            for chunk in formatter:
                yield chunk.encode('u8')
# Flask blueprint named 'sse'; the streaming route in this module is attached
# to it. NOTE(review): the application is presumably expected to register this
# blueprint itself (e.g. with a URL prefix) -- registration is not shown here.
sse = Blueprint('sse', __name__)
@sse.route('')
def stream():
    """Serve an event stream for the channel named in the query string.

    The channel is taken from the ``channel`` request argument and
    defaults to ``'sse'``. The response body is an ``SseStream`` bound to
    a Redis connection obtained from ``ConnectionPool``; it is returned
    with the ``text/event-stream`` mimetype and ``direct_passthrough``
    enabled.
    """
    redis_conn = ConnectionPool.get_connection()
    channel_name = request.args.get('channel', 'sse')
    body = SseStream(redis_conn, channel_name)
    return current_app.response_class(
        body,
        mimetype='text/event-stream',
        direct_passthrough=True,
    )
def send_event(event_name, data, channel='sse'):
    """Publish an event to a Redis channel.

    The event is serialized as the JSON list ``[event_name, data]`` and
    published on ``channel`` (default ``'sse'``) using a connection
    obtained from ``ConnectionPool``.
    """
    redis_conn = ConnectionPool.get_connection()
    payload = json.dumps([event_name, data])
    redis_conn.publish(channel, payload)