内容来源
- 书籍:《C++ Concurrencyin Action》
- 章节:appendix C(附录C)
- 标题:A message-passingframework and complete ATM example(一个消息传递框架与完整的ATM示例)
- 中文翻译链接:https://github.com/xiaoweiChen/CPP-Concurrency-In-Action-2ed-2019/blob/master/content/appendix_C/C.0-chinese.md
消息的分发
receiver类用于接收消息,然后在wait中分发消息。处理流程:
- wait函数会返回dispatcher临时对象;
- dispatcher会在析构函数循环等待消息,处理close消息,或者把消息再次分发给TemplateDispatcher临时对象;
- TemplateDispatcher同样也在析构函数循环等待消息,处理通过模板参数指定类型的消息,或者把消息再次分发给TemplateDispatcher临时对象;
- 由于TemplateDispatcher可以通过指定模板参数生成任意的对象,通过链式调用分发消息,也就可以处理任意类型的消息;
dispatcher
- c++的临时对象会在语句执行完后销毁,dispatcher利用这点生成临时对象,并在析构中执行对应操作;
- wait_and_dispatch循环等待队列中的消息,通过在dispatch识别是否为close_queue消息,抛出异常从而结束掉进程(析构函数需要标记为noexcept(false));
- handle函数会生成一个TemplateDispatcher类型的临时对象,可以看到指定了3参数:
- q,即消息队列,由receiver保存,然后传递给dispatcher
- this,dispatcher本身的指针
- std::forward(f),一个回调函数,或者说钩子函数
namespace messaging
{
class close_queue // 用于关闭队列的消息
{};
class dispatcher
{
queue* q;
bool chained;
dispatcher(dispatcher const&)=delete; // dispatcher实例不能被拷贝
dispatcher& operator=(dispatcher const&)=delete;
template<
typename Dispatcher,
typename Msg,
typename Func> // 允许TemplateDispatcher实例访问内部成员
friend class TemplateDispatcher;
void wait_and_dispatch()
{
for(;;) // 1 循环,等待调度消息
{
auto msg=q->wait_and_pop();
dispatch(msg);
}
}
bool dispatch( // 2 dispatch()会检查close_queue消息,然后抛出
std::shared_ptr<message_base> const& msg)
{
if(dynamic_cast<wrapped_message<close_queue>*>(msg.get()))
{
throw close_queue();
}
return false;
}
public:
dispatcher(dispatcher&& other): // dispatcher实例可以移动
q(other.q),chained(other.chained)
{
other.chained=true; // 源不能等待消息
}
explicit dispatcher(queue* q_):
q(q_),chained(false)
{}
template<typename Message,typename Func>
TemplateDispatcher<dispatcher,Message,Func>
handle(Func&& f) // 3 使用TemplateDispatcher处理指定类型的消息
{
return TemplateDispatcher<dispatcher,Message,Func>(
q,this,std::forward<Func>(f));
}
~dispatcher() noexcept(false) // 4 析构函数可能会抛出异常
{
if(!chained)
{
wait_and_dispatch();
}
}
};
}
TemplateDispatcher
- 和dispatcher一样,TemplateDispatcher也是生成临时对象,在析构函数中执行功能;
- wait_and_dispatch也是循环等待队列中的消息,通过dispatch识别相应的消息类型然后执行回调函数;
- 如果dispatch识别的消息不是自己需要处理的类型,会把消息分发给上一级的临时对象,这个临时对象可能是dispatcher,也可能是TemplateDispatcher;
- handle()可以生成一个TemplateDispatcher对象,然后临时对象可以再次调用handle()生成一个TemplateDispatcher对象,如此往复就是一个调用链
dispatch.handle().handle().handle()
; - dispatcher与TemplateDispatcher关系:dispatcher的handle会生成一个TemplateDispatcher,然后TemplateDispatcher的handle再新生成一个TemplateDispatcher,往复无穷调用;
namespace messaging
{
template<typename PreviousDispatcher,typename Msg,typename Func>
class TemplateDispatcher
{
queue* q;
PreviousDispatcher* prev;
Func f;
bool chained;
TemplateDispatcher(TemplateDispatcher const&)=delete;
TemplateDispatcher& operator=(TemplateDispatcher const&)=delete;
template<typename Dispatcher,typename OtherMsg,typename OtherFunc>
friend class TemplateDispatcher; // 所有特化的TemplateDispatcher类型实例都是友元类
void wait_and_dispatch()
{
for(;;)
{
auto msg=q->wait_and_pop();
if(dispatch(msg)) // 1 如果消息处理过后,会跳出循环
break;
}
}
bool dispatch(std::shared_ptr<message_base> const& msg)
{
if(wrapped_message<Msg>* wrapper=
dynamic_cast<wrapped_message<Msg>*>(msg.get())) // 2 检查消息类型,并且调用函数
{
f(wrapper->contents);
return true;
}
else
{
return prev->dispatch(msg); // 3 链接到之前的调度器上
}
}
public:
TemplateDispatcher(TemplateDispatcher&& other):
q(other.q),prev(other.prev),f(std::move(other.f)),
chained(other.chained)
{
other.chained=true;
}
TemplateDispatcher(queue* q_,PreviousDispatcher* prev_,Func&& f_):
q(q_),prev(prev_),f(std::forward<Func>(f_)),chained(false)
{
prev_->chained=true;
}
template<typename OtherMsg,typename OtherFunc>
TemplateDispatcher<TemplateDispatcher,OtherMsg,OtherFunc>
handle(OtherFunc&& of) // 4 可以链接其他处理器
{
return TemplateDispatcher<
TemplateDispatcher,OtherMsg,OtherFunc>(
q,this,std::forward<OtherFunc>(of));
}
~TemplateDispatcher() noexcept(false) // 5 这个析构函数也是noexcept(false)的
{
if(!chained)
{
wait_and_dispatch();
}
}
};
}
使用dispatch和TemplateDispatcher
- 其中incoming是一个receiver对象,调用wait()生成了一个dispatch对象;
- dispatch对象调用handle生成一个TemplateDispatcher对象;
- TemplateDispatcher对象调用handle再次生成一个TemplateDispatcher对象;
- TemplateDispatcher对象可以handle,无穷止的生成TemplateDispatcher对象;
incoming.wait()
.handle<withdraw_ok>(
[&](withdraw_ok const& msg)
{
interface_hardware.send(
issue_money(withdrawal_amount));
bank.send(
withdrawal_processed(account,withdrawal_amount));
state=&atm::done_processing;
})
.handle<withdraw_denied>(
[&](withdraw_denied const& msg)
{
interface_hardware.send(display_insufficient_funds());
state=&atm::done_processing;
})
.handle<cancel_pressed>(
[&](cancel_pressed const& msg)
{
bank.send(
cancel_withdrawal(account,withdrawal_amount));
interface_hardware.send(
display_withdrawal_cancelled());
state=&atm::done_processing;
});