内容来源
- 书籍:《C++ Concurrencyin Action》
- 章节:appendix C(附录C)
- 标题:A message-passingframework and complete ATM example(一个消息传递框架与完整的ATM示例)
- 中文翻译链接:https://github.com/xiaoweiChen/CPP-Concurrency-In-Action-2ed-2019/blob/master/content/appendix_C/C.0-chinese.md
常规做法
一般在多线程间传递消息,我们会这样实现:
- 使用枚举或者宏定义消息id,然后使用固定结构体包装起来,结构体包含id和其他内容;
- 使用std中的容器保存结构体对象,发送端push数据到容器中,接收者pop数据后使用;
- 再加上锁和条件变量,实现生产者和消费者的异步通知机制;
这样的实现方式,使用id值判断不同的消息类型,从而处理不同的任务。
使用模板参数判断消息类型
先上代码,其中message_base和wrapped_message是消息定义,自定义的queue用来存放消息:
#include <mutex>
#include <condition_variable>
#include <queue>
#include <memory>
namespace messaging
{
struct message_base // 队列项的基础类
{
virtual ~message_base()
{}
};
template<typename Msg>
struct wrapped_message: // 每个消息类型都需要特化
message_base
{
Msg contents;
explicit wrapped_message(Msg const& contents_):
contents(contents_)
{}
};
class queue // 我们的队列
{
std::mutex m;
std::condition_variable c;
std::queue<std::shared_ptr<message_base> > q; // 实际存储指向message_base类指针的队列
public:
template<typename T>
void push(T const& msg)
{
std::lock_guard<std::mutex> lk(m);
q.push(std::make_shared<wrapped_message<T> >(msg)); // 包装已传递的信息,存储指针
c.notify_all();
}
std::shared_ptr<message_base> wait_and_pop()
{
std::unique_lock<std::mutex> lk(m);
c.wait(lk,[&]{return !q.empty();}); // 当队列为空时阻塞
auto res=q.front();
q.pop();
return res;
}
};
}
-
消息定义
- message_base是个空壳,主要用来作为wrapped_message的基类,实现多态,队列中的数据设置为基类(message_base)的类型,实际存放派生类(wrapped_message)的对象。
- wrapped_message除了继承message_base外,它还有一个模板参数,模板参数作用有两个:
- 提高灵活性:成员contents保存着消息内容,意味着消息可以是任意的数据类型,比如int、char、enum、struct等;
- 特化成不同的类型:
- 假如有3个struct,分别为A、B、C,作为模板参数特化wrapped_message类型,会生成3个不同的wrapped_message类型,并且他们都继承自message_base。
- 根据基类指针可以指向派生类对象的原则,只需要接口参数定义为基类指针,这3个不同的wrapped_message生成的对象,都可以传递给基类指针的接口;
- 当需要识别出基类执行具体指向的是哪个派生类(具体特化的wrapped_message类型),使用dynamic_cast进行转化即可;
-
消息队列
- 因为消息传递是在多线程间,所以使用lock保护,condition_variable用于异步信号通知和阻塞。
- 队列中存放是message_base指针,可以存放派生类的对象,派生类根据指定类型参数特化生成。
-
sender与receiver定义
sender负责发送消息,receiver用于处理消息,代码如下:
namespace messaging
{
class sender
{
queue*q; // sender是一个队列指针的包装类
public:
sender(): // sender无队列(默认构造函数)
q(nullptr)
{}
explicit sender(queue*q_): // 从指向队列的指针进行构造
q(q_)
{}
template<typename Message>
void send(Message const& msg)
{
if(q)
{
q->push(msg); // 将发送信息推送给队列
}
}
};
class receiver
{
queue q; // 接受者拥有对应队列
public:
operator sender() // 允许将类中队列隐式转化为一个sender队列
{
return sender(&q);
}
dispatcher wait() // 等待对队列进行调度
{
return dispatcher(&q);
}
};
}
- sender没有自己的队列,receiver拥有自己的队列;sender发送消息时使用的是receiver中给过来的队列指针,因为消息是给到receiver处理的。
- receiver中的函数
operator sender()
,是一个隐式类型转换的函数,把receiver类型对象转为sender类型对象。为什么这样做?
- 语法:c++在传参、赋值有类型要求,类型A必须只能赋值类型A和传参类型A,定义了operator+类型的函数,可以实现类型B赋值、传参给类型A,这个语法已经把类型B转为类型A了。
- 场景:从sender和receiver的定义可以看到,sender只需要一个队列指针,而这个指针可以是receiver中的队列,所以完全可以借助receiver中的数据构造一个sender对象,也即把receiver对象转为sender对象。
// 可以这样使用
receiver re;
sender se = re;
- receiver中的函数
wait()
,会阻塞等待消息进入队列,然后取出消息,执行相应的处理。
如何识别消息
目前定义了基类message_base,队列定义的是message_base类型指针,存放派生类对象,当从队列中取出message_base指针,如何识别到具体的派生类?
上面提到使用dynamic_cast,把message_base指针转化为特化后的message_base指针;根据dynamic_cast(专门用于有继承关系的类型间转换)转化规则,转化成功为指针的值,转化失败为null。
利用这个规则,每个类型消息都有自己的模板参数,指定模板参数后转换,能转换为期望的类型指针(结果非零),说明是需要处理的消息,执行对应的功能。
bool dispatch(std::shared_ptr<message_base> const& msg)
{
if(wrapped_message<Msg>* wrapper=
dynamic_cast<wrapped_message<Msg>*>(msg.get())) // 2 检查消息类型,并且调用函数
{
f(wrapper->contents);
return true;
}
else
{
return prev->dispatch(msg); // 3 链接到之前的调度器上
}
}
总结:使用模板参数+多态接口+dynamic_cast转换特性判断消息类型;因为模板参数是个变量,指定类型进行转换,借用转换结果值是否为null,筛选出不同的消息类型进行处理。