Marvin's Blog【程式人生】

Ability will never catch up with the demand for it

30 Jan 2020

C++/WinRT学习笔记(八):异步处理进阶

More advanced concurrency and asynchrony

Offloading work onto the Windows thread pool

co_await winrt::resume_background()可以把Coroutine挂起之后在线程池(Windows thread pool)上恢复执行。

Programming with thread affinity in mind

如果在线程池上执行的过程中需要切回原UI线程,那么需要先使用winrt::apartment_context来保存之前的线程信息:

IAsyncAction DoWorkAsync(TextBlock textblock)
{
    winrt::apartment_context ui_thread; // Capture calling context.

    co_await winrt::resume_background();
    // Do compute-bound work here.

    co_await ui_thread; // Switch back to calling context.

    textblock.Text(L"Done!"); // Ok if we really were called from the UI thread.
}

如果没有办法保存调用线程的上下文,那么可以使用winrt::resume_foreground以及伴对象的Dispatcher,来通过调用CoreDispatcher.RunAsync来返回之前的调用进程:

IAsyncAction DoWorkAsync(TextBlock textblock)
{
    co_await winrt::resume_background();
    // Do compute-bound work here.

    co_await winrt::resume_foreground(textblock.Dispatcher()); // Switch to the foreground thread associated with textblock.

    textblock.Text(L"Done!"); // Guaranteed to work.
}

Execution contexts, resuming, and switching in a coroutine

提交到线程池后,Coroutine的Completed方法可以被任意线程调用。如果在调用线程执行了co_await操作,那么必须将结果返回给调用线程。C++ WinRT会自动捕获co_await时的上下文,如果时STA套间,那么会在原线程返回co_await的结果;如果是在MTA套间,那么co_await返回时依然在一个MTA线程上。

C++/WinRT对常见的一些数值类型提供包装器,使其能够用于co_await,比如:

using namespace std::chrono_literals;
IAsyncOperation<int> return_123_after_5s()
{
    // No matter what the thread context is at this point...
    co_await 5s;
    // ...we're on the thread pool at this point.
    co_return 123;
}

UI线程上发起的异步操作会导致线程上下文切换,要注意避免过多的前后台线程切换。

对于在其他线程池上执行的异步线程,执行完后会在执行线程上调用Completed操作,然后按照C++/WinRT的安排返回。你可以采用自己设计的coroutine类型来避免返回,接着在该线程上执行操作:

IAsyncAction async{ ProcessFeedAsync() };
co_await static_cast<no_switch>(async);

no_switch的实现在文中可以找到。

A deeper dive into winrt::resume_foreground

在C++/WinRT 2.0版本中,winrt::resume_foreground 的行为有了一些小小的变化,就像是PostMessage和SendMessage的区别,主要表现在await_ready上:

        bool await_ready() const
        {
            return false; // Queue without waiting.
            // return m_dispatcher.HasThreadAccess(); // The C++/WinRT 1.0 implementation.
        }

此外,winrt::resume_foreground之前只支持CoreWindow的CoreDispatcher,现在还支持DispatcherQueue。好处是,你可以创建自己的DispatcherQueue :

using namespace Windows::System;

winrt::fire_and_forget RunAsync(DispatcherQueue queue);
 
int main()
{
    auto controller{ DispatcherQueueController::CreateOnDedicatedThread() };
    RunAsync(controller.DispatcherQueue());
    getchar();
}

上述例子把自行创建的DispatcherQueue传给coroutine RunAsync,这样后者可以resume_foreground到该DispatcherQueue。

另外一个例子是通过CreateDispatcherQueueController创建一个Win32的:

DispatcherQueueController CreateDispatcherQueueController()
{
    DispatcherQueueOptions options
    {
        sizeof(DispatcherQueueOptions),
        DQTYPE_THREAD_CURRENT,
        DQTAT_COM_STA
    };
 
    ABI::Windows::System::IDispatcherQueueController* ptr{};
    winrt::check_hresult(CreateDispatcherQueueController(options, &ptr));
    return { ptr, take_ownership_from_abi };
}

