PubSub: support batching publish requests with asyncio #20
Comments
For the record, this is my current implementation of an asyncio batch: https://github.com/mozilla/gcp-ingestion/blob/24c1cea/ingestion-edge/ingestion_edge/util.py#L7-L95
I haven't tried this myself, but there's a proposed solution to make the publish future act like a concurrent.futures.Future: googleapis/google-cloud-python#6201 (comment)
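For illustration, this is what concurrent.futures.Future compatibility buys: asyncio.wrap_future turns such a future, even one resolved from a background thread, into something a coroutine can await without blocking the event loop. A minimal standard-library sketch; blocking_publish is a hypothetical stand-in for a publish call, not the Pub/Sub client API.

```python
import asyncio
import concurrent.futures
import threading
import time


def blocking_publish(data: bytes) -> concurrent.futures.Future:
    """Hypothetical stand-in for a publish call: returns a future that a
    background thread resolves later with a fake message ID."""
    future: concurrent.futures.Future = concurrent.futures.Future()

    def worker() -> None:
        time.sleep(0.01)  # simulate network latency
        future.set_result(f"id-{len(data)}")

    threading.Thread(target=worker, daemon=True).start()
    return future


async def main() -> str:
    # wrap_future chains the thread-side future to an asyncio future on the
    # running loop, so the result can be awaited like any coroutine.
    return await asyncio.wrap_future(blocking_publish(b"hello"))


if __name__ == "__main__":
    print(asyncio.run(main()))  # prints "id-5"
```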
Python 2 support is deprecated, but it still needs to be preserved until the end of the year. Any support for asyncio will thus have to wait for at least another 6 months or so.
Do we have any update on this?
Now that we're past the date when Python 2 support was officially dropped, can we have an update on this? Is there any timeline for official asyncio support? I'm sure we're not alone in trying to use Google Cloud Pub/Sub with asyncio-based libraries.
About a week ago we released a new version of Pub/Sub that drops Python 2.7 (and 3.5) support: https://github.com/googleapis/python-pubsub/releases/tag/v2.0.0
Is your feature request related to a problem? Please describe.
I have an asyncio application that needs to publish messages to PubSub, but I'm having issues because the futures returned by google.cloud.pubsub.PublisherClient.publish can't be used with await or asyncio.wrap_future, and they never complete if Batch._commit throws an uncaught exception (like in PubSub: RetryError in batch publish causes futures to never complete google-cloud-python#7103 and PubSub: Propagate RetryError in PublisherClient.publish google-cloud-python#7071).
Describe the solution you'd like
I wrote a new google.cloud.pubsub_v1.publisher._batch.async.Batch that implements google.cloud.pubsub_v1.publisher._batch.base.Batch. It uses asyncio to provide awaitable futures that automatically propagate exceptions. It uses a shared concurrent.futures.ThreadPoolExecutor in conjunction with asyncio.wrap_future to call Batch.client.publish asynchronously while enforcing a maximum number of workers. I wrapped only Batch.client.publish in a thread because (if I understand correctly) it only blocks on exclusive access to the gRPC channel, so it shouldn't cause the performance issues seen in the first alternative below.
I would like to submit this as a pull request, but only if it would be useful.
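The design described above can be sketched roughly as follows. This is not the proposed _batch.async.Batch itself: AsyncBatch, publish_fn, fake_publish, max_messages, max_latency, and the pool size of 4 are all hypothetical. Messages are buffered on the event loop, each caller gets an asyncio future, and only the blocking publish call runs in a shared, bounded ThreadPoolExecutor via asyncio.wrap_future, so any exception it raises is set on every waiting future.

```python
import asyncio
import concurrent.futures
from typing import Callable, List, Optional, Set

# Shared, bounded pool: at most 4 blocking publish calls run at once, however
# many batches are committed. The size is an arbitrary example value.
EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=4)


class AsyncBatch:
    """Hypothetical asyncio batch: buffers messages on the event loop and
    commits them with a blocking `publish_fn` in the shared executor."""

    def __init__(self, publish_fn: Callable[[List[bytes]], List[str]],
                 max_messages: int = 100, max_latency: float = 0.05) -> None:
        self._publish_fn = publish_fn
        self._max_messages = max_messages
        self._max_latency = max_latency
        self._messages: List[bytes] = []
        self._futures: List[asyncio.Future] = []
        self._timer: Optional[asyncio.TimerHandle] = None
        self._tasks: Set[asyncio.Task] = set()  # keep commit tasks referenced

    def publish(self, data: bytes) -> asyncio.Future:
        """Queue one message; must be called while an event loop is running."""
        loop = asyncio.get_running_loop()
        fut = loop.create_future()
        self._messages.append(data)
        self._futures.append(fut)
        if len(self._messages) >= self._max_messages:
            self._flush()  # batch is full: commit now
        elif self._timer is None:
            # First message of a new batch: commit after max_latency at most.
            self._timer = loop.call_later(self._max_latency, self._flush)
        return fut

    def _flush(self) -> None:
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        if not self._messages:
            return
        messages, futures = self._messages, self._futures
        self._messages, self._futures = [], []
        task = asyncio.get_running_loop().create_task(self._commit(messages, futures))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def _commit(self, messages: List[bytes], futures: List[asyncio.Future]) -> None:
        try:
            # Only the blocking call runs in a thread; wrap_future makes the
            # executor's concurrent future awaitable on the event loop.
            ids = await asyncio.wrap_future(EXECUTOR.submit(self._publish_fn, messages))
            if len(ids) != len(futures):
                raise ValueError("publish returned the wrong number of IDs")
        except Exception as exc:
            # Propagate the failure to every caller instead of leaving their
            # futures pending forever.
            for fut in futures:
                if not fut.done():
                    fut.set_exception(exc)
            return
        for fut, message_id in zip(futures, ids):
            if not fut.done():
                fut.set_result(message_id)


def fake_publish(messages: List[bytes]) -> List[str]:
    # Hypothetical stand-in for a blocking publish RPC returning message IDs.
    return ["id-" + m.decode() for m in messages]


async def main() -> List[str]:
    batch = AsyncBatch(fake_publish, max_messages=3)
    futures = [batch.publish(b"m%d" % i) for i in range(5)]
    return list(await asyncio.gather(*futures))


if __name__ == "__main__":
    print(asyncio.run(main()))  # ['id-m0', 'id-m1', 'id-m2', 'id-m3', 'id-m4']
```

Because the executor is shared, the number of concurrent publish calls stays bounded no matter how many batches are committed; batches that can't get a worker wait in the executor's queue without blocking the event loop.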
Describe alternatives you've considered
- Modifying google.cloud.pubsub_v1.publisher._batch.thread.Batch to use concurrent.futures.ThreadPoolExecutor. Unfortunately, this had performance issues: once every worker was blocked in a time.sleep, no worker was left to check whether batches that had not yet been submitted were ready.
- Modifying google.cloud.pubsub_v1.futures.Future to inherit from concurrent.futures.Future. This fixed compatibility with asyncio.wrap_future, but not the uncaught exceptions or the unlimited thread spawning.
- Modifying google.cloud.pubsub_v1.publisher._batch.thread.Batch to join spawned threads, which would propagate uncaught exceptions, but I was unable to figure out a working solution.
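An alternative to joining threads, sketched below with hypothetical names (commit_unguarded, commit_guarded, failing_rpc, run_in_thread), is to catch the exception inside the worker thread and store it on the future. The unguarded version mirrors the failure mode in the linked issues: the thread dies and the future stays pending. The guarded version always resolves the future, so a coroutine awaiting it through asyncio.wrap_future receives the exception. This illustrates the general pattern only, not the library's Batch code.

```python
import concurrent.futures
import threading
from typing import Callable


def commit_unguarded(future: concurrent.futures.Future, rpc: Callable[[], str]) -> None:
    # If rpc() raises, this thread dies with an uncaught exception and
    # nothing ever resolves `future`.
    future.set_result(rpc())


def commit_guarded(future: concurrent.futures.Future, rpc: Callable[[], str]) -> None:
    # Always resolve the future: with the result on success, or with the
    # exception so that anyone waiting on it sees the failure.
    try:
        result = rpc()
    except Exception as exc:
        future.set_exception(exc)
    else:
        future.set_result(result)


def failing_rpc() -> str:
    # Hypothetical stand-in for a publish RPC that exhausts its retries.
    raise RuntimeError("retries exhausted")


def run_in_thread(target, *args) -> None:
    thread = threading.Thread(target=target, args=args)
    thread.start()
    thread.join()


if __name__ == "__main__":
    pending = concurrent.futures.Future()
    run_in_thread(commit_unguarded, pending, failing_rpc)
    print(pending.done())  # False: a traceback is printed and the future hangs

    resolved = concurrent.futures.Future()
    run_in_thread(commit_guarded, resolved, failing_rpc)
    print(resolved.done(), repr(resolved.exception()))  # True RuntimeError('retries exhausted')
```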