LINUX.ORG.RU

1 биллион челлендж

 ,


2

4

Даётся CSV файл с температурой от метеостанции и названием локации. Таких записей миллиард. Нужно найти максимальную, минимальную и среднюю температуру по каждой локации за минимальное время. Подробнее https://www.morling.dev/blog/one-billion-row-challenge/

  • Срок до 31 января

  • Пишем на джаве (но там вроде и другие ЯП участвовали)

  • Приз имя на доске почёта

Предоставленные реализации на текущий момент https://github.com/gunnarmorling/1brc?tab=readme-ov-file#results

Реализации и челленджи на других ЯП https://github.com/gunnarmorling/1brc/discussions/categories/show-and-tell

★★★★★

Последнее исправление: foror (всего исправлений: 1)
Ответ на: комментарий от Nervous

натянуть мутабельность на жадную итерацию

Действительно слегка быстрее.

;; using manual looping and mutable hashmaps

(defn looping-stats-mutable!
  "Returns the map of station stats keyed by station names.
  Reads a semicolon-separated file with measurement records eagerly
  line by line, parses each line into a measurement (station name and
  measured value) and updates the corresponding station's stats using
  the value. Uses mutable hashmaps to store total and station stats."
  [filename]
  (let [rdr (io/reader filename)]
    (loop [stats (java.util.HashMap.)]
      (if-let [line (.readLine ^java.io.BufferedReader rdr)]
        (let [[key string-value] (s/split line scsv-split-regex)
              value              (parse-double string-value)
              {:keys [count minimum maximum average] :as key-stats}
              (or (.get ^java.util.HashMap stats key)
                  (doto (java.util.HashMap.)
                    (.put :count 0)
                    (.put :maximum value)
                    (.put :minimum value)
                    (.put :average value)))
              updated-key-stats
              (doto ^java.util.HashMap key-stats
                (.put :count (inc count))
                (.put :minimum (min minimum value))
                (.put :maximum (max maximum value))
                (.put :average (moving-average average count value)))]
          (recur (doto stats (.put key updated-key-stats))))
        stats))))

(comment

  ;; ~50% better than baseline (and ~30% better than the transducing variant)

  ;; ~0.7 sec
  (time (format-stats (looping-stats-mutable! "dev/resources/measurements1M.txt")))

  ;; ~750 sec (~12 min)
  (time (format-stats (looping-stats-mutable! "dev/resources/measurements.txt")))

)

Дальше, видимо, уже только изобретать кастомные хэшмапы и читать файл по кускам в несколько потоков.

Nervous ★★★★★
()
Последнее исправление: Nervous (всего исправлений: 2)
Ответ на: комментарий от urxvt

Большую долю прироста дает отказ от String как такого.

Похоже на то. Когда я попытался полностью прочитать 14-гигабайтный файл в память (в последовательность строк), 64 Гб памяти жабе не хватило.

Среднее можно не считать на каждой итерации.

Но тогда ведь придётся хранить всю последовательность измеренных значений для каждой станции в памяти, чтобы посчитать среднее в конце? Ах вот зачем им 128 Гб памяти понадобилось %)

Мне профайлер сказал, что вычисление скользящего среднего от силы пару процентов времени занимает — похоже, там много не наэкономишь.

Nervous ★★★★★
()
Последнее исправление: Nervous (всего исправлений: 4)
Ответ на: комментарий от Nervous

Но тогда ведь придётся хранить всю последовательность измеренных значений для каждой станции в памяти, чтобы посчитать среднее в конце? Ах вот зачем им 128 Гб памяти понадобилось %)

Хранишь сумму и количество, среднее считаешь при выводе результатов.

Мне профайлер сказал, что вычисление скользящего среднего от силы пару процентов времени занимает — похоже, там много не наэкономишь.

Может и так.

urxvt ★★★★★
()
Ответ на: комментарий от Nervous

Страсти-то какие. Это же не наш метод! Я предлагаю зайти с другой стороны:

  1. Не превращать наш прекрасный лиспик в убогое подобие С++. И элегантность растеряем и по скорости всё равно отстанем.
  2. По возможности не напрягаться и использовать стандартные компоненты.
  3. Просто распаралелить по данным, а каждый поток данных считать последовательно. Потом слить результаты.
  4. Читать файл кусочками, искать там переводы строк и следить, чтобы нигде не продолбаться с индексами уныло и скучно. Но всеблагой Господь послал нам утилиту split, которая заранее нам разрежет исходный большой файл на тысячу маленьких. После чего