然后就可以:

winrt::fire_and_forget RunAsync(DispatcherQueue queue);
 
int main()
{
    Window window;
    auto controller{ CreateDispatcherQueueController() };
    RunAsync(controller.DispatcherQueue());
    MSG message;
 
    while (GetMessage(&message, nullptr, 0, 0))
    {
        DispatchMessage(&message);
    }
}

接着:

winrt::fire_and_forget RunAsync(DispatcherQueue queue)
{
    ... // Begin on the calling thread...
 
    co_await winrt::resume_foreground(queue); // 默认优先级
    // 或者指定优先级
    // co_await winrt::resume_foreground(queue, DispatcherQueuePriority::High);

    ... // ...resume on the dispatcher thread.
}

甚至可以:

winrt::fire_and_forget RunAsync(DispatcherQueue queue)
{
    ...
 
    co_await queue; // 默认优先级
 
    ...
}

如果队列关闭:

winrt::fire_and_forget RunAsync(DispatcherQueue queue)
{
    ...
 
    if (co_await queue)
    {
        ... // Resume on dispatcher thread.
    }
    else
    {
        ... // Still on calling thread.
    }
}

Canceling an asynchronous operation, and cancellation callbacks

异步操作是可以被Cancel的,文档中列集了一个中止StorageFolder::GetFilesAsync的例子。以及如何实现一个简单的可以被Cancel的Coroutine的例子。

运行中的Coroutine也可以被中止,但是只有等到执行co_await的时候,终止操作才会进行。Coroutine被挂起时也可以被中止,可是只有它恢复执行了才能真正终止。这导致中止请求和运行终止会存在延误。

一种办法是使用winrt::get_cancellation_token,主动在coroutine内轮询中止信息,而不通过co_await。例子:

IAsyncAction ExplicitCancellationAsync()
{
    auto cancellation_token{ co_await winrt::get_cancellation_token() };

    while (!cancellation_token())
    {
        std::cout << "ExplicitCancellationAsync: do some work for 1 second" << std::endl;
        co_await 1s;
    }
}

IAsyncAction MainCoroutineAsync()
{
    auto explicit_cancellation{ ExplicitCancellationAsync() };
    co_await 3s;
    explicit_cancellation.Cancel();
}
...

Windows Runtime的中止操作不会自动从一个异步对象传递在嵌套的异步对象。从Win10 1809 (10.0.17763.0)以后,Windows SDK引入了一个中止回调,这是一个抢占性的回调,用来中止Coroutine。例子:

IAsyncAction CancellationPropagatorAsync()
{
    auto cancellation_token{ co_await winrt::get_cancellation_token() };
    auto nested_coroutine{ NestedCoroutineAsync() };

    cancellation_token.callback([=]
    {
        nested_coroutine.Cancel();
    });

    co_await nested_coroutine;
}

因为是抢占性的,所有不需要轮询。

Reporting progress

对于返回类型是IAsyncActionWithProgress和IAsyncOperationWithProgress的Coroutine,可以通过winrt::get_progress_token获取一个token汇报进度:

IAsyncOperationWithProgress<double, double> CalcPiTo5DPs()
{
    auto progress{ co_await winrt::get_progress_token() };

    co_await 1s;
    double pi_so_far{ 3.1 };
    progress(0.2);

    ...

    co_return pi_so_far;
}

IAsyncAction DoMath()
{
    auto async_op_with_progress{ CalcPiTo5DPs() };
    async_op_with_progress.Progress([](auto const& /* sender */, double progress)
    {
        std::wcout << L"CalcPiTo5DPs() reports progress: " << progress << std::endl;
    });
    double pi{ co_await async_op_with_progress };
    std::wcout << L"CalcPiTo5DPs() is complete !" << std::endl;
    std::wcout << L"Pi is approx.: " << pi << std::endl;
}

对于异步事件的完成回调只能有一个,下面二选一:

