Qt 6中的异步API

2020年9月16日,星期三,Sona Kurazyan | 评论

大家可能知道Qt提供了几种多线程结构(线程,互斥体,等待条件等),以及更高级别的API,如QThreadPoolQt Concurrent和其他相关类。在本文中,我们将专注于更高级别的异步API和Qt 6中引入的更改。

Qt中更高级别的并发API

Qt Concurrent通过消除对低级同步(基元,例如互斥锁和锁)的需求,并手动管理多个线程,使多线程编程变得更加容易。它为并行处理可迭代容器提供了映射,过滤和归约算法(从功能编程中可以更好地了解)。此外,还有类QFuture,QFutureWatcher和,QFutureSynchronizer用于访问和监视异步计算的结果。尽管所有这些都非常有用,但是仍然存在一些缺点,例如无法使用QFuture 在Qt Concurrent之外,缺乏对链接多个计算以简化和简洁代码的支持,缺乏Qt Concurrent API的灵活性等。对于Qt 6,目前正在尝试解决这些问题,并使Qt的多线程编程更加有趣 !

将延续附加到QFuture

多线程编程中的一种常见情况是运行异步计算,这又需要调用另一个异步计算并将数据传递给该异步计算,该异步计算依赖于另一个计算,依此类推。由于每个阶段都需要上一个阶段的结果,因此您需要等待(通过阻止或轮询)直到上一个阶段完成并使用其结果,或者以“回调”的方式构造代码。这些选项都不是完美的:要么浪费资源等待时间,要么获取复杂的无法维护的代码。添加新的阶段或逻辑(用于错误处理等)会进一步增加复杂性。

为了更好地理解问题,让我们考虑以下示例。假设我们要从网络下载大图像,对其进行一些繁重的处理,然后在我们的应用程序中显示生成的图像。因此,我们执行以下步骤:

  • 发出网络请求并等待,直到收到所有数据
  • 根据原始数据创建图像
  • 处理图像
  • 展示下

对于每个需要依次调用的步骤,我们都有以下方法:

QByteArray download(const QUrl &url);
QImage createImage(const QByteArray &data);
QImage processImage(const QImage &image);
void show(const QImage &image);

我们可以使用QtConcurrent异步运行这些任务并QFutureWatcher监视进度:

void loadImage(const QUrl &url) {
    QFuture data = QtConcurrent::run(download, url);
    QFutureWatcher dataWatcher;
    dataWatcher.setFuture(data);
    
    connect(&dataWatcher, &QFutureWatcher ::finished, this, [=] {
        // handle possible errors
        // ...
        QImage image = createImage(data);
        // Process the image
        // ...
        QFuture processedImage = QtConcurrent::run(processImage, image);
        QFutureWatcher imageWatcher;
        imageWatcher.setFuture(processedImage);

        connect(&imageWatcher, &QFutureWatcher::finished, this, [=] {
            // handle possible errors
            // ...
            show(processedImage);
        });
    });
}

我们要添加到链中的步骤越多越难看。QFuture通过添加对通过QFuture::then()方法附加延续的支持,可以帮助解决此问题:

auto future = QtConcurrent::run(download, url)
            .then(createImage)
            .then(processImage)
            .then(show);

这无疑看起来要好得多!但是缺少一件事:错误处理。您可以执行以下操作:

auto future = QtConcurrent::run(download, url)
            .then([](QByteArray data) {
                // handle possible errors from the previous step
                // ...
                return createImage(data);
            })    
            .then(...)    
            ...

这将起作用,但是错误处理代码仍与程序逻辑混合在一起。另外,如果其中一个步骤失败,我们可能也不想运行整个链。这可以通过使用QFuture::onFailed()方法来解决,该方法允许我们为每种可能的错误类型附加特定的错误处理程序:

auto future = QtConcurrent::run(download, url)
            .then(createImage)
            .then(processImage)
            .then(show)
            .onFailed([](QNetworkReply::NetworkError) {
                // handle network errors
            })
            .onFailed([](ImageProcessingError) {
                // handle image processing errors
            })
            .onFailed([] {
                // handle any other error
            });

请注意,使用.onFailed()需要启用异常类。如果任何步骤失败并发生异常,则链会中断,并调用与抛出的异常类型匹配的错误处理程序。

根据信号创建QFuture

给定一个带有signal 的QObject基于类,您可以通过以下方式将此用作Future类:MyObjectvoid mySignal(int)

QFuture intFuture = QtFuture::connect(&object, &MyObject::mySignal);

现在,您可以将延续,失败或取消处理程序附加到最终的结果上。

请注意,最终结果的类型与signal的自变量类型匹配。如果没有参数,则 返回 QFuture。如果有多个参数,则结果存储在中std::tuple。

