LINUX.ORG.RU

История изменений

Исправление slackwarrior, (текущая версия) :

LazyQueue::LazyQueue(const Utils::Properties& props)
:    queue_(props.getInteger("Queue.Size"))
,    worker_(queue_, 
    [ this, &props ]() -> ThreadPoolT::Config     
    { 
        auto stopThread = [] (boost::thread& t) { 
            t.interrupt();
            t.join();
        };

        auto fillEMessage = [ this ] (sql::PreparedStatement& stmt, const Parser::EMessage& er) {
                                            stmt.setUInt    (1,        er.getExecID());    //in execId bigint(13),
                                            ...
                                            stmt.setUInt    (11,    t.getSellerIsPrinted());        //in sIsPrint char(1)
                                    };

        auto doTask = [ this, &props, &fillEMessage ](){ 
            boost::this_thread::interruption_point();

            LogCategory* log = Log::instance()->createCategory("Server.Queue");
            try
            {
                ::Common::DBHelper db;
                db.init("DBWriter", props, log); 

                boost::posix_time::milliseconds delay(props.getInteger("Queue.WriteDelay"));

                while (true)
                {
                    boost::this_thread::interruption_point();
                    CoesMessage* msg;

                    while (queue_.dequeue(msg))
                    {
                            assert(msg && "Queue data seems like inconsistent");
                            if (msg->type() == MessageID::E)
                            {
                                Parser::EMessage& er = *(Parser::EMessage*)msg;
                                db.callPreparedStoredProc("CALL p_exe_report_LoadERepo(?,?,NOW(), ?,?,?,?,?,?,?,?)", er , fillEMessage);
                            }        
                    }
					LDebug(*log) << "queue is empty, wait for delay";
                    boost::this_thread::sleep(delay);
                    boost::this_thread::interruption_point();
                }
            }
            catch (const std::exception& e)
            {
                LError(*log) << std::string("Queue initialization error: ").append(e.what());
            }
        };
        ThreadPoolT::Config c = { 
            props.getInteger("Queue.Workers"),
            doTask,
            stopThread
        }; 
return c; }())//worker_
{}

Примерно так я понял, что оно вообще делает :)

Исправление slackwarrior, :

LazyQueue::LazyQueue(const Utils::Properties& props)
:    queue_(props.getInteger("DBWriter.Server.Queue.Size"))
,    worker_(queue_, 
    [ this, &props ]() -> ThreadPoolT::Config     
    { 
        auto stopThread = [] (boost::thread& t) { 
            t.interrupt();
            t.join();
        };

        auto fillEMessage = [ this ] (sql::PreparedStatement& stmt, const Parser::EMessage& er) {
                                            stmt.setUInt    (1,        er.getExecID());    //in execId bigint(13),
                                            ...
                                            stmt.setUInt    (11,    t.getSellerIsPrinted());        //in sIsPrint char(1)
                                    };

        auto doTask = [ this, &props, &fillEMessage ](){ 
            boost::this_thread::interruption_point();

            LogCategory* log = Log::instance()->createCategory("Server.Queue");
            try
            {
                ::Common::DBHelper db;
                db.init("DBWriter", props, log); 

                boost::posix_time::milliseconds delay(props.getInteger("Queue.WriteDelay"));

                while (true)
                {
                    boost::this_thread::interruption_point();
                    CoesMessage* msg;

                    while (queue_.dequeue(msg))
                    {
                            assert(msg && "Queue data seems like inconsistent");
                            if (msg->type() == MessageID::E)
                            {
                                Parser::EMessage& er = *(Parser::EMessage*)msg;
                                db.callPreparedStoredProc("CALL p_exe_report_LoadERepo(?,?,NOW(), ?,?,?,?,?,?,?,?)", er , fillEMessage);
                            }        
                    }
					LDebug(*log) << "queue is empty, wait for delay";
                    boost::this_thread::sleep(delay);
                    boost::this_thread::interruption_point();
                }
            }
            catch (const std::exception& e)
            {
                LError(*log) << std::string("Queue initialization error: ").append(e.what());
            }
        };
        ThreadPoolT::Config c = { 
            props.getInteger("Queue.Workers"),
            doTask,
            stopThread
        }; 
return c; }())//worker_
{}

Примерно так я понял, что оно вообще делает :)

Исходная версия slackwarrior, :

LazyQueue::LazyQueue(const Utils::Properties& props)
:    queue_(props.getInteger("DBWriter.Server.Queue.Size"))
,    worker_(queue_, 
    [ this, &props ]() -> ThreadPoolT::Config     
    { 
        auto stopThread = [] (boost::thread& t) { 
            t.interrupt();
            t.join();
        };

        auto fillEMessage = [ this ] (sql::PreparedStatement& stmt, const Parser::EMessage& er) {
                                            stmt.setUInt    (1,        er.getExecID());    //in execId bigint(13),
                                            ...
                                            stmt.setUInt    (11,    t.getSellerIsPrinted());        //in sIsPrint char(1)
                                    };

        auto doTask = [ this, &props, &fillEMessage, &fillTMessage ](){ 
            boost::this_thread::interruption_point();

            LogCategory* log = Log::instance()->createCategory("Server.Queue");
            try
            {
                ::Common::DBHelper db;
                db.init("DBWriter", props, log); 

                boost::posix_time::milliseconds delay(props.getInteger("Queue.WriteDelay"));

                while (true)
                {
                    boost::this_thread::interruption_point();
                    CoesMessage* msg;

                    while (queue_.dequeue(msg))
                    {
                            assert(msg && "Queue data seems like inconsistent");
                            if (msg->type() == MessageID::E)
                            {
                                Parser::EMessage& er = *(Parser::EMessage*)msg;
                                db.callPreparedStoredProc("CALL p_exe_report_LoadERepo(?,?,NOW(), ?,?,?,?,?,?,?,?)", er , fillEMessage);
                            }        
                    }
					LDebug(*log) << "queue is empty, wait for delay";
                    boost::this_thread::sleep(delay);
                    boost::this_thread::interruption_point();
                }
            }
            catch (const std::exception& e)
            {
                LError(*log) << std::string("Queue initialization error: ").append(e.what());
            }
        };
        ThreadPoolT::Config c = { 
            props.getInteger("Queue.Workers"),
            doTask,
            stopThread
        }; 
return c; }())//worker_
{}

Примерно так я понял, что оно вообще делает :)