auto async_op_with_progress{ CalcPiTo5DPs() };
async_op_with_progress.Completed([](auto const& sender, AsyncStatus /* status */)
{
    double pi{ sender.GetResults() };
});

或者

auto async_op_with_progress{ CalcPiTo5DPs() };
double pi{ co_await async_op_with_progress };

更多参考:Delegate types for asynchronous actions and operations

Fire and forget

如果只是想执行一个异步操作,而不用管它什么时候完成(没有其他任务依赖这个异步操作的结果),那么你可以触发这个操作之后就不管了,这个叫做fire and forget。通过winrt::fire_and_forget提供支持:

winrt::fire_and_forget CompleteInFiveSeconds()
{
    co_await 5s;
}

一个常见的用法是把事件回调作为fire_and_forget类型的coroutine。

Awaiting a kernel handle

C++/WinRT提供了一个resume_on_signal类型,可以在coroutine里面等待一个来自内核的事件:

IAsyncAction Async(winrt::handle event)
{
    co_await DoWorkAsync();
    co_await resume_on_signal(event); // The incoming handle *is* not valid here.
}

注意,上面的Coroutine传递的是类型是winrt::handle,而不是HANDLE。因为当Coroutine第一次返回时HANDLE可能会被调用者释放,而winrt::handle提供了所有者语义,可以在coroutine恢复和挂起的过程中保持参数有效。文档中介绍了一个具体的例子。

Asynchronous timeouts made easy

C++/WinRT的IAsyncAction的实现带有一个get()方法,用于阻塞等待异步操作结束并返回结果。另外一个wait_for()方法能够接受一个等待事件,从而避免无限挂起:

switch (async.wait_for(5s))
{
case AsyncStatus::Completed:
    printf("result %d\n", async.get());
    break;
case AsyncStatus::Canceled:
    puts("canceled");
    break;
case AsyncStatus::Error:
    puts("failed");
    break;
case AsyncStatus::Started:
    puts("still running");
    break;
}

如果wait_for()返回Completed,那么可以通过get()来获取结果。如果返回Error,说明coroutine内部出错,可以调用get()来重新抛出异常。如果返回Started,那么说明异步操作仍在进行,这时候不能再次执行wait_for,因为不支持。

Returning an array asynchronously

Windows.Foundation.IAsyncOperation<Int32[]> RetrieveArrayAsync();

上面的语句在MIDL 3.0中通不过,因为数组类型不能作为泛参。

一个变通的做法是把数组装箱成一个PropertyValue对象:

    Windows::Foundation::IAsyncOperation<Windows::Foundation::IInspectable> RetrieveCollectionAsync()
    {
        co_return Windows::Foundation::PropertyValue::CreateInt32Array({ 99, 101 }); // Box an array into a PropertyValue.
    }

auto boxed_array{ co_await m_sample_component.RetrieveCollectionAsync() };
auto property_value{ boxed_array.as<winrt::Windows::Foundation::IPropertyValue>() };
winrt::com_array<int32_t> my_array;
property_value.GetInt32Array(my_array); // Unbox back into an array.

Windows.System

DispatcherQueue

一个执行任务用的优先级队列,在一个线程执行。

方法:

  • CreateTimer() ,通过一个timer来提交任务
  • GetForCurrentThread() ,获取当前线程的队列
  • TryEnqueue(DispatcherQueueHandler),添加任务到队列
  • TryEnqueue(DispatcherQueuePriority, DispatcherQueueHandler),添加指定优先级的任务到队列

关停该队列的时候会触发两个事件:

  • ShutdownCompleted
  • ShutdownStarting

DispatcherQueueController Class

用来管理一个DispatcherQueue,拥有创建和关停队列的方法。

方法:

  • CreateOnDedicatedThread(),在专有线程上创建一个队列
  • ShutdownQueueAsync(),停止队列,如果队列是由CreateOnDedicatedThread()创建的,则关掉它。

(本篇完)

comments powered by Disqus