内容来源
- 书籍:《C++ Concurrencyin Action》
- 章节:appendix C(附录C)
- 标题:A message-passingframework and complete ATM example(一个消息传递框架与完整的ATM示例)
- 中文翻译链接:https://github.com/xiaoweiChen/CPP-Concurrency-In-Action-2ed-2019/blob/master/content/appendix_C/C.0-chinese.md
使用消息传递框架
有了消息传递框架,接着就是使用框架实现具体的业务逻辑。示例代码实现了一个简单银行业务流程:
- 定义消息类型
- 定义交互模块
- 在模块间传递消息,然后处理
ATM消息类型
- 结合前面的内容,有两个地方需要指定消息类型:
- 使用sender::send()发送消息时,需要指定消息类型
- 使用dispatcher::handle()和TemplateDispatcher::handle()接收处理消息时,也需要指定消息类型
- 消息类型的定义如下:
...
struct withdraw_ok
{};
struct withdraw_denied
{};
struct cancel_withdrawal
{
std::string account;
unsigned amount;
cancel_withdrawal(std::string const& account_,
unsigned amount_):
account(account_),amount(amount_)
{}
};
struct withdrawal_processed
{
std::string account;
unsigned amount;
withdrawal_processed(std::string const& account_,
unsigned amount_):
account(account_),amount(amount_)
{}
};
...
交互模块
示例代码定义了3个交互模块atm、bank_machine、interface_machine,分别代表ATM状态、银行状态、用户状态。
- atm模块
- 因为需要接收消息,有一个receiver类型的变量incoming。
- 同样需要发送消息,有两个sender类型的变量。
- bank:用于把消息发给bank_machine模块。
- interface_hardware:用于把消息发给interface_machine模块。
- get_sender(),用于把receiver类型变量转为sender类型的变量,其实是把receiver中保存的队列指针暴露给其他模块,其他模块可以发消息到atm模块。
- done(),给自身发一个退出的消息,一般用在程序需要结束的时候。
- run(),在一个单独的线程中执行,等待消息和处理。
- process_withdrawal()和process_balance()等其他功能函数,会等消息执行功能,然后修改state,在run()再次调用不同的功能函数,循环往复。
- bank_machine模块
- 定义一个receiver类型的变量incoming,接收atm模块的消息。
- 没有定义sender类型的变量,只在某些场景需要发消息给atm模块,可以通过收到atm模块的消息(withdraw)携带的消息队列指针把消息发给atm模块。
- interface_machine模块
- 和bank_machine模块类似,没有定义sender类型的变量,只有一个receiver类型的变量incoming,用于接收atm模块的消息。
模块间协作
- 创建了3个thread对象,分别对应3个不同的模块;
- atm会发消息给其他2个模块,所以atm模块获取了bank_machine和interface_machine的队列指针;
- 主线程接收用户输入后,通过atm模块发送消息到其他2个模块,最终驱动整个系统运行;
int main()
{
bank_machine bank;
interface_machine interface_hardware;
atm machine(bank.get_sender(),interface_hardware.get_sender());
std::thread bank_thread(&bank_machine::run,&bank);
std::thread if_thread(&interface_machine::run,&interface_hardware);
std::thread atm_thread(&atm::run,&machine);
messaging::sender atmqueue(machine.get_sender());
bool quit_pressed=false;
while(!quit_pressed)
{
char c=getchar();
switch(c)
{
case '0':
case '1':
case '2':
case '3':
case '4':
case '5':
case '6':
case '7':
case '8':
case '9':
atmqueue.send(digit_pressed(c));
break;
case 'b':
atmqueue.send(balance_pressed());
break;
case 'w':
atmqueue.send(withdraw_pressed(50));
break;
case 'c':
atmqueue.send(cancel_pressed());
break;
case 'q':
quit_pressed=true;
break;
case 'i':
atmqueue.send(card_inserted("acc1234"));
break;
}
}
bank.done();
machine.done();
interface_hardware.done();
atm_thread.join();
bank_thread.join();
if_thread.join();
}