Add optional setting of QtPromise and .then() context #20

Closed
immanuelweber opened this issue Jun 14, 2018 · 7 comments

Comments

@immanuelweber

Hi,
I'm playing around with qtpromise and have a situation where I launch a function using QtConcurrent::run() from a non-Qt thread owned by a third-party API. Using qtpromise in this situation does not appear to work: the watcher in PromiseFulfill cannot do its job without an event loop, and qtpromise_defer() would use QThread::currentThread(), which in my situation is the API thread, also without an event loop.
I started to implement support for this by adding a parameter to QtPromise that takes a pointer to a QThread and is passed down to PromiseFulfill::call(), where I move the watcher to that thread. In addition, I added a parameter to .then() that also takes a thread; it is passed to the handlers and catchers, which gives qtpromise_defer() a valid thread to post the event to.

Is that kind of functionality of general interest?
If so, what do you think would be better to use as a context, a thread or an object?

@pwuertz
Contributor

pwuertz commented Jun 14, 2018

I'm not sure I'm getting a good picture of what you are trying to do. QtConcurrent::run() creates a QFuture which you are free to move to other threads for observing its state. To react asynchronously to its completion, you need to observe it from a thread with a Qt event loop. This is the thread in which you create the Promise from QFuture (which internally creates a QFutureWatcher on the same event loop).

@immanuelweber
Author

Let me sketch it out a bit more: the API uses a callback to provide IO data. The callback is executed in a thread owned by the API, which is not QThread-based. From there I use a custom run() function to post an event (similar to what qtpromise does internally) to the object residing in the main thread; that event executes a function through a pointer. That function in turn uses something like QtPromise::qPromise(QtConcurrent::run(this, &Object::convert, data)).then([](int result) { /* do something next */ }) to further process the data asynchronously.
From my point of view, it would be more natural to call QtConcurrent::run() from the API thread directly and react to the result in the main thread, without explicitly moving the QFuture as you said. Something like: QtPromise::qPromise(QtConcurrent::run(this, &Object::convert)).then([]() { /* do something next */ }, mainThread).
But because the QFutureWatcher of QPromise is created in the API thread, it cannot watch the future. That's why my current changes to qtpromise require me to write this:
QtPromise::qPromise(QtConcurrent::run(this, &Object::convert), mainThread).then([]() { /* do something next */ }, mainThread)

These are basically two things combined:

  • let QPromise watch the QFuture from a specific thread
  • let .then() be executed in a different/specific thread

I hope it is a bit clearer now.
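
The second point, running a continuation on a specific thread, can be illustrated without Qt. The following is only a minimal sketch using the C++ standard library: `TaskQueue` and `continuationRunsOnCallerThread()` are hypothetical names invented for this illustration, and the queue merely stands in for an event loop. It is not how qtpromise is implemented.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical stand-in for an event loop: tasks posted from any thread
// are executed by whichever thread calls runOne().
class TaskQueue {
public:
    // Thread-safe: may be called from a foreign (non-loop) thread.
    void post(std::function<void()> task) {
        assert(task);
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        ready_.notify_one();
    }

    // Blocks until a task is available, then runs it on the calling thread.
    void runOne() {
        std::function<void()> task;
        {
            std::unique_lock<std::mutex> lock(mutex_);
            ready_.wait(lock, [this] { return !tasks_.empty(); });
            task = std::move(tasks_.front());
            tasks_.pop();
        }
        task();
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<std::function<void()>> tasks_;
};

// Computes a value on a foreign thread, posts the continuation to the
// caller's queue, and reports whether the continuation ran on the caller's
// thread with the expected value.
inline bool continuationRunsOnCallerThread() {
    TaskQueue loop;
    const std::thread::id caller = std::this_thread::get_id();
    std::thread::id continuationThread;
    int received = 0;

    std::thread foreign([&loop, &continuationThread, &received] {
        const int result = 6 * 7;  // work done off the caller's thread
        loop.post([result, &continuationThread, &received] {
            continuationThread = std::this_thread::get_id();
            received = result;
        });
    });

    loop.runOne();   // the caller plays the role of the event-loop thread
    foreign.join();
    return continuationThread == caller && received == 42;
}
```

The foreign thread never touches the caller's state directly; it only posts a task, which is the same idea as posting an event to an object that lives in the main thread.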

@pwuertz
Contributor

pwuertz commented Jun 14, 2018

You want all the .then functions to hop arbitrarily between multiple threads, but I don't think this is the "right" way to think about the promise pattern. I think with promises the idea is to have only a single thread (main / event loop) running all the application logic and everything else is just an asynchronous resource (IO, worker threads, etc.).

So if I got your situation right, you have an external library with a registerCallback(Callback cb) function and some startAsyncProcess() function (possibly combined). After calling the start function, the library will call you back some later time using your registered function from a foreign thread.

The easiest way of modeling this is to create a promise, store the resolver in your callback function, call startAsyncProcess() and return the promise to the caller. The promise then resolves once the foreign thread calls the resolver, which QtPromise can handle just fine.

QPromise<Data> asyncApiCall() {
    return QPromise<Data>([](const auto& resolve, const auto&) {
        registerCallback([=](Data data) {
            // in foreign thread
            resolve(data);
        });
        startAsyncProcess();
    });
}

Now you simply initiate and await your process in main using asyncApiCall() and build on the results as usual.
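
For readers without QtPromise at hand, the same shape can be sketched with only the standard library. This is an analogue under stated assumptions, not the QtPromise API: `FakeApi` is a hypothetical stand-in for the external library, `Data` is assumed to be an `int`, and `std::future::get()` blocks the caller, whereas a QtPromise `.then()` handler is invoked asynchronously.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <memory>
#include <thread>
#include <utility>

using Data = int;                           // assumption: payload type
using Callback = std::function<void(Data)>;

// Hypothetical stand-in for the external library: it invokes the
// registered callback once from a thread that it owns.
class FakeApi {
public:
    void registerCallback(Callback cb) {
        assert(cb);
        callback_ = std::move(cb);
    }
    void startAsyncProcess() {
        worker_ = std::thread([this] { callback_(42); });
    }
    ~FakeApi() {
        if (worker_.joinable()) worker_.join();
    }

private:
    Callback callback_;
    std::thread worker_;
};

// Wraps the callback-based API in a future: the resolver (here a shared
// std::promise) is stored in the callback and fulfilled from the foreign thread.
inline std::future<Data> asyncApiCall(FakeApi& api) {
    auto promise = std::make_shared<std::promise<Data>>();
    std::future<Data> future = promise->get_future();
    api.registerCallback([promise](Data data) {
        // in foreign thread
        promise->set_value(data);
    });
    api.startAsyncProcess();
    return future;
}
```

The promise is held in a `std::shared_ptr` because `std::function` requires a copyable callable, while `std::promise` itself is move-only.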

@simonbrunel
Owner

Thanks @pwuertz, I like your approach, which is close to what is done in the unit test.

@egonuel, does @pwuertz's solution fix your issue? I'm not sure why you need a QFuture; maybe you could share your code to help us understand better. I'm quite reluctant to add a new "thread" parameter to .then() because we would also have to add it to all methods relying on .then(), such as fail(), finally(), tap(), tapFail(), map(), etc., which would complicate the current API.

@immanuelweber
Author

Thank you both. Sorry I did not get back to you earlier; I could not work on it until today. My problem is that the callback is called repeatedly, each time new data is available. As far as I understand, a promise can only be fulfilled once. I implemented @pwuertz's code and it appears that my understanding is right: it is only called once.
So I need to set the callback again after each call. A while ago I watched David Sankel's C++Now 2017 talk "Promises in C++: The Universal Glue for Asynchronous Programs". He presented a pattern like this:

dplp::Promise<> listenUntilError(Server s) {
	return s.listen().then([s](){ listenUntilError(s); });
}

Here s.listen() would be @pwuertz's asyncApiCall(). I implemented that pattern and it appears to work (I'm still not sure whether it is free of threading errors, as there was a crash in the first run after building...).
I will add a post right after this one, where I show the relevant parts of my prior code (with changes to qtpromise) and my new code.
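
The one-shot behavior described above can be reproduced with the standard library's `std::promise`, used here only as an analogue. One difference worth flagging: `std::promise` reports a second fulfillment by throwing `std::future_error`, whereas the observation in this thread is simply that the QtPromise handler ran once. The function name below is hypothetical.

```cpp
#include <cassert>
#include <future>

// Returns true if a second set_value() on the same std::promise is rejected
// and the future still holds the first value.
inline bool secondFulfillmentIsRejected() {
    std::promise<int> promise;
    std::future<int> future = promise.get_future();
    assert(future.valid());

    promise.set_value(1);          // first "frame": fulfills the promise

    bool rejected = false;
    try {
        promise.set_value(2);      // second "frame": already satisfied
    } catch (const std::future_error& error) {
        rejected = (error.code() == std::future_errc::promise_already_satisfied);
    }
    return rejected && future.get() == 1;
}
```

This is why a repeatedly firing callback needs either a fresh promise per event, as in the recursive pattern above, or a different abstraction such as a signal.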

@immanuelweber
Author

immanuelweber commented Jun 18, 2018

So my prior code looks like this:

  1. Observer class:
class FrameObserver : public IFrameObserver
{
public:
    FrameObserver(CameraPointer camera, FrameCallback callback) : IFrameObserver(camera), callback(callback) { assert(camera && callback);  }
    void FrameReceived(const FramePointer frame) final override {
        callback(frame, std::chrono::system_clock::now());
    }

private:
    FrameCallback callback;
};
  2. The creation of the observer, with the lambda creating a promise from a concurrent run. Note: the this->thread() arguments to qPromise and .then() come from the changes I made to qtpromise:
d->frameObserver = std::make_shared<FrameObserver>(d->camera, [=](FramePointer frame, auto readTime) {
    QtPromise::qPromise(QtConcurrent::run(&read, d->camera, frame, readTime, d->convertBayerToColor), this->thread()).then([=](Image image) {
        emit newData(image);
    }, this->thread());
});

Now the new code:

  1. Observer class, which now requires a mutex to guard the callback:
class FrameObserver2 : public IFrameObserver
{
public:
    FrameObserver2(CameraPointer camera) : IFrameObserver(camera) { assert(camera);  }
    void FrameReceived(const FramePointer frame) final override
    {
        QMutexLocker lock(&mutex);
        if (callback) {
            callback(frame, std::chrono::system_clock::now());
        }
    }

    void setCallback(FrameCallback callback) {
        assert(callback);
        QMutexLocker lock(&mutex);
        this->callback = callback;
    }

private:
    QMutex mutex;
    FrameCallback callback;
};
  2. The continuous read function:
void continuousRead(CameraPointer camera, bool convertBayerToColor, std::shared_ptr<Observer> observer, DeviceNode * node)
{
    QtPromise::QPromise<Image>([=](const auto & resolve, const auto &) {
        observer->setCallback([=](auto frame, auto readTime) {
            QtConcurrent::run([=](auto frame, auto readTime) {
                resolve(read(camera, frame, readTime, convertBayerToColor));
            }, frame, readTime);
        });
    }).then([=](Image image) {
        emit node->newData(image);
        continuousRead(camera, convertBayerToColor, observer, node);
    });
}
  3. Observer construction and start of reading:
d->frameObserver = std::make_shared<FrameObserver2>(d->camera);
continuousRead(d->camera, d->convertBayerToColor, d->frameObserver, this);

So it's a lot more code, but I still somehow like the approach. What I don't like is that it requires continuously setting the callback and interacting with the API thread through a mutex, etc. I'm playing with this to get more insight and maybe make it shorter and more concise. Maybe my design is also somehow flawed. I'm open to comments and ideas...
By the way, I know I could also have emitted the signal from a lambda run via QtConcurrent::run(), but I'm more interested in the general concept.

@simonbrunel
Owner

Closing this issue in favor of #35


3 participants