让我们回到图像处理示例的第一步(即下载),以了解这在实践中如何有用。有很多方法可以实现它,我们将使用QNetworkAccessManager来发送网络请求并获取数据:

QNetworkAccessManager manager;    
...

QByteArray download(const QUrl &url) {        
    QNetworkReply *reply = manager.get(QNetworkRequest(url));
    QObject::connect(reply, &QNetworkReply::finished, [reply] {...});
    
    // wait until we've received all data
    // ...    
    return data;        
}

但是上面的阻塞等待不是很好,如果我们可以避开它那就更好了,比如说“当QNetworkAccessManager获取数据时,创建一个图像,然后对其进行处理然后显示”。我们可以通过将网络访问管理器的finished()信号连接到QFuture:

QNetworkReply *reply = manager.get(QNetworkRequest(url));

auto future = QtFuture::connect(reply, &QNetworkReply::finished)
        .then([reply] {
            return reply->readAll();
        })
        .then(QtFuture::Launch::Async, createImage)
        .then(processImage)
        .then(show)        
        ...

您会注意到,现在我们不再使用QtConcurrent::run()异步下载而是在新线程中返回数据,我们只是连接到QNetworkAccessManager::finished()信号,从而开始了计算链。还请注意以下行中的其他参数:

        .then(QtFuture::Launch::Async, createImage)

默认情况下.then()在父进程运行所在的同一线程(在本例中为主线程)中调用by附加的延续。现在,我们不再使用QtConcurrent::run()异步启动链,我们需要传递附加QtFuture::Launch::Async参数,以在单独的线程中启动连续链,并避免阻塞UI。

创建一个QFuture

到目前为止,在QFuture内部创建和存储值的唯一“官方”方法是QtConcurrent中的一种方法。所以QtConcurrent以外,QFuture不是很有用。在Qt 6中,将Andrei Golubev引入了“Setter”, QFuture: QPromise的对应物。它可用于为异步计算设置值,进度和异常,以后可通过访问QFuture。为了演示其工作原理,让我们再次重写图像处理示例,并使用QPromise该类:

QFuture download(const QUrl &url) {
    QPromise promise;
    QFuture future = promise.future();
    
    promise.reportStarted(); // notify that download is started
    
    QNetworkReply *reply = manager.get(QNetworkRequest(url));
    QObject::connect(reply, &QNetworkReply::finished,
            [reply, p = std::move(promise)] {
                p.addResult(reply->readAll());
                p.reportFinished(); // notify that download is finished
                reply->deleteLater();
            });
    
    return future;
}

auto future = download()
        .then(QtFuture::Launch::Async, createImage)
        .then(processImage)
        .then(show)
        ...

QtConcurrent的变化

-现在,您可以为QtConcurrent的所有方法设置自定义线程池,而不是始终在全局线程池上运行它们并可能阻止其他任务的执行。
-映射和过滤器缩小算法现在可以采用初始值,因此您不必为没有默认构造函数的类型做变通办法。
- QtConcurrent::run进行了改进,可以处理可变数量的参数和仅移动类型。

此外,我们在QtConcurrent中添加了两个新的API,以为用户提供更大的灵活性。让我们更详细地看一下。

QtConcurrent :: runWithPromise

QtConcurrent::runWithPromise()Jarek Kobus开发的新方法是QtConcurrent框架的另一个不错的补充。它非常类似于QtConcurrent::run(),不同之处在于,它使QPromise与给定任务相关联的对象可供用户访问。

auto future = QtConcurrent::runWithPromise(
            [] (QPromise &promise, /* other arguments may follow */ ...) {
                // ...
                for (auto value : listOfValues) {
                    if (promise.isCanceled())
                        // handle the cancellation
        
                // do some processing...
        
                promise.addResult(...);
                promise.setProgressValue(...);
                }
            },
            /* pass other arguments */ ...);

runWithPromise()用户可以更好地控制任务,并且可以响应取消或暂停请求,进行进度报告等操作,而这些使用QtConcurrent::run()是不可能实现的。

QtConcurrent ::任务

QtConcurrent::task()提供了一个流畅的界面,用于在单独的线程中运行任务。它对于QtConcurrent::run()是更为现代的替代方案,并配置任务的方式也更为方便。您可以使用任何顺序指定参数,跳过不需要的参数,等等,而不是使用少数几个参数之一来传递参数来运行任务。例如:

QFuture future = QtConcurrent::task(doSomething)
        .withArguments(1, 2, 3)
        .onThreadPool(pool)
        .withPriority(10)
        .spawn();

请注意,与run()不同,您还可以为任务传递优先级。

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