История изменений

Исправление Nervous (текущая версия):

Надо будет допилить и выловить off-by-one ошибки.

Fukk yeah

;; Uses multiple threads to read the file (divided into logical segments),
;; computes stats for each segment, then combines the results.

(def ^{:dynamic true
       :doc     "Sentinel integer that a file input stream's `read` yields once the
  end of the file has been reached."}
  *end-of-file*
  -1)

(def ^{:dynamic true
       :doc     "Integer code of the end-of-line character (line feed, `\\n`)."}
  *end-of-line*
  10)

(defn file-segment
  "Returns a logical segment of `file` as a map with keys `:file`,
  `:start` and `:end` (byte positions), or nil when `initial-length`
  is not positive.

  The segment begins at `start` and ends just after the first newline
  found at or after position `start` + `initial-length` - 1, i.e., the
  segment always ends between lines (or at the end of file). `:end` is
  exclusive and never exceeds the file length.

  `start` defaults to 0; `initial-length` defaults to the length of the
  entire file."
  ([^File file]
   (file-segment file 0))
  ([^File file start]
   (file-segment file start (.length file)))
  ([^File file start initial-length]
   (when (pos? initial-length)
     (with-open [file-input-stream (FileInputStream. file)]
       (let [file-channel         (.getChannel file-input-stream)
             end-of-line-or-file? #{*end-of-file* *end-of-line*}
             at-initial-end       (+ start initial-length)
             before-initial-end   (dec at-initial-end)]
         {:file  file
          :start start
          :end   (do
                   ;; FileChannel positions are longs; coerce explicitly so
                   ;; offsets beyond 2 GiB are handled.
                   (.position file-channel (long before-initial-end))
                   (let [initial-end-byte (.read file-input-stream)]
                     (if (end-of-line-or-file? initial-end-byte)
                       ;; The last byte of the initial segment is already a
                       ;; newline (or we are past EOF): no need to extend.
                       (min (.position file-channel) (.length file))
                       (do
                         ;; Otherwise scan forward byte by byte until the
                         ;; next newline or the end of file.
                         (.position file-channel (long at-initial-end))
                         (while (not (end-of-line-or-file? (.read file-input-stream))))
                         (min (.position file-channel) (.length file))))))})))))

(defn file-segment-seq
  "Returns a lazy sequence of at most `n` successive logical segments of
  `file`, starting at byte position `start` (default 0). Returns nil
  when `n` is not positive or nothing remains to be read.

  Each segment initially gets an equal share of the remaining bytes and
  is then expanded by `file-segment` to align with a line boundary, so
  there may be fewer than `n` segments, especially if segments are small
  and lines are long."
  ([^File file n]
   (file-segment-seq file 0 n))
  ([^File file start n]
   (when (pos? n)
     (let [remaining-file-length  (- (.length file) start)
           ;; `quot` keeps the arithmetic in longs (segments may exceed
           ;; 2 GiB). Clamp to at least 1 byte while data remains so a
           ;; tail shorter than `n` bytes is not silently dropped.
           initial-segment-length (if (pos? remaining-file-length)
                                    (max 1 (quot remaining-file-length n))
                                    0)]
       (when-let [segment (file-segment file start initial-segment-length)]
         (cons segment (lazy-seq (file-segment-seq file (:end segment) (dec n)))))))))

(defn- segment-lines
  "Lazily reads lines from `reader` until `bytes-max` bytes have been
  consumed or the reader is exhausted; closes `reader` at that point
  when `close?` is true."
  [^BufferedReader reader bytes-read bytes-max close?]
  (if-let [line (when (< bytes-read bytes-max) (.readLine reader))]
    ;; The reader decodes UTF-8, so count the UTF-8 encoded size of the
    ;; line, plus one byte for the newline stripped by `readLine`.
    (let [line-length-bytes (inc (count (.getBytes ^String line "UTF-8")))]
      (cons line (lazy-seq (segment-lines reader
                                          (+ bytes-read line-length-bytes)
                                          bytes-max
                                          close?))))
    (when close? (.close reader))))

(defn file-segment-line-seq
  "Returns a lazy sequence of lines from the `segment` (a map with keys
  `:file`, `:start` and `:end`), or nil when there is nothing to read.
  Reading begins at `start` and stops once `end` has been reached. (The
  line that contains the end position will be the last read. Segment
  constructor guarantees that segment will always end at a line
  boundary, but manually created segments may not.)

  The single-argument arity opens the file itself and closes it once
  the sequence has been fully consumed. The three-argument arity reads
  from a caller-supplied `reader`, having already consumed `bytes-read`
  of at most `bytes-max` bytes, and leaves the reader open.

  Line sizes are computed assuming UTF-8 text with single-byte `\\n`
  line terminators."
  ([{:keys [file start end]}]
   (let [file-input-stream (FileInputStream. ^File file)]
     ;; FileChannel positions are longs (segments may start beyond 2 GiB).
     (.position (.getChannel file-input-stream) (long start))
     (segment-lines (io/reader file-input-stream) 0 (- end start) true)))
  ([^BufferedReader reader bytes-read bytes-max]
   (segment-lines reader bytes-read bytes-max false)))

(defn merge-station-stats
  "Combines two station stats maps into one. The result carries the
  summed `:count`, the smaller `:minimum`, the larger `:maximum`, and
  the count-weighted mean of both `:average` values."
  [{n1 :count lo1 :minimum hi1 :maximum mean1 :average}
   {n2 :count lo2 :minimum hi2 :maximum mean2 :average}]
  (let [total        (+ n1 n2)
        weighted-sum (+ (* n1 mean1) (* n2 mean2))]
    {:count   total
     :minimum (min lo1 lo2)
     :maximum (max hi1 hi2)
     :average (/ weighted-sum total)}))

(defn parallel-stats
  "Returns the map of station stats keyed by station names. Reads the
  measurements data from `filename`, that is a text file containing
  semicolon-separated values. Uses multiple threads to read the
  file (divided to `n` logical segments) and computes stats for each
  segment, then combines the results. Default number of segments is the
  number of available cores + 2, same as the number of threads used by
  `pmap`. Returns an empty map when the file yields no segments (e.g.,
  it is empty)."
  ([filename]
   (parallel-stats filename (+ 2 (.. Runtime getRuntime availableProcessors))))
  ([filename n]
   (let [file (io/file filename)]
     ;; The zero-argument arity supplies the identity value that `reduce`
     ;; asks for when there are no segments at all.
     (reduce (fn
               ([] {})
               ([stats1 stats2] (merge-with merge-station-stats stats1 stats2)))
             (pmap (fn [segment]
                     (transducing-stats (file-segment-line-seq segment)))
                   (file-segment-seq file n))))))


  (comment
    ;; Benchmark scratchpad, meant to be evaluated by hand from a REPL.
    ;; Wrapped in `comment` so the benchmarks do not run every time the
    ;; file is loaded.

    ;; 80% better than baseline (4.5 min vs 22 min), with 4 physical (8 logical) cores (Ryzen 5 3400G)

    ;; 1e6 lines: 0.3 sec
    (time (format-stats (parallel-stats "dev/resources/measurements1M.txt")))

    ;; 1e9 lines: 263 sec (~4.5 min)
    (time (format-stats (parallel-stats "dev/resources/measurements.txt")))
    )


