You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Has someone used this in peoduction scenarios? I am stuck and unable to pull messages using the async streaming client and any help would be benefecial.
Is wrapping the standard streaming pull in an asyncio executor gonna give me the same behaviour as below client?
fromgoogleimportpubsub_v1subscriber=pubsub_v1.SubscriberAsyncClient() #skipping the class code but this is the Subscriber clientasyncdeffetch_and_decode_msgs(self):
try:
print(f"Listening for messages on {self.subscription_path}...")
request=pubsub_v1.StreamingPullRequest(
subscription=self.subscription_path,
)
# Not sure if there is any other way to do thisasyncdefrequest_generator():
yieldrequestprint(f"stream: {request}"). # Code gets blocked herestream=awaitself.subscriber.streaming_pull(requests=request_generator())
print(f"stream: {stream}")
# Handle the responseasyncforresponseinstream:
forreceived_messageinresponse.received_messages:
print("Received message: ", received_message.message.data.decode('utf-8'))
exceptExceptionase:
raisee
The text was updated successfully, but these errors were encountered:
from google import pubsub_v1
class AsyncStreamingRequestIterator():
def __init__(self, initial_request):
self.request = initial_request
def __aiter__(self):
return self
async def __anext__(self):
# Send First Request
if self.request:
return_value = self.request
self.request = None
return return_value
await asyncio.sleep(30) # Default 30 Seconds
return StreamingPullRequest(stream_ack_deadline_seconds=900)
subscriber = pubsub_v1.SubscriberAsyncClient() #skipping the class code but this is the Subscriber client
async def fetch_and_decode_msgs(self):
try:
print(f"Listening for messages on {self.subscription_path}...")
request = pubsub_v1.StreamingPullRequest(
subscription=self.subscription_path,
)
stream = await self.subscriber.streaming_pull(requests=AsyncStreamingRequestIterator(request), timeout=None)
print(f"stream: {stream}")
# Handle the response
async for response in stream:
for received_message in response.received_messages:
print("Received message: ", received_message.message.data.decode('utf-8'))
except Exception as e:
raise e
Has someone used this in peoduction scenarios? I am stuck and unable to pull messages using the async streaming client and any help would be benefecial.
Is wrapping the standard streaming pull in an asyncio executor gonna give me the same behaviour as below client?
python-pubsub/google/pubsub_v1/services/subscriber/async_client.py
Line 1368 in ff229a5
This is my current usage:
The text was updated successfully, but these errors were encountered: