Нужно реализовать вот такую незамысловатую штуку. Обычная thread safe очередь но с возможность удаления поставленных в нее объектов.
При этом хотелось бы что бы «опцию» удаления объектов из очереди можно было бы отключить безболезненно если она не нужна.
Реализация сводится к тому что бы в очереди хранить не сам объкт, а некий подставной хендл. Соотвественно при добавлении в очередь очередного объекта возвращать на него хендел через который его можно удалить.
Если пораскинуть мозгами, то появляются вот такие требования к реализации:
- Сторона которая делает push получает хендел через который можно удалить объект
- Если кто то сделал pop этого объекта, то удалить его через хендл полученных из push уже нельзя
- Если объект был удален, то pop возвращает пустой хендл, поэтому перед использованием объекта полученного из pop нужно его проверить (хотя эту проверку можно делать сразу в pop)
У меня получилась вот такая реализация:
template<class T, class H = ::Holder<T> >
class Queue
{
typedef typename H::Ptr Ptr;
public:
typedef H Holder;
object():stop_(false){}
Holder push(T* t)
{
std::unique_lock<std::mutex> lock(mtx_);
Holder holder(t);
q_.push_back(holder.ptr());
cond_.notify_one();
return holder;
}
Holder pop()
{
std::unique_lock<std::mutex> lock(mtx_);
while(q_.empty() && !stop_) cond_.wait(lock);
if(stop_ && q_.empty())
throw std::runtime_error("stopped");
Ptr ptr = q_.front();
q_.pop_front();
return Holder(ptr);
}
void stop()
{
std::unique_lock<std::mutex> lock(mtx_);
stop_ = true;
cond_.notify_one();
}
private:
std::deque<Ptr> q_;
bool stop_;
std::mutex mtx_;
std::condition_variable cond_;
};
Первый тип хендлера объекта, просто замещает обычный указатель. Нужен из за поддержки единиго кода в самой очереди
template<class T>
class Holder
{
public:
typedef T* Ptr;
public:
Holder(T* t): t_(t){}
void reset() { }
operator bool(){ return static_cast<bool>(t_); }
T* operator->() {return t_;}
Ptr ptr() { return t_; }
private:
T* t_;
};
Второй тип хендлера, подразумевает что настоящим владельцем (хранителем объекта) будет сам pusher
template<class T>
class RemovableHolder
{
public:
typedef std::weak_ptr<T> Ptr;
public:
RemovableHolder(T* t):t_(t){}
RemovableHolder(const Ptr& ptr):t_(ptr.lock()){}
void reset() { t_.reset(); }
operator bool(){ return static_cast<bool>(t_); }
T* operator->() {return t_.get();}
Ptr ptr() { return std::weak_ptr<T>(t_); }
private:
std::shared_ptr<T> t_;
};
Третий холдер, решает эту проблему
template<class T>
class ExRemovableHolder
{
struct Impl
{
public:
Impl(T* t):t_(t), is_popped_(false){}
void destroy(bool from_popper)
{
std::unique_lock<std::mutex> lock(mtx_);
if(is_popped_ && !from_popper)
{
return;
}
if(t_)
{
delete t_;
t_ = 0;
}
}
~Impl()
{
delete t_;
}
public:
std::mutex mtx_;
T* t_;
bool is_popped_;
};
public:
typedef std::shared_ptr<Impl> Ptr;
public:
ExRemovableHolder(T* t):impl_(new Impl(t)), popper_(false){}
ExRemovableHolder(Ptr& ptr):popper_(true)
{
std::unique_lock<std::mutex> lock(ptr->mtx_);
if(ptr->t_)
{
impl_ = ptr;
impl_->is_popped_ = true;
}
}
void reset() { impl_->destroy(popper_); }
operator bool(){ return static_cast<bool>(impl_); }
T* operator->() {return impl_->t_;}
Ptr ptr() { return impl_; }
private:
std::shared_ptr<Impl> impl_;
bool popper_;
};
Вот пример как это используется
typedef Queue<Person, ExRemovableHolder<Person> > TSQueue;
typedef simple_object<TSQueue::Holder> DestroyQueue;
void popper(TSQueue& q)
{
try
{
while(true)
{
TSQueue::Holder tmp = q.pop();
if(tmp)
{
tmp->name();
tmp.reset();
}
}
}
catch(...)
{
std::cout << "popper finished\n";
}
}
void destroyer(DestroyQueue& q)
{
try
{
while(true)
{
TSQueue::Holder tmp = q.pop();
tmp.reset();
}
}
catch(...)
{
std::cout << "destroyer finished\n";
}
}
int main()
{
TSQueue q;
std::thread th(popper, std::ref(q));
DestroyQueue d;
std::thread thd(destroyer, std::ref(d));
const int repeat = 9999999;
for(int i=1; i<=repeat; i++)
{
TSQueue::Holder h = q.push(new Person("Vasya"));
d.push(h);
}
q.stop();
d.stop();
th.join();
thd.join();
}
Зачем вся эта котовасия понадобилась. Это вроде как ActiveObject где активные объекты должны исполнятся в пуле тредов. Т.е. очередь это интерфейс для асинхронного выполнения задач. Притом логично, что после постановки задачи в очередь, может произойти так, что эта задача станет уже не актуальной до момента, когда до нее дойдет очередь выполнения (например клиент отключился), и тогда ее выполнения будет бессмысленным.
Если система слобосвязана, то это по сути единственнй способ отменить выполнения таски.
А теперь я хотел бы выслушать вашу критику. И в плане идеи и в плане реализации.