两个引用计数:内部计数和外部计数
-
除了使用风险指针,管理多线程中内存释放问题,还可以使用引用计数。
-
本例中使用两个引用计数管理内存,一个计数保存引用该内存的次数【自增】,另一个计数保存引用完该内存后需要释放的次数【自减】,两个计数正负相互抵消,代表内存可以安全释放。
-
部分代码一分析:
- (1)结构counted_node_ptr保存外部引用计数和一个指向node的指针,也就是说counted_node_ptr和它指向的node是一个整体,head记录第一个node信息;
- (2)结构node虽然有一个成员counted_node_ptr,但是它却记录下一个node的信号,也就是说node内部的counted_node_ptr和counted_node_ptr指向的node是一个整体;
- (3)函数push使用new创建一个新的node,放在stack的头部,所以head需要记录新创建的node信号,新node内部的counted_node_ptr需要记录旧的head记录的信息;
- (1)new_node是一个临时变量,用于记录新创建的node信息;当前head还记录着其他node的信息,等到把当前head记录的信息转交给新node记录后,就可以把临时变量new_node记录的信息转交给head记录;
- (2)compare_exchange_weak功能有两个:①确保旧head记录的node信息成功的转交给新node中next成员记录,监测到没转交失败,会再次执行,即把head转交给new_node.ptr->next;②把临时变量new_node记录的信息转交给head记录,使用while多次执行,确保一定可以转交成功;
template<typename T> class lock_free_stack { private: struct node; struct counted_node_ptr // 1 { int external_count; node* ptr; }; struct node { std::shared_ptr<T> data; std::atomic<int> internal_count; // 2 counted_node_ptr next; // 3 node(T const& data_): data(std::make_shared<T>(data_)), internal_count(0) {} }; std::atomic<counted_node_ptr> head; // 4 public: ~lock_free_stack() { while(pop()); } void push(T const& data) // 5 { counted_node_ptr new_node; new_node.ptr=new node(data); new_node.external_count=1; new_node.ptr->next=head.load(); while(!head.compare_exchange_weak(new_node.ptr->next,new_node)); } };
-
部分代码二分析:
- (1)函数increase_head_count功能,自增函数参数old_counter的成员external_count,同时把这一变化同步给head,因为函数参数old_counter获取的就是当前head的值。
- (1)使用compare_exchange_strong修改head的值,可能会失败,需要多次尝试,所以使用了while语句;
- (2)修改head的值失败的时候,old_counter会更新为当前的haed,所以需要对更新后old_counter再次执行自增操作;
- (2)函数pop功能,把head保存的node从stack上删除,并且保证这个节点的内存可以安全的释放掉。
- (1)外部引用计数external_count值,记录该节点在多线程中被引用的次数;
- 使用push加入stack中时候,初始化值为1,代表stack中在引用这个节点;
- pop可以被多个线程调用,当每个线程解引用node【语句2】时候,都需要使用increase_head_count自增该节点的外部引用计数external_count的值;
- (2)内部引用计数internal_count值,记录该节点在多线程中被引用结束后的次数;
- 多线程中,每次引用节点结束后,都需要判断当前节点的内存是否可以释放,同时修改内部引用计数internal_count的值;
- (3)pop中有3个if语句。
- (1)第一个if场景,代表stack中没有存放node,直接返回空值std::shared_ptr();
- (2)第二个if场景,【语句3】使用compare_exchange_strong修改head值,即把第一个node从stack上删除,会有多个线程执行该功能;
- 成功把node从stack中删除,说明可以把node保存的数据返回了【语句4、7】;但是需要处理node内存释放的问题;
- 删除的node中外部计数external_count指减去2【语句5】,是因为node从stack中删除需要减1,对node的引用结束需要再次减1,所以需要减去2;
- 【语句6】判断内部计数和外部计数的关系,内部计数internal_count在引用结束会减1,外部计数external_count会在开始引用的时候加1,只要是成对出现说明node可以安全删除了;
- 出现的场景:假如stack有一个node,有6个线程执行pop函数;其中一个线程A成功执行了【语句3】,正在执行【语句5】;其他5个线程,执行【语句3】失败且都刚好执行完【语句9】;那么线程A执行【语句6】时,内部计数internal_count值为-5,外部计数减2后count_increase的值为5,此时就可以安全释放掉node内存;
- 【语句6】把内部计数internal_count的值,加上外部计数external_count减2后count_increase的值,实现内、外计数相互抵消;
- 计算后内部计数internal_count的值肯定为正值,因为有其他线程还在引用node,结合开始引用时,外部计数external_count加1,引用结束,内部计数internal_coun减1的约定,external_count绝对值大于internal_coun的绝对值;
- 删除节点后,internal_count为正值,其他线程引用完删除的node后会对内部节点internal_count执行减1操作,当检测到internal_count为1时,说明最后一线程引用node结束,可以安全释放node内存【语句9】;
- (3)第三个if场景,是在compare_exchange_strong执行失败的时候,自减内部引用计数internal_count的值(引用node已经结束了);同时判断是否可以释放node的内存;
- (1)外部引用计数external_count值,记录该节点在多线程中被引用的次数;
- (3)从stack上删除node时候,会判断当前这个node安全释放内存,如果现在不能释放,需要其他线程结束node引用完成的时候判断node是否可以安全释放内存;
template<typename T> class lock_free_stack { private: void increase_head_count(counted_node_ptr& old_counter) { counted_node_ptr new_counter; do { new_counter=old_counter; ++new_counter.external_count; } while(!head.compare_exchange_strong(old_counter,new_counter)); // 1 old_counter.external_count=new_counter.external_count; } public: std::shared_ptr<T> pop() { counted_node_ptr old_head=head.load(); for(;;) { increase_head_count(old_head); node* const ptr=old_head.ptr; // 2 if(!ptr) { return std::shared_ptr<T>(); } if(head.compare_exchange_strong(old_head,ptr->next)) // 3 { std::shared_ptr<T> res; res.swap(ptr->data); // 4 int const count_increase=old_head.external_count-2; // 5 if(ptr->internal_count.fetch_add(count_increase)== // 6 -count_increase) { delete ptr; } return res; // 7 } else if(ptr->internal_count.fetch_sub(1)==1) // 9 { delete ptr; // 8 } } } };
- (1)函数increase_head_count功能,自增函数参数old_counter的成员external_count,同时把这一变化同步给head,因为函数参数old_counter获取的就是当前head的值。
提高性能:使用宽松类型的内存序
- 目前为止,代码中涉及到的原子操作,都是默认的内存序【memory_order_seq_cst】;
- 现在准备改进,假设先全部修改效率最高的自由序【memory_order_relaxed】,即语句①~⑦处,全部使用自由序【memory_order_relaxed】;
- 由于自由序没有先后顺序,lock_free_stack的实现中需要有2种严格先后顺序:
- (1)push用于添加数据到stack,会修改head的值,pop会获取head的值,push和pop需要store/load变量head,形成一对release/acquire操作;
- (2)pop函数中,把node从stack成功删除后,需要获取node中保存的data数据后【res.swap(ptr->data)】,执行释放node的操作【delete ptr】,此处也形成一对release/acquire操作;
- 代码分析:
- (1)语句①,是获取head值,不需要和其他操作同步,保持自由序即可;
- (2)语句②,是store变量head的值;执行成功,需要和pop中的load操作同步(store前语句先于load后语句执行),使用【memory_order_release】;执行失败,没有内存限制要求,使用自由序即可;
- (3)语句③,是load变量head的值;执行成功,需要和push中的store操作同步(load后语句晚于store前语句执行),使用【memory_order_acquire】;执行失败,没有内存限制要求,使用自由序即可;
- (4)语句④,保持自由序即可;虽然它也是load变量head的值,但是在它后面有语句③,可以通过语句③保证load和store同步,所以此处没有内存限制;
- (5)语句⑤,无需和其他操作同步,没有内存限制,保持自由序即可;
- (6)语句⑥和语句⑦,都是修改node内部计数的值【ptr->internal_count】,语句⑥之前会获取node中data数据,语句⑦之后的会释放掉node的内存,所以语句⑥可以和语句⑦同步,确保先获取node中data数据后再执行释放掉node的内存的操作;即语句⑥使用【memory_order_release】,语句⑦使用【memory_order_acquire】;
- (7)更进一步,虽然释放掉node的内存的操作在语句⑦后面,但是执行语句⑦不一定会执行释放node内存的操作,换句话说,执行语句⑦的频率比执行释放node内存的频率高;我们的目的是为了保证释放node内存操作在语句⑥之后执行,为减少【memory_order_acquire】的影响范围,在if内引入语句⑧使用【memory_order_acquire】,而语句⑦仍使用【memory_order_relaxed】;这样,执行频率高的语句⑦没有多余的内存限制,同时也保证了释放node和与语句⑥同步,进一步提高了性能;
template<typename T> class lock_free_stack { private: struct node; struct counted_node_ptr { int external_count; node* ptr; }; struct node { std::shared_ptr<T> data; std::atomic<int> internal_count; counted_node_ptr next; node(T const& data_): data(std::make_shared<T>(data_)), internal_count(0) {} }; std::atomic<counted_node_ptr> head; void increase_head_count(counted_node_ptr& old_counter) { counted_node_ptr new_counter; do { new_counter=old_counter; ++new_counter.external_count; } while(!head.compare_exchange_strong(old_counter,new_counter, std::memory_order_acquire, std::memory_order_relaxed)); // 3 old_counter.external_count=new_counter.external_count; } public: ~lock_free_stack() { while(pop()); } void push(T const& data) { counted_node_ptr new_node; new_node.ptr=new node(data); new_node.external_count=1; new_node.ptr->next=head.load(std::memory_order_relaxed) // 1 while(!head.compare_exchange_weak(new_node.ptr->next,new_node, std::memory_order_release, std::memory_order_relaxed)); // 2 } std::shared_ptr<T> pop() { counted_node_ptr old_head= head.load(std::memory_order_relaxed); // 4 for(;;) { increase_head_count(old_head); node* const ptr=old_head.ptr; if(!ptr) { return std::shared_ptr<T>(); } if(head.compare_exchange_strong(old_head,ptr->next, std::memory_order_relaxed)) // 5 { std::shared_ptr<T> res; res.swap(ptr->data); int const count_increase=old_head.external_count-2; if(ptr->internal_count.fetch_add(count_increase, std::memory_order_release)==-count_increase) // 6 { delete ptr; } return res; } else if(ptr->internal_count.fetch_add(-1, std::memory_order_relaxed)==1) // 7 { ptr->internal_count.load(std::memory_order_acquire); // 8(改进后新增加的语句) delete ptr; } } } };