(ns ubrc.simple
  (:require
   [clojure.string :as str]))

(defn merge-two-stations [station1 station2]
  {:min (min (:min station1) (:min station2))
   :max (max (:max station1) (:max station2))
   :n (+ (:n station1) (:n station2))
   :t (/ (+ (* (:n station1) (:t station1)) (* (:n station2) (:t station2)))
         (+ (:n station1) (:n station2)))})

(defn update-station [station temperature]
  (let [n (:n station)
        n1 (inc n)
        average-temperature (:t station)
        new-average (/ (+ temperature (* n average-temperature)) n1)]
    {:min (min (:min station) temperature)
     :max (max (:max station) temperature)
     :n n1 :t new-average}))

(defn new-station [temperature]
  {:min temperature
   :max temperature
   :n 1 :t temperature})

(defn get-new-station [station temperature]
  (if station (update-station station temperature)
               (new-station temperature)))

;; stations example    -- {:Test {:min -4, :max 32, :n 2, :t 2.2} :Milan {:min 2, :max 22, :n 1, :t 8.7}}
;; new-station example -- {:Test 3.3}
(defn add-station [stations new-station]
  (let [name (first (keys new-station))
        temperature (get new-station name)
        station (get stations name)
        new-station (get-new-station station temperature)]
    (assoc stations name new-station)))

