Skip to content

Commit

Permalink
Merge pull request #342 from ChrisCatCP/master
Browse files Browse the repository at this point in the history
refactor wait_group
  • Loading branch information
zhengshuxin committed Jun 26, 2024
2 parents b7ab0c8 + 9ae2351 commit fc6a460
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 25 deletions.
7 changes: 4 additions & 3 deletions lib_fiber/cpp/include/fiber/wait_group.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once
#include "acl_cpp/stdlib/atomic.hpp"

namespace acl {

Expand All @@ -9,12 +10,12 @@ class wait_group {
wait_group(void);
~wait_group(void);

void add(size_t n);
void add(int n);
void done(void);
size_t wait(void);
void wait(void);

private:
size_t count_;
atomic_long state_;
fiber_tbox<unsigned long>* box_;
};

Expand Down
75 changes: 54 additions & 21 deletions lib_fiber/cpp/src/wait_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace acl {

wait_group::wait_group(void)
{
count_ = 0;
state_ = 0;
box_ = new acl::fiber_tbox<unsigned long>;
}

Expand All @@ -15,37 +15,70 @@ wait_group::~wait_group(void)
delete box_;
}

void wait_group::add(size_t n)
void wait_group::add(int n)
{
count_ += n;
long long state = state_.add_fetch((long long)n << 32);
//高32位为任务数量
int c = (int)(state >> 32);
//低32位为等待者数量
uint32_t w = (uint32_t)state;
//count不能小于0
if(c < 0){
acl_msg_fatal("wait_group: negative wait_group counter");
}
if(w != 0 && n > 0 && c == n){
acl_msg_fatal("wait_group: add called concurrently with wait");
}
if(c > 0 || w ==0){
return;
}
//检查state是否被修改
if(state_ != state){
acl_msg_fatal("wait_group: add called concurrently with wait");
}
//这里count为0了,清空state并唤醒所有等待者
state_ = 0;
for (size_t i = 0; i < w; i++) {
#ifdef _DEBUG
unsigned long* tid = new unsigned long;
*tid = acl::thread::self();
box_->push(tid);
#else
box_->push(NULL);
#endif
}
}

void wait_group::done(void)
{
#ifdef _DEBUG
unsigned long* tid = new unsigned long;
*tid = acl::thread::self();
box_->push(tid);
#else
box_->push(NULL);
#endif
add(-1);
}

size_t wait_group::wait(void)
void wait_group::wait(void)
{
size_t i;
for (i = 0; i < count_; i++) {
bool found;
for(;;){
long long state = state_;
int c = (int)(state >> 32);
uint32_t w = (uint32_t)state;
//没有任务直接返回
if(c == 0) return;
//等待者数量加一,失败的话重新获取state
if(state_.cas(state, state + 1) == state){
bool found;
#ifdef _DEBUG
unsigned long* tid = box_->pop(-1, &found);
assert(found);
delete tid;
unsigned long* tid = box_->pop(-1, &found);
assert(found);
delete tid;
#else
(void) box_->pop(-1, &found);
assert(found);
(void) box_->pop(-1, &found);
assert(found);
#endif
}
return i;
if(state_ != 0){
acl_msg_fatal("wait_group: wait_group is reused before previous wait has returned");
}
return;
}
}
}

} // namespace acl
2 changes: 1 addition & 1 deletion lib_fiber/samples/wait_group/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class fiber_waiter : public acl::fiber
// @override
void run(void) {
size_t ret = sync_.wait();
printf("All threads and fibers were done, ret=%zd\r\n", ret);
printf("All threads and fibers were done\r\n");
}
};

Expand Down

0 comments on commit fc6a460

Please sign in to comment.