LINUX.ORG.RU

планировщик типа cron

 ,


1

2

Интересно как бы выглядел периодический планировщик аля примитивный cron на ML-подобных ЯП?

Задача: с заданной перидичностью запускать какие-то действия. Причём у каждого действия своя переодичность выполнения.

На питоне я сделал так (можете сделать по-своему): Задачи складываются в очередь «ожидания» с приоритетом, где приоритет есть unix timestamp когда оно должно запуститься. Более срочные задачи в очереди стоят первыми.

Из этой очереди мы потихоньку достаём задачи по одной и ждём нужное время, потом перекладываем в очередь на выполнение где задачи выполняются воркерами. Отработавший воркер опять кладёт задачу в очередь ожидания. Т.е. задачи циркулируют как бы по кругу, перекладываясь между очередями.

На случай если новая задача должна быть выполнена быстрее всех мы обрываем текущий таймер ожидания и расчитываем новое время которое нужно спать . Т.е. рестарт таймера (самый дурацкий момент в алгоритме).

Тут есть нюанс: задача планируется только по завершению обработки. Т.е. не получится так что новый процесс проверки запуститься до того как завершаться старые.

★★★★★

например так: если задач не 999999999

import Control.Concurrent.Async
import System.Process

addTask :: Int -> String -> IO ()
addTask d t = async $ threadDelay d >> runCommand t >> addTask d t

main = do
  addTask 1000 "echo \"foo\""
  ...

qnikst ★★★★★
()
Последнее исправление: qnikst (всего исправлений: 1)

На питоне я сделал так (можете сделать по-своему): Задачи складываются в очередь «ожидания» с приоритетом, где приоритет есть unix timestamp когда оно должно запуститься. Более срочные задачи в очереди стоят первыми.

это умеет делать RTS причем весьма эффективно до тех пор пока задач не слишком много, но к тому времени когда ты в это упрёшься ты успеешь написать более аккуратное решение.

qnikst ★★★★★
()
Ответ на: комментарий от qnikst

другой вариант ghc зависимый:

GHC.Event.registerTimeout :: EventManager -> Int -> TimeoutCallback -> IO TimeoutKey
тут EventManager это созданный менеджер событий, TimeoutCallback просто действие, а по TimeoutKey можно изменять задержку.

Внутри честная priority-queue.

qnikst ★★★★★
()
Ответ на: комментарий от true_admin

ммм.. например так:

main = go <$> newTChanIO
  where 
    go ch = do
       _ <- replicateM n $ forkIO worker
       addTask 1000 "fooo"    
     where
       addTask delay t = async $ do
          threadDelay delay
          atomically $ writeTChan ch (delay,t)
       worker = do
          (delay, t) <- atomicallty (readTChan ch)
          run t
          addTask delay t
       run = undefined
qnikst ★★★★★
()
Последнее исправление: qnikst (всего исправлений: 4)
Ответ на: комментарий от true_admin

в принципе твой вариант на STM тоже хорошо ложится:

newtype TimedTask a = TT { unTT :: (UTCTime, a)}

instance Ord a where
  compare = compare `on` (fst . unTT)

-- подойдет любая очередь приоритетов
class Heap where
   deleteMin :: Heap a -> Heap a
   findMin :: Ord a => Heap a -> Maybe a
   insert  :: Ord a => a -> Heap a -> Heap a

worker first storage mdelay = do
  ctime <- getCurrentTime
  maybe (newTVarIO False) (registerDelay) mdelay
  ret <- atomically $ 
      (do 
         (tm,task) <- takeTMVar first
         if tm < ctime 
           then do
               Foldable.forM_ (putTMVar first) =<< findMin <$> readTVar storage  
               modifyTVar storage deleteMin 
               return $ Right task
            else return $ Left (tm `diffTime` ctime))
      `orElse` (readTVar delay >>= flip unless retry)        
   case ret of
     Right task -> do 
       run task 
       addAgain task
       worker first loop Nothing
     Left d -> worker fist storage (Just {- тут тип нужно привести -} d)

и создавай сколько нужно воркеров, добавление задачи будет так:

addTask first storage d t = do 
   c <- getCurrectTime
   let c' = c `addTime` d --на память не помню но в UTCTime все методы нужные есть
   atomically $ do
      f@(t, _) <- readTMVar first
      if t < c' 
        then modifyTVar storage (insert (d,t))
        else do
           modifyTVar storage (insert f)
           swapTMVar first (d,t)

я почти сплю так что с десяток очепяток тут есть, но смысл такой.

qnikst ★★★★★
()
Последнее исправление: qnikst (всего исправлений: 5)

как бы выглядел ... ML-подобных ЯП?

Не могу удержаться, чтобы не сказать «нечитаемо». Примеры в треде подтвердили это.

buddhist ★★★★★
()
Ответ на: комментарий от buddhist

Приведи пример читаемого, пожалуйста.

anonymous
()
Ответ на: комментарий от qnikst

Я вот подумал что вариант «async $ threadDelay d >> runCommand t >> addTask d t» мне нравится больше всего. Наверно, можно тупо добавить сюда семафор для ограничения конкурентности, не?

true_admin ★★★★★
() автор топика
Ответ на: комментарий от true_admin

не-не-не. в этом варианте смысл в том, что ты загружаешь легкий поток на задачу, и тебе необходимо запустить по потоку на задачу, так что это останется.

Если ты хочешь ограничить кол-во конкурентно выполняющихся задач, то тебе нужна точка синхронизации, самое простое это использовать каналы (STM или IO), я сразу использовал STM.

qnikst ★★★★★
()
Ответ на: комментарий от buddhist

в тред призываются решения задачи на лиспе:

а). любой способ без ограничения кол-ва воркеров

б). любой способ, но с ограничением кол-ва задач

в). очередь с аналогом priority queue и возможностью добавлять новые задачи

qnikst ★★★★★
()
Ответ на: комментарий от qnikst

не-не-не. в этом варианте смысл в том, что ты загружаешь легкий поток на задачу, и тебе необходимо запустить по потоку на задачу, так что это останется.

Я понимаю :). Я просто хотел сказать что в фазе ожидания пусть они все работают параллельно. А вот когда пришло время что-то запускать пусть пытаются получить семафор. Или это не ложится на хаскель?

true_admin ★★★★★
() автор топика
Ответ на: комментарий от qnikst

Всё, теперь понял, спасибо. Я голосую за вариант с семафором. Не знаю на сколько это «функционально», но выглядит лаконично и понятно.

true_admin ★★★★★
() автор топика
Ответ на: комментарий от true_admin

не знаю, философией кода не сильно увлекаюсь. А решение с семафором и в правду оказалось самым простым и адекватным, но если потребуется расширение операций, например, процесс не должен задерживаться более, чем на T+dt, то нужно будет переключиться на STM семафоры.

qnikst ★★★★★
()
Ответ на: комментарий от true_admin

не в ту сторону. Сейчас выполняется максимум n процессов, и следующий готовый будет ждать пока не выполниться хотя бы один из них, при этом ждать он может сколь угодно долго. Вполне возможная проблема, то что мы не можем позволять ему ждать больше, чем какой-то лимит времени, в этом случае если лимит почти превышен, то мы забиваем на ограничение и запускаем его.

qnikst ★★★★★
()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.