История изменений
Исправление slackwarrior, (текущая версия) :
LazyQueue::LazyQueue(const Utils::Properties& props)
: queue_(props.getInteger("Queue.Size"))
, worker_(queue_,
[ this, &props ]() -> ThreadPoolT::Config
{
auto stopThread = [] (boost::thread& t) {
t.interrupt();
t.join();
};
auto fillEMessage = [ this ] (sql::PreparedStatement& stmt, const Parser::EMessage& er) {
stmt.setUInt (1, er.getExecID()); //in execId bigint(13),
...
stmt.setUInt (11, t.getSellerIsPrinted()); //in sIsPrint char(1)
};
auto doTask = [ this, &props, &fillEMessage ](){
boost::this_thread::interruption_point();
LogCategory* log = Log::instance()->createCategory("Server.Queue");
try
{
::Common::DBHelper db;
db.init("DBWriter", props, log);
boost::posix_time::milliseconds delay(props.getInteger("Queue.WriteDelay"));
while (true)
{
boost::this_thread::interruption_point();
CoesMessage* msg;
while (queue_.dequeue(msg))
{
assert(msg && "Queue data seems like inconsistent");
if (msg->type() == MessageID::E)
{
Parser::EMessage& er = *(Parser::EMessage*)msg;
db.callPreparedStoredProc("CALL p_exe_report_LoadERepo(?,?,NOW(), ?,?,?,?,?,?,?,?)", er , fillEMessage);
}
}
LDebug(*log) << "queue is empty, wait for delay";
boost::this_thread::sleep(delay);
boost::this_thread::interruption_point();
}
}
catch (const std::exception& e)
{
LError(*log) << std::string("Queue initialization error: ").append(e.what());
}
};
ThreadPoolT::Config c = {
props.getInteger("Queue.Workers"),
doTask,
stopThread
};
return c; }())//worker_
{}
Примерно так я понял, что оно вообще делает :)
Исправление slackwarrior, :
LazyQueue::LazyQueue(const Utils::Properties& props)
: queue_(props.getInteger("DBWriter.Server.Queue.Size"))
, worker_(queue_,
[ this, &props ]() -> ThreadPoolT::Config
{
auto stopThread = [] (boost::thread& t) {
t.interrupt();
t.join();
};
auto fillEMessage = [ this ] (sql::PreparedStatement& stmt, const Parser::EMessage& er) {
stmt.setUInt (1, er.getExecID()); //in execId bigint(13),
...
stmt.setUInt (11, t.getSellerIsPrinted()); //in sIsPrint char(1)
};
auto doTask = [ this, &props, &fillEMessage ](){
boost::this_thread::interruption_point();
LogCategory* log = Log::instance()->createCategory("Server.Queue");
try
{
::Common::DBHelper db;
db.init("DBWriter", props, log);
boost::posix_time::milliseconds delay(props.getInteger("Queue.WriteDelay"));
while (true)
{
boost::this_thread::interruption_point();
CoesMessage* msg;
while (queue_.dequeue(msg))
{
assert(msg && "Queue data seems like inconsistent");
if (msg->type() == MessageID::E)
{
Parser::EMessage& er = *(Parser::EMessage*)msg;
db.callPreparedStoredProc("CALL p_exe_report_LoadERepo(?,?,NOW(), ?,?,?,?,?,?,?,?)", er , fillEMessage);
}
}
LDebug(*log) << "queue is empty, wait for delay";
boost::this_thread::sleep(delay);
boost::this_thread::interruption_point();
}
}
catch (const std::exception& e)
{
LError(*log) << std::string("Queue initialization error: ").append(e.what());
}
};
ThreadPoolT::Config c = {
props.getInteger("Queue.Workers"),
doTask,
stopThread
};
return c; }())//worker_
{}
Примерно так я понял, что оно вообще делает :)
Исходная версия slackwarrior, :
LazyQueue::LazyQueue(const Utils::Properties& props)
: queue_(props.getInteger("DBWriter.Server.Queue.Size"))
, worker_(queue_,
[ this, &props ]() -> ThreadPoolT::Config
{
auto stopThread = [] (boost::thread& t) {
t.interrupt();
t.join();
};
auto fillEMessage = [ this ] (sql::PreparedStatement& stmt, const Parser::EMessage& er) {
stmt.setUInt (1, er.getExecID()); //in execId bigint(13),
...
stmt.setUInt (11, t.getSellerIsPrinted()); //in sIsPrint char(1)
};
auto doTask = [ this, &props, &fillEMessage, &fillTMessage ](){
boost::this_thread::interruption_point();
LogCategory* log = Log::instance()->createCategory("Server.Queue");
try
{
::Common::DBHelper db;
db.init("DBWriter", props, log);
boost::posix_time::milliseconds delay(props.getInteger("Queue.WriteDelay"));
while (true)
{
boost::this_thread::interruption_point();
CoesMessage* msg;
while (queue_.dequeue(msg))
{
assert(msg && "Queue data seems like inconsistent");
if (msg->type() == MessageID::E)
{
Parser::EMessage& er = *(Parser::EMessage*)msg;
db.callPreparedStoredProc("CALL p_exe_report_LoadERepo(?,?,NOW(), ?,?,?,?,?,?,?,?)", er , fillEMessage);
}
}
LDebug(*log) << "queue is empty, wait for delay";
boost::this_thread::sleep(delay);
boost::this_thread::interruption_point();
}
}
catch (const std::exception& e)
{
LError(*log) << std::string("Queue initialization error: ").append(e.what());
}
};
ThreadPoolT::Config c = {
props.getInteger("Queue.Workers"),
doTask,
stopThread
};
return c; }())//worker_
{}
Примерно так я понял, что оно вообще делает :)