(defn parse-station [station]
  (let [arr (str/split station #";")
        name (first arr)
        t (read-string (second arr))]
    {(keyword name) t}))

(defn read-file [file-name]
  (str/split (slurp file-name) #"\n"))

(defn iter [file-name]
  (let [seq (read-file file-name)]
    (reduce add-station {} (map parse-station seq))))

(defn run [files]
  (let [dbs (pmap iter files)]
    (reduce #(merge-with merge-two-stations %1 %2)
            dbs)))

(print-result (simple/run dataFiles))

Результат выходит на уровне

java -jar /home/ig/tmp/ubrc/target/uberjar/ubrc-0.1.0-SNAPSHOT-standalone.jar  4887.45s user 77.95s system 933% cpu 8:52.01 total
ugoday ★★★★★
()
Ответ на: комментарий от Nervous

64 Гб памяти жабе не хватило.

Я не удивлён. Если я ничего не путаю - стринги в джаве юникодные, не (даже без учёта других накладных расходов, коих предостаточно)?

посчитать среднее в конце?

На таких объемах теоретически могут вылезать интересные эффекты связанные с потерей точности при добавлении маленьких floats к большим накопленным суммам. Существуют известные способы как с этим бороться, но условный аналог EMA в нулевом приближении - не так плох, кмк.

bugfixer ★★★★★
()
Последнее исправление: bugfixer (всего исправлений: 1)
Ответ на: комментарий от ugoday

Господь послал нам утилиту split, которая заранее нам разрежет исходный большой файл на тысячу маленьких.

Даже если отбросить моё отношение к конкретно этой идее как довольно маразматичной (по многим причинам), мы так можем договориться до того что весь math будет двинут в условный prep-stage, и тулзня сведётся к «cat results.txt». И выше я уже озвучивал что как по мне так даже многократное чтение при benchmarking (не говоря даже о предварительном двигании данных на более быстрый носитель, типа tmpfs как предлагали до этого) - уже чит.

bugfixer ★★★★★
()
Последнее исправление: bugfixer (всего исправлений: 1)
Ответ на: комментарий от ugoday

распаралелить по данным

Мне тоже думается, что pmap/reduce тут должен довольно сильно помочь, но предварительный сплит вроде как слегка не по правилам получается. Вот если бы найти способ на лету быстро разбить файл на эн сегментов (да ещё и с учётом строк, чтобы границы сегментов не оказывались посреди строки)…

[seq (str/split (slurp file-name) #«\n»)]

Может ведь получиться, что весь 14-гигабайтный файл будет в какой-то момент времени находиться в памяти? По идее, (line-seq (clojure.java.io/reader file-name)) должно быть помилосерднее по потреблению памяти — а может, и по скорости.

Nervous ★★★★★
()
Ответ на: комментарий от Nervous

да ещё и с учётом строк, чтобы границы сегментов не оказывались посреди строки

Там на Go вариант у товарища.
https://github.com/jkroepke/1brc-go/tree/main/go

Он там пилит куски по потокам как-то так:

	    data = data[workerSize*workerID : last]
	    data = data[bytes.IndexByte(data, '\n')+1 : bytes.LastIndexByte(data, '\n')+1]

а ещё из забавного - он там предполагает, что все возможные названия станций есть в первых пяти миллионах записей. т.е. сразу строит готовый массив станций.

Вполне прилично получается. В районе тех же 3 секунд на прокаченном в кэш файле и около 6 секунд после echo 3 > /proc/sys/vm/drop_caches

Toxo2 ★★★★
()
Ответ на: комментарий от Toxo2

Там на Go вариант у товарища.
https://github.com/jkroepke/1brc-go/tree/main/go

Обычно, для определения размера сегмента там делают:

while (segment[size - 1] != '\n') {
    size--;
}

а ещё из забавного - он там предполагает, что все возможные названия станций есть в первых пяти миллионах записей. т.е. сразу строит готовый массив станций.

Тем не менее, это нарушает условия конкурса.

urxvt ★★★★★
()
Ответ на: комментарий от urxvt

это нарушает условия конкурса.

Так всё, что не на Java, сразу нарушает условия конкурса по определению.

Забавно же смотреть какие штуки народ придумывает всё равно )

Toxo2 ★★★★
()
Ответ на: комментарий от Nervous

предварительный сплит вроде как слегка не по правилам получается. Вот если бы найти способ на лету быстро разбить файл на эн сегментов

Безусловно. Мы же можем читать файл в произвольном месте. Так что можем сделать первый проход, в котором прыгать в середину файла и искать ближайший конец строки, после чего возвращать размеченные области для паралельной обработки. Но даже словами это писать дольше, нежели выполнить split -l N BigFile.txt.

Вообще, я сосредоточен на том, чтобы минимальными усилиями получить максимальный результат. Поэтому нужно найти грань, когда ты перестаёшь пользоваться языком и начинаешь бороться с ним. Собственно, мне интересно нащупать эту грань, а цифры и их «честность» безразличны, всё равно я ни с кем не соревнуюсь.

(line-seq (clojure.java.io/reader file-name))

Имеет смысл попробовать, да.

ugoday ★★★★★
()
Ответ на: комментарий от ugoday

минимальными усилиями получить максимальный результат

Согласен. Но если хотим сравнивать с другими решениями, то время на сплит надо тоже включить в общее время работы.

цифры и их «честность» безразличны, всё равно я ни с кем не соревнуюсь

А кто тут соревнуется, всё просто развлечения ради. С этой вот фигнёй с разбиением файла на сегменты тоже любопытно поиграться.

В итоге, конечно, хотелось бы, чтобы решение было более-менее конкурентоспособно по скорости и уделывало всех по понятности и поддерживаемости — чтобы его было легко изменить и доработать по новым требованиям. Получится — хорошо, не получится — и ладно %)

Nervous ★★★★★
()
Последнее исправление: Nervous (всего исправлений: 3)
Ответ на: комментарий от Nervous

время на сплит надо тоже включить в общее время работы.

Непонятно как учесть, что split кроме нахождения границ разбиения ещё и создаёт тысячу файлов по 14Мб. Проще использовать готовый набор файлов и сравнивать разные подходы при работе с ними. Вот, к слову, решил реализовать тот же алгоритм на SBCL

(ql:quickload "uiop")
(ql:quickload :parse-float)
(ql:quickload "alexandria")
(ql:quickload "lparallel")

(defpackage :blc-challenge
  (:use :cl :lparallel :lparallel.queue)
  (:export #:main))

(in-package :blc-challenge)

(defun init ()
  (setf *kernel* (make-kernel 8 :name "channel-queue-kernel")))

(init)


(declaim (optimize (speed 3) (safety 0)))

(defparameter dataDir "~/src/github.com/gunnarmorling/1brc/data")
(defparameter dataFiles (remove-if-not #'(lambda (path) (uiop:string-prefix-p "x" (file-namestring path))) (uiop:directory-files dataDir)))

(defstruct station
  (min 0.0 :type single-float)
  (max 0.0 :type single-float)
  (n 1 :type fixnum)
  (avg 0.0 :type single-float))

(defun parse-station (l)
  (let ((station (uiop:split-string l :max 2 :separator ";")))
    (cons (intern (car station))
	  (parse-float:parse-float (cadr station)))))

(defun new-avg (temperature n average-temperature)
  (/ (+ temperature (* n average-temperature)) (1+ n)))

(defun updated-station (station temperature)
  (make-station :min (min temperature (station-min station))
		:max (max temperature (station-max station))
		:n (1+ (station-n station))
		:avg (new-avg temperature (station-n station) (station-avg station))))


(defun add-station (db station)
  (let* ((station-name (car station))
	 (temperature (cdr station))
	 (old-station (gethash station-name db)))
    (setf (gethash station-name db)
	  (if old-station (updated-station old-station temperature)
	      (make-station :min temperature
			    :max temperature
			    :n 1
			    :avg temperature)))))

(defun iter (file-name)
  (let ((db (make-hash-table :size 413)))
    (with-open-file (in file-name)
	    (loop for l = (read-line in nil nil)
		  while l
		  do (add-station db (parse-station l))))
    db))

(defun merge-two-stations (db1 db2)
  (maphash #'(lambda (name station2)
		    (let* ((station1 (gethash name db1))
			   (new-station
			     (make-station :min (min (station-min station1)
						     (station-min station2))
					   :max (max (station-max station1)
						     (station-max station2))
					   :n (+ (station-n station1) (station-n station2))
					   :avg (/ (+ (* (station-n station1) (station-avg station1))
						      (* (station-n station2) (station-avg station2)))
						   (+ (station-n station1) (station-n station2))))))
		      (setf (gethash name db1) new-station)))
	   db2)
  db1)

(defun merge-stations (stations)
  (reduce #'merge-two-stations stations))

(defun print-station (name station)
  (format nil "~a=~,1F/~,1F/~,1F"
	  name (station-min station) (station-avg station) (station-max station)))

(defun print-stations (stations)
  (let* ((names (sort
		(mapcar #'string
			(alexandria:hash-table-keys stations))
		#'string-lessp))
	 (statistics (loop for name in names
			   collect (print-station name (gethash (intern name "COMMON-LISP-USER") stations)))))
    (format t "{~{~A~^ ~}}~%" statistics)))

(defun run (files)
  (merge-stations (pmapcar #'iter files)))

(defun main ()
  (print-stations (run dataFiles)))

(time (main))
;; лишние строки не показаны
Evaluation took:
  219.599 seconds of real time
  1690.001351 seconds of total run time (1675.769479 user, 14.231872 system)
  [ Real times consist of 5.983 seconds GC time, and 213.616 seconds non-GC time. ]
  [ Run times consist of 5.861 seconds GC time, and 1684.141 seconds non-GC time. ]
  769.59% CPU
  569,191,774,868 processor cycles
  51 page faults
  349,112,268,000 bytes consed

Три с половиной минуты против почти 9 у Clojure. Может надо с JVM как-то пошаманить?

ugoday ★★★★★
()
Ответ на: комментарий от ugoday

Три с половиной минуты против почти 9 у Clojure. Может надо с JVM как-то пошаманить?

Я тут слепил на коленке разбиение файла на эн логических сегментов (без физического деления на несколько файлов) и скармливание их pmapу для обработки (пока без объединения результатов). Ревело кулером, нагрело проц до 72 градусов, зато отработало за 4.5 минуты %)

Надо будет допилить да off-by-one ошибки повыловить.

Это без всяких мутабельных ухищрений и жадных итераций — иммутабельные мапы, ленивые последовательности, всё как мы любим.

Nervous ★★★★★
()
Последнее исправление: Nervous (всего исправлений: 2)
Ответ на: комментарий от Nervous

Надо будет допилить да off-by-one ошибки повыловить

Fukk yeah

;; using multiple threads to read the file (divided to logical segments) and
;; compute stats for each segment, then combining the results

(def ^:dynamic *end-of-file*
  "Integer value returned by a file input stream when end of file is reached."  -1)

(def ^:dynamic *end-of-line*
  "Integer value that corresponds to the end of line character." 10)

(defn file-segment
  "Returns a logical segment of `file`, defined by start and end
  positions in bytes. The segment will begin at `start` and end after
  newline nearest to `start` + `initial-length` position (inclusive),
  i.e., segment always ends between lines (or at the end of file);
  `start` defaults to 0; `initial-length` defaults to the length of
  the entire file."
  ([^File file]
   (file-segment file 0))
  ([^File file start]
   (file-segment file start (.length file)))
  ([^File file start initial-length]
   (when (pos? initial-length)
     (with-open [file-input-stream (FileInputStream. file)]
       (let [file-channel         (.getChannel file-input-stream)
             end-of-line-or-file? #{*end-of-file* *end-of-line*}
             at-initial-end       (+ start initial-length)
             before-initial-end   (dec at-initial-end)]
         {:file  file
          :start start
          :end   (do
                   (.position file-channel ^int before-initial-end)
                   (let [initial-end-byte (.read file-input-stream)]
                     (if (end-of-line-or-file? initial-end-byte)
                       (min (.position file-channel) (.length file))
                       (do
                         (.position file-channel ^int at-initial-end)
                         (while (not (end-of-line-or-file? (.read file-input-stream))))
                         (min (.position file-channel) (.length file))))))})))))

(defn file-segment-seq
  "Returns a lazy sequence of at most `n` successive logical segments of
  `file`. (As segments may be expanded to align with line boundaries,
  there may be fewer of them, especially if segments are small and
  lines are long.)"
  ([^java.io.File file n]
   (file-segment-seq file 0 n))
  ([^java.io.File file start n]
   (when (> n 0)
     (let [remaining-file-length  (- (.length file) start)
           initial-segment-length (int (/ remaining-file-length n))]
       (when-let [segment (file-segment file start initial-segment-length)]
         (cons segment (lazy-seq (file-segment-seq file (:end segment) (dec n)))))))))

(defn file-segment-line-seq
  "Returns a lazy sequence of lines from the `segment`. Reading begins
  at `start` and ends after `end`. (The line that contains the end
  position will be the last read. Segment constructor guarantees that
  segment will always end at a line boundary, but manually created
  segments may not.)"
  ([{:keys [file start end]}]
   (let [file-input-stream (FileInputStream. ^File file)
         _                 (.position (.getChannel file-input-stream) ^int start)
         reader            (io/reader file-input-stream)]
     (file-segment-line-seq reader 0 (- end start))))
  ([^BufferedReader reader bytes-read bytes-max]
   (when (not (>= bytes-read bytes-max))
     (when-let [line (.readLine reader)]
       (let [line-length-bytes (inc (count (.getBytes line)))]
         (cons line (lazy-seq (file-segment-line-seq reader
                                                     (+ bytes-read line-length-bytes)
                                                     bytes-max))))))))

(defn merge-station-stats
  "Returns two station stats maps merged."
  [{cnt1 :count min1 :minimum max1 :maximum avg1 :average}
   {cnt2 :count min2 :minimum max2 :maximum avg2 :average}]
  {:count   (+ cnt1 cnt2)
   :minimum (min min1 min2)
   :maximum (max max1 max2)
   :average (/ (+ (* cnt1 avg1) (* cnt2 avg2)) (+ cnt1 cnt2))})

(defn parallel-stats
  "Returns the map of station stats keyed by station names. Reads the
  measurements data from `filename`, that is a text file containing
  semicolon-separated values. Uses multiple threads to read the
  file (divided to `n` logical segments) and computes stats for each
  segment, then combines the results. Default number of segments is the
  number of available cores + 2, same as the number of threads used by
  `pmap`."
  ([filename]
   (parallel-stats filename (+ 2 (.. Runtime getRuntime availableProcessors))))
  ([filename n]
   (let [file (io/file filename)]
     (reduce #(merge-with merge-station-stats %1 %2)
             (pmap (fn [segment]
                     (transducing-stats (file-segment-line-seq segment)))
                   (file-segment-seq file n))))))

(comment

  ;; 80% better than baseline (4.5 min vs 22 min), with 4 physical (8 logical) cores (Ryzen 5 3400G)

  ;; 1e6 lines: 0.3 sec
  (time (format-stats (parallel-stats "dev/resources/measurements1M.txt")))

  ;; 1e9 lines: 263 sec (~4.5 min)
  (time (format-stats (parallel-stats "dev/resources/measurements.txt")))

)
Nervous ★★★★★
()
Последнее исправление: Nervous (всего исправлений: 8)