История изменений
Исправление Nervous, (текущая версия) :
Надо будет допилить да off-by-one ошибки повыловить
;; Uses multiple threads to read the file (divided into logical segments) and
;; compute stats for each segment, then combines the results.
;; Sentinel values used while scanning a file for segment boundaries.
(def ^{:dynamic true
       :doc "Integer that a file input stream hands back from `read` once
  the end of the file has been reached."}
  *end-of-file*
  -1)

(def ^{:dynamic true
       :doc "Integer code of the end-of-line character (line feed)."}
  *end-of-line*
  10)
(defn file-segment
  "Builds a map describing a line-aligned slice of `file`, with keys
  `:file`, `:start` and `:end` (byte offsets).

  The slice begins at `start` and nominally covers `initial-length`
  bytes. When the last byte of that nominal span is not a newline, the
  slice is stretched forward until just past the next newline, or up
  to the end of the file if no newline follows. `:end` never exceeds
  the file length. Returns nil when `initial-length` is not positive.

  `start` defaults to 0; `initial-length` defaults to the length of the
  whole file."
  ([^File file]
   (file-segment file 0))
  ([^File file start]
   (file-segment file start (.length file)))
  ([^File file start initial-length]
   (when (pos? initial-length)
     (with-open [in (FileInputStream. file)]
       (let [channel     (.getChannel in)
             boundary?   #{*end-of-file* *end-of-line*}
             nominal-end (+ start initial-length)
             last-index  (dec nominal-end)
             ;; Look at the final byte of the nominal span first.
             _           (.position channel ^int last-index)
             last-byte   (.read in)]
         ;; Not on a boundary yet: consume bytes until a newline or the
         ;; end of the file has been read.
         (when-not (boundary? last-byte)
           (.position channel ^int nominal-end)
           (loop []
             (when-not (boundary? (.read in))
               (recur))))
         {:file  file
          :start start
          ;; The channel now sits right after the terminating byte;
          ;; clamp in case the nominal end lay beyond the file.
          :end   (min (.position channel) (.length file))})))))
(defn file-segment-seq
  "Returns a sequence of at most `n` successive logical segments of
  `file`, beginning at byte offset `start` (default 0). The first
  segment is computed immediately; the remaining ones are produced
  lazily.

  Each step asks `file-segment` for a slice of roughly
  (remaining bytes / remaining segment count) bytes, and the next
  slice starts where the previous one ended. As segments may be
  expanded to align with line boundaries, there may be fewer than `n`
  of them, especially if segments are small and lines are long.

  Returns nil when `n` is not positive or when no bytes remain after
  `start`."
  ([^java.io.File file n]
   (file-segment-seq file 0 n))
  ([^java.io.File file start n]
   (when (pos? n)
     (let [remaining-file-length (- (.length file) start)]
       (when (pos? remaining-file-length)
         ;; `quot` keeps the arithmetic in longs, so slices larger than
         ;; Integer/MAX_VALUE bytes no longer blow up an `int` cast.
         ;; The floor of 1 prevents a short tail (fewer bytes left than
         ;; segments left) from rounding down to 0, which used to end
         ;; the sequence early and drop the last bytes of the file.
         (let [initial-segment-length (max 1 (long (quot remaining-file-length n)))]
           (when-let [segment (file-segment file start initial-segment-length)]
             (cons segment
                   (lazy-seq (file-segment-seq file (:end segment) (dec n)))))))))))
(defn- segment-lines
  "Worker behind `file-segment-line-seq`: returns a seq of lines pulled
  from `reader` until `bytes-read` reaches `bytes-max` or the reader
  is exhausted. When `close-when-done?` is truthy the reader is closed
  at that point. Returns nil when there is nothing (more) to read."
  [^BufferedReader reader bytes-read bytes-max close-when-done?]
  (if-let [^String line (when (< bytes-read bytes-max)
                          (.readLine reader))]
    ;; Count the encoded line plus one byte for its terminator. The
    ;; charset is explicit so the count does not depend on the JVM's
    ;; default encoding.
    (let [line-length-bytes (inc (alength (.getBytes line java.nio.charset.StandardCharsets/UTF_8)))]
      (cons line
            (lazy-seq (segment-lines reader
                                     (+ bytes-read line-length-bytes)
                                     bytes-max
                                     close-when-done?))))
    (when close-when-done?
      (.close reader))))

(defn file-segment-line-seq
  "Returns a lazy sequence of lines from the `segment`. Reading begins
  at `start` and ends after `end`. (The line that contains the end
  position will be the last read. Segment constructor guarantees that
  segment will always end at a line boundary, but manually created
  segments may not.)

  Arities:
  - `[segment]` opens `:file`, seeks to `:start` and reads lines until
    `(- end start)` bytes are accounted for. The stream it opens is
    closed once the sequence has been fully consumed, or right away
    if setup or the first read fails. A sequence that is abandoned
    half-way leaves the stream open.
  - `[reader bytes-read bytes-max]` continues reading from a reader
    supplied by the caller and never closes it.

  Byte accounting assumes UTF-8 text whose lines end in a single-byte
  terminator (LF); a two-byte CRLF terminator would be under-counted."
  ([{:keys [file start end]}]
   (let [bytes-max (- end start)
         file-input-stream (FileInputStream. ^File file)]
     (try
       (.position (.getChannel file-input-stream) (long start))
       (segment-lines (io/reader file-input-stream) 0 bytes-max true)
       (catch Throwable t
         ;; Do not leak the descriptor when seeking or the first read fails.
         (.close file-input-stream)
         (throw t)))))
  ([^BufferedReader reader bytes-read bytes-max]
   (segment-lines reader bytes-read bytes-max false)))
(defn merge-station-stats
  "Combines two stats maps for the same station into one map with the
  keys `:count`, `:minimum`, `:maximum` and `:average`. Counts are
  summed, the extremes are taken across both inputs, and the average
  is the count-weighted mean of the two input averages."
  [{n-a :count, lo-a :minimum, hi-a :maximum, mean-a :average}
   {n-b :count, lo-b :minimum, hi-b :maximum, mean-b :average}]
  (let [total        (+ n-a n-b)
        ;; Each average times its count recovers that side's sum.
        weighted-sum (+ (* n-a mean-a) (* n-b mean-b))]
    {:count   total
     :minimum (min lo-a lo-b)
     :maximum (max hi-a hi-b)
     :average (/ weighted-sum total)}))
(defn parallel-stats
  "Returns the map of station stats keyed by station names. Reads the
  measurements data from `filename`, that is a text file containing
  semicolon-separated values. Uses multiple threads to read the
  file (divided to `n` logical segments) and computes stats for each
  segment, then combines the results. Default number of segments is the
  number of available cores + 2, same as the number of threads used by
  `pmap`.

  Returns an empty map when the file yields no segments (for example,
  an empty file)."
  ([filename]
   (parallel-stats filename (+ 2 (.. Runtime getRuntime availableProcessors))))
  ([filename n]
   (let [file (io/file filename)
         ;; `reduce` without an initial value calls the function with
         ;; no arguments when the collection is empty; the zero-arity
         ;; supplies the empty result instead of an ArityException.
         merge-segment-stats (fn
                               ([] {})
                               ([stats1 stats2]
                                (merge-with merge-station-stats stats1 stats2)))]
     (reduce merge-segment-stats
             ;; One stats map per segment, computed in parallel.
             (pmap (fn [segment]
                     (transducing-stats (file-segment-line-seq segment)))
                   (file-segment-seq file n))))))
(comment
;; Timings noted by the author for `parallel-stats` on a Ryzen 5 3400G
;; with 4 physical (8 logical) cores: 80% better than baseline
;; (4.5 min vs 22 min).
;; 1e6 lines: 0.3 sec
(time (format-stats (parallel-stats "dev/resources/measurements1M.txt")))
;; 1e9 lines: 263 sec (~4.5 min)
(time (format-stats (parallel-stats "dev/resources/measurements.txt")))
)
Исправление Nervous, :
Надо будет допилить да off-by-one ошибки повыловить
;; using multiple threads to read the file (divided to logical segments) and
;; computing stats for each segment, then combining the results
(def ^:dynamic *end-of-file*
"Integer value returned by a file input stream when end of file is reached." -1)
(def ^:dynamic *end-of-line*
"Integer value that corresponds to the end of line character." 10)
(defn file-segment
"Returns a logical segment of `file`, defined by start and end
positions in bytes. The segment will begin at `start` and end after
newline nearest to `start` + `initial-length` position (inclusive),
i.e., segment always ends between lines (or at the end of file);
`start` defaults to 0; `initial-length` defaults to the length of
the entire file."
([^File file]
(file-segment file 0))
([^File file start]
(file-segment file start (.length file)))
([^File file start initial-length]
(when (pos? initial-length)
(with-open [file-input-stream (FileInputStream. file)]
(let [file-channel (.getChannel file-input-stream)
end-of-line-or-file? #{*end-of-file* *end-of-line*}
at-initial-end (+ start initial-length)
before-initial-end (dec at-initial-end)]
{:file file
:start start
:end (do
(.position file-channel ^int before-initial-end)
(let [initial-end-byte (.read file-input-stream)]
(if (end-of-line-or-file? initial-end-byte)
(min (.position file-channel) (.length file))
(do
(.position file-channel ^int at-initial-end)
(while (not (end-of-line-or-file? (.read file-input-stream))))
(min (.position file-channel) (.length file))))))})))))
(defn file-segment-seq
"Returns a lazy sequence of at most `n` successive logical segments of
`file`. (As segments may be expanded to align with line boundaries,
there may be fewer of them, especially if segments are small and
lines are long.)"
([^java.io.File file n]
(file-segment-seq file 0 n))
([^java.io.File file start n]
(when (> n 0)
(let [remaining-file-length (- (.length file) start)
initial-segment-length (int (/ remaining-file-length n))]
(when-let [segment (file-segment file start initial-segment-length)]
(cons segment (lazy-seq (file-segment-seq file (:end segment) (dec n)))))))))
(defn file-segment-line-seq
"Returns a lazy sequence of lines from the `segment`. Reading begins
at `start` and ends after `end`. (The line that contains the end
position will be the last read. Segment constructor guarantees that
segment will always end at a line boundary, but manually created
segments may not.)"
([{:keys [file start end]}]
(let [file-input-stream (FileInputStream. ^File file)
_ (.position (.getChannel file-input-stream) ^int start)
reader (io/reader file-input-stream)]
(file-segment-line-seq reader 0 (- end start))))
([^BufferedReader reader bytes-read bytes-max]
(when (not (>= bytes-read bytes-max))
(when-let [line (.readLine reader)]
(let [line-length-bytes (inc (count (.getBytes line)))]
(cons line (lazy-seq (file-segment-line-seq reader
(+ bytes-read line-length-bytes)
bytes-max))))))))
(defn merge-station-stats
"Returns two station stats maps merged."
[{cnt1 :count min1 :minimum max1 :maximum avg1 :average}
{cnt2 :count min2 :minimum max2 :maximum avg2 :average}]
{:count (+ cnt1 cnt2)
:minimum (min min1 min2)
:maximum (max max1 max2)
:average (/ (+ (* cnt1 avg1) (* cnt2 avg2)) (+ cnt1 cnt2))})
(defn parallel-stats
"Returns the map of station stats keyed by station names. Reads the
measurements data from `filename`, that is a text file containing
semicolon-separated values. Uses multiple threads to read the
file (divided to `n` logical segments) and computes stats for each
segment, then combines the results. Default number of segments is the
number of available cores + 2, same as the number of threads used by
`pmap`."
([filename]
(parallel-stats filename (+ 2 (.. Runtime getRuntime availableProcessors))))
([filename n]
(let [file (io/file filename)]
(reduce #(merge-with merge-station-stats %1 %2)
(pmap (fn [segment]
(transducing-stats (file-segment-line-seq segment)))
(file-segment-seq file n))))))
(comment
;; 80% better than baseline (4.5 min vs 22 min), with 4 physical (8 logical) cores (Ryzen 5 3400G)
;; 1e6 lines: 0.3 sec
(time (format-stats (parallel-stats "dev/resources/measurements1M.txt")))
;; 1e9 lines: 263 sec (~4.5 min)
(time (format-stats (parallel-stats "dev/resources/measurements.txt")))
)
Исправление Nervous, :
Надо будет допилить да off-by-one ошибки повыловить
;; using multiple threads to read the file (divided to logical segments) and
;; computing stats for each segment, then combining the results
(def ^:dynamic *end-of-file*
"Integer value returned by a file input stream when end of file is reached." -1)
(def ^:dynamic *end-of-line*
"Integer value that corresponds to the end of line character." 10)
(defn file-segment
"Returns a logical segment of `file`, defined by start and end
positions in bytes. The segment will begin at `start` and end after
newline nearest to `start` + `initial-length` position (inclusive),
i.e., segment always ends between lines (or at the end of file);
`start` defaults to 0; `initial-length` defaults to the length of
the entire file."
([^File file]
(file-segment file 0))
([^File file start]
(file-segment file start (.length file)))
([^File file start initial-length]
(when (pos? initial-length)
(with-open [file-input-stream (FileInputStream. file)]
(let [file-channel (.getChannel file-input-stream)
end-of-line-or-file? #{*end-of-file* *end-of-line*}
at-initial-end (+ start initial-length)
before-initial-end (dec at-initial-end)]
{:file file
:start start
:end (do
(.position file-channel ^int before-initial-end)
(let [initial-end-byte (.read file-input-stream)]
(if (end-of-line-or-file? initial-end-byte)
(min (.position file-channel) (.length file))
(do
(.position file-channel ^int at-initial-end)
(while (not (end-of-line-or-file? (.read file-input-stream))))
(min (.position file-channel) (.length file))))))})))))
(defn file-segment-seq
"Returns a lazy sequence of at most `n` successive logical segments of
`file`. (As segments may be expanded to align with line boundaries,
there may be fewer of them, especially if segments are small and
lines are long.)"
([^java.io.File file n]
(file-segment-seq file 0 n))
([^java.io.File file start n]
(when (> n 0)
(let [remaining-file-length (- (.length file) start)
initial-segment-length (int (/ remaining-file-length n))]
(when-let [segment (file-segment file start initial-segment-length)]
(cons segment (lazy-seq (file-segment-seq file (:end segment) (dec n)))))))))
(defn file-segment-line-seq
"Returns a lazy sequence of lines from the `segment`. Reading begins
at `start` and ends after `end`. (The line that contains the end
position will be the last read. Segment constructor guarantees that
segment will always end at a line boundary, but manually created
segments may not.)"
([{:keys [file start end]}]
(let [file-input-stream (FileInputStream. ^File file)
_ (.position (.getChannel file-input-stream) ^int start)
reader (io/reader file-input-stream)]
(file-segment-line-seq reader 0 (- end start))))
([^BufferedReader reader bytes-read bytes-max]
(when (not (>= bytes-read bytes-max))
(when-let [line (.readLine reader)]
(let [line-length-bytes (inc (count (.getBytes line)))]
(cons line (lazy-seq (file-segment-line-seq reader
(+ bytes-read line-length-bytes)
bytes-max))))))))
(defn merge-station-stats
"Returns two station stats maps merged."
[{cnt1 :count min1 :minimum max1 :maximum avg1 :average}
{cnt2 :count min2 :minimum max2 :maximum avg2 :average}]
{:count (+ cnt1 cnt2)
:minimum (min min1 min2)
:maximum (max max1 max2)
:average (/ (+ (* cnt1 avg1) (* cnt2 avg2)) (+ cnt1 cnt2))})
(defn parallel-stats
"Returns the map of station stats keyed by station names. Reads the
measurements data from `filename`, that is a text file containing
semicolon-separated values. Uses multiple threads to read the
file (divided to `n` logical segments) and computes stats for each
segment, then combines the results. Default number of segments is the
number of available cores + 2, same as the number of threads used by
`pmap`."
([filename]
(parallel-stats filename (+ 2 (.. Runtime getRuntime availableProcessors))))
([filename n]
(let [file (io/file filename)]
(reduce #(merge-with merge-station-stats %1 %2)
(pmap (fn [segment]
(transducing-stats (file-segment-line-seq segment)))
(file-segment-seq file n))))))
(comment
;; 80% better than baseline (4.5 min vs 22 min), with 4 physical (8 logical) cores (Ryzen 5 3400G)
;; 1e6 lines: 0.3 sec
(time (format-stats (parallel-stats "dev/resources/measurements1M.txt")))
;; 1e9 lines: 263 sec (~4.5 min)
(time (format-stats (parallel-stats "dev/resources/measurements.txt")))
)
Исправление Nervous, :
Надо будет допилить да off-by-one ошибки повыловить
;; using multiple threads to read the file (divided to logical segments) and
;; computing stats for each segment, then combining the results
(def ^:dynamic *end-of-file*
"Integer value returned by a file input stream when end of file is reached." -1)
(def ^:dynamic *end-of-line*
"Integer value that corresponds to the end of line character." 10)
(defn file-segment
"Returns a logical segment of `file`, defined by start and end
positions in bytes. The segment will begin at `start` and end after
newline nearest to `start` + `initial-length` position (inclusive),
i.e., segment always ends between lines (or at the end of file);
`start` defaults to 0; `initial-length` defaults to the length of
the entire file."
([^File file]
(file-segment file 0))
([^File file start]
(file-segment file start (.length file)))
([^File file start initial-length]
(when (pos? initial-length)
(with-open [file-input-stream (FileInputStream. file)]
(let [file-channel (.getChannel file-input-stream)
end-of-line-or-file? #{*end-of-file* *end-of-line*}
at-initial-end (+ start initial-length)
before-initial-end (dec at-initial-end)]
{:file file
:start start
:end (do
(.position file-channel ^int before-initial-end)
(let [initial-end-byte (.read file-input-stream)]
(if (end-of-line-or-file? initial-end-byte)
(min (.position file-channel) (.length file))
(do
(.position file-channel ^int at-initial-end)
(while (not (end-of-line-or-file? (.read file-input-stream))))
(min (.position file-channel) (.length file))))))})))))
(defn file-segment-seq
"Returns a lazy sequence of at most `n` successive logical segments of
`file`. (As segments may be expanded to align with line boundaries,
there may be fewer of them, especially if segments are small and
lines are long.)"
([^java.io.File file n]
(file-segment-seq file 0 n))
([^java.io.File file start n]
(when (> n 0)
(let [remaining-file-length (- (.length file) start)
initial-segment-length (int (/ remaining-file-length n))]
(when-let [segment (file-segment file start initial-segment-length)]
(cons segment (lazy-seq (file-segment-seq file (:end segment) (dec n)))))))))
(defn file-segment-line-seq
"Returns a lazy sequence of lines from the `segment`. Reading begins
at `start` and ends after `end`. (The line that contains the end
position will be the last read. Segment constructor guarantees that
segment will always end at a line boundary, but manually created
segments may not.)"
([{:keys [file start end]}]
(let [file-input-stream (FileInputStream. ^File file)
_ (.position (.getChannel file-input-stream) ^int start)
reader (io/reader file-input-stream)]
(file-segment-line-seq reader 0 (- end start))))
([^BufferedReader reader bytes-read bytes-max]
(when (not (>= bytes-read bytes-max))
(when-let [line (.readLine reader)]
(let [line-length-bytes (inc (count (.getBytes line)))]
(cons line (lazy-seq (file-segment-line-seq reader
(+ bytes-read line-length-bytes)
bytes-max))))))))
(defn merge-station-stats
"Returns two station stats maps merged."
[{cnt1 :count min1 :minimum max1 :maximum avg1 :average}
{cnt2 :count min2 :minimum max2 :maximum avg2 :average}]
{:count (+ cnt1 cnt2)
:minimum (min min1 min2)
:maximum (max max1 max2)
:average (/ (+ (* cnt1 avg1) (* cnt2 avg2)) (+ cnt1 cnt2))})
(defn parallel-stats
"Returns the map of station stats keyed by station names. Reads the
measurements data from `filename`, that is a text file containing
semicolon-separated values. Uses multiple threads to read the
file (divided to `n` logical segments) and computes stats for each
segment, then combines the results. Default number of segments is the
number of available cores + 2, same as the number of threads used by
`pmap`."
([filename]
(parallel-stats filename (+ 2 (.. Runtime getRuntime availableProcessors))))
([filename n]
(let [file (io/file filename)]
(reduce #(merge-with merge-station-stats %1 %2)
(pmap (fn [segment]
(transducing-stats (file-segment-line-seq segment)))
(file-segment-seq file n))))))
(comment
;; 80% better than baseline (4.5 min vs 22 min), with 4 physical (8 logical) cores (Ryzen 5 3400G)
;; 0.3 sec
(time (format-stats (parallel-stats "dev/resources/measurements1M.txt")))
;; 263 sec (~4.5 min)
(time (format-stats (parallel-stats "dev/resources/measurements.txt")))
)
Исправление Nervous, :
Надо будет допилить да off-by-one ошибки повыловить
;; using multiple threads to read the file (divided to logical segments) and
;; computing stats for each segment, then combining the results
(def ^:dynamic *end-of-file*
"Integer value returned by a file input stream when end of file is reached." -1)
(def ^:dynamic *end-of-line*
"Integer value that corresponds to the end of line character." 10)
(defn file-segment
"Returns a logical segment of `file`, defined by start and end
positions in bytes. The segment will begin at `start` and end after
newline nearest to `start` + `initial-length` position (inclusive),
i.e., segment always ends between lines (or at the end of file);
`start` defaults to 0; `initial-length` defaults to the length of
the entire file."
([^File file]
(file-segment file 0))
([^File file start]
(file-segment file start (.length file)))
([^File file start initial-length]
(when (pos? initial-length)
(with-open [file-input-stream (FileInputStream. file)]
(let [file-channel (.getChannel file-input-stream)
end-of-line-or-file? #{*end-of-file* *end-of-line*}
at-initial-end (+ start initial-length)
before-initial-end (dec at-initial-end)]
{:file file
:start start
:end (do
(.position file-channel ^int before-initial-end)
(let [initial-end-byte (.read file-input-stream)]
(if (end-of-line-or-file? initial-end-byte)
(min (.position file-channel) (.length file))
(do
(.position file-channel ^int at-initial-end)
(while (not (end-of-line-or-file? (.read file-input-stream))))
(min (.position file-channel) (.length file))))))})))))
(defn file-segment-seq
"Returns a lazy sequence of at most `n` successive logical segments of
`file`. (As segments may be expanded to align with line boundaries,
there may be fewer of them, especially if segments are small and
lines are long.)"
([^java.io.File file n]
(file-segment-seq file 0 n))
([^java.io.File file start n]
(when (> n 0)
(let [remaining-file-length (- (.length file) start)
initial-segment-length (int (/ remaining-file-length n))]
(when-let [segment (file-segment file start initial-segment-length)]
(cons segment (lazy-seq (file-segment-seq file (:end segment) (dec n)))))))))
(defn file-segment-line-seq
"Returns a lazy sequence of lines from the `segment`. Reading begins
at `start` and ends after `end`. (The line that contains the end
position will be the last read. Segment constructor guarantees that
segment will always end at a line boundary, but manually created
segments may not.)"
([{:keys [file start end]}]
(let [file-input-stream (FileInputStream. ^File file)
_ (.position (.getChannel file-input-stream) ^int start)
reader (io/reader file-input-stream)]
(file-segment-line-seq reader 0 (- end start))))
([^BufferedReader reader bytes-read bytes-max]
(when (not (>= bytes-read bytes-max))
(when-let [line (.readLine reader)]
(let [line-length-bytes (inc (count (.getBytes line)))]
(cons line (lazy-seq (file-segment-line-seq reader
(+ bytes-read line-length-bytes)
bytes-max))))))))
(defn merge-station-stats
"Returns two station stats maps merged."
[{cnt1 :count min1 :minimum max1 :maximum avg1 :average}
{cnt2 :count min2 :minimum max2 :maximum avg2 :average}]
{:count (+ cnt1 cnt2)
:minimum (min min1 min2)
:maximum (max max1 max2)
:average (/ (+ (* cnt1 avg1) (* cnt2 avg2)) (+ cnt1 cnt2))})
(defn parallel-stats
"Returns the map of station stats keyed by station names. Reads the
measurements data from `filename`, that is a text file containing
semicolon-separated values. Uses multiple threads to read the
file (divided to `n` logical segments) and computes stats for each
segment, then combines the results. Default number of segments is the
number of available cores + 2, same as the number of threads used by
`pmap`."
([filename]
(parallel-stats filename (+ 2 (.. Runtime getRuntime availableProcessors))))
([filename n]
(let [file (io/file filename)]
(reduce #(merge-with merge-station-stats %1 %2)
(pmap (fn [segment]
(transducing-stats (file-segment-line-seq segment)))
(file-segment-seq file n))))))
(comment
;; 80% better than baseline (4.5 min vs 22 min)
;; 0.3 sec
(time (format-stats (parallel-stats "dev/resources/measurements1M.txt")))
;; 263 sec (~4.5 min)
(time (format-stats (parallel-stats "dev/resources/measurements.txt")))
)
Исправление Nervous, :
Надо будет допилить да off-by-one ошибки повыловить
;; using multiple threads to read the file (divided to logical segments) and
;; computing stats for each segment, then combining the results
(def ^:dynamic *end-of-file*
"Integer value returned by a file input stream when end of file is reached." -1)
(def ^:dynamic *end-of-line*
"Integer value that corresponds to the end of line character." 10)
(defn file-segment
"Returns a logical segment of `file`, defined by start and end
positions in bytes. The segment will begin at `start` and end after
newline nearest to `start` + `initial-length` position (inclusive),
i.e., segment always ends between lines (or at the end of file);
`start` defaults to 0; `initial-length` defaults to the length of
the entire file."
([^File file]
(file-segment file 0))
([^File file start]
(file-segment file start (.length file)))
([^File file start initial-length]
(when (pos? initial-length)
(with-open [file-input-stream (FileInputStream. file)]
(let [file-channel (.getChannel file-input-stream)
end-of-line-or-file? #{*end-of-file* *end-of-line*}
at-initial-end (+ start initial-length)
before-initial-end (dec at-initial-end)]
{:file file
:start start
:end (do
(.position file-channel ^int before-initial-end)
(let [initial-end-byte (.read file-input-stream)]
(if (end-of-line-or-file? initial-end-byte)
(min (.position file-channel) (.length file))
(do
(.position file-channel ^int at-initial-end)
(while (not (end-of-line-or-file? (.read file-input-stream))))
(min (.position file-channel) (.length file))))))})))))
(defn file-segment-seq
"Returns a lazy sequence of at most `n` successive logical segments of
`file`. (As segments may be expanded to align with line boundaries,
there may be fewer of them, especially if segments are small and
lines are long.)"
([^java.io.File file n]
(file-segment-seq file 0 n))
([^java.io.File file start n]
(when (> n 0)
(let [remaining-file-length (- (.length file) start)
initial-segment-length (int (/ remaining-file-length n))]
(when-let [segment (file-segment file start initial-segment-length)]
(cons segment (lazy-seq (file-segment-seq file (:end segment) (dec n)))))))))
(defn file-segment-line-seq
"Returns a lazy sequence of lines from the `segment`. Reading begins
at `start` and ends after `end`. (The line that contains the end
position will be the last read. Segment constructor guarantees that
segment will always end at a line boundary, but manually created
segments may not.)"
([{:keys [file start end]}]
(let [file-input-stream (FileInputStream. ^File file)
_ (.position (.getChannel file-input-stream) ^int start)
reader (io/reader file-input-stream)]
(file-segment-line-seq reader 0 (- end start))))
([^BufferedReader reader bytes-read bytes-max]
(when (not (>= bytes-read bytes-max))
(when-let [line (.readLine reader)]
(let [line-length-bytes (inc (count (.getBytes line)))]
(cons line (lazy-seq (file-segment-line-seq reader
(+ bytes-read line-length-bytes)
bytes-max))))))))
(defn combine-station-stats
"Returns two station stats maps merged."
[{cnt1 :count min1 :minimum max1 :maximum avg1 :average}
{cnt2 :count min2 :minimum max2 :maximum avg2 :average}]
{:count (+ cnt1 cnt2)
:minimum (min min1 min2)
:maximum (max max1 max2)
:average (/ (+ (* cnt1 avg1) (* cnt2 avg2)) (+ cnt1 cnt2))})
(defn parallel-stats
  "Returns the map of station stats keyed by station names. Reads the
  measurements data from `filename`, that is a text file containing
  semicolon-separated values. Uses multiple threads to read the
  file (divided to `n` logical segments) and computes stats for each
  segment, then combines the results. Default number of segments is the
  number of available cores + 2, same as the number of threads used by
  `pmap`.

  Returns an empty map when the file yields no segments (for example,
  an empty file)."
  ([filename]
   (parallel-stats filename (+ 2 (.. Runtime getRuntime availableProcessors))))
  ([filename n]
   (let [file (io/file filename)
         ;; `reduce` without an initial value calls the function with
         ;; no arguments when the collection is empty; the zero-arity
         ;; supplies the empty result instead of an ArityException.
         combine-segment-stats (fn
                                 ([] {})
                                 ([stats1 stats2]
                                  (merge-with combine-station-stats stats1 stats2)))]
     (reduce combine-segment-stats
             ;; One stats map per segment, computed in parallel.
             (pmap (fn [segment]
                     (transducing-stats (file-segment-line-seq segment)))
                   (file-segment-seq file n))))))
(comment
;; 80% better than baseline (4.5 min vs 22 min)
;; 0.3 sec
(time (format-stats (parallel-stats "dev/resources/measurements1M.txt")))
;; 263 sec (~4.5 min)
(time (format-stats (parallel-stats "dev/resources/measurements.txt")))
)
Исправление Nervous, :
Надо будет допилить да off-by-one ошибки повыловить
;; using multiple threads to read the file (divided to logical segments) and
;; computing stats for each segment, then combining the results
(def ^:dynamic *end-of-file*
"Integer value returned by a file input stream when end of file is reached." -1)
(def ^:dynamic *end-of-line*
"Integer value that corresponds to the end of line character." 10)
(defn file-segment
"Returns a logical segment of `file`, defined by start and end
positions in bytes. The segment will begin at `start` and end after
newline nearest to `start` + `initial-length` position (inclusive),
i.e., segment always ends between lines (or at the end of file);
`start` defaults to 0; `initial-length` defaults to the length of
the entire file."
([^File file]
(file-segment file 0))
([^File file start]
(file-segment file start (.length file)))
([^File file start initial-length]
(when (pos? initial-length)
(with-open [file-input-stream (FileInputStream. file)]
(let [file-channel (.getChannel file-input-stream)
end-of-line-or-file? #{*end-of-file* *end-of-line*}
at-initial-end (+ start initial-length)
before-initial-end (dec at-initial-end)]
{:file file
:start start
:end (do
(.position file-channel ^int before-initial-end)
(let [initial-end-byte (.read file-input-stream)]
(if (end-of-line-or-file? initial-end-byte)
(min (.position file-channel) (.length file))
(do
(.position file-channel ^int at-initial-end)
(while (not (end-of-line-or-file? (.read file-input-stream))))
(min (.position file-channel) (.length file))))))})))))
(defn file-segment-seq
"Returns a lazy sequence of at most `n` successive logical segments of
`file`. (As segments may be expanded to align with line boundaries,
there may be fewer of them, especially if segments are small and
lines are long.)"
([^java.io.File file n]
(file-segment-seq file 0 n))
([^java.io.File file start n]
(when (> n 0)
(let [remaining-file-length (- (.length file) start)
initial-segment-length (int (/ remaining-file-length n))]
(when-let [segment (file-segment file start initial-segment-length)]
(cons segment (lazy-seq (file-segment-seq file (:end segment) (dec n)))))))))
(defn file-segment-line-seq
"Returns a lazy sequence of lines from the `segment`. Reading begins
at `start` and ends after `end`. (The line that contains the end
position will be the last read. Segment constructor guarantees that
segment will always end at a line boundary, but manually created
segments may not.)"
([{:keys [file start end]}]
(let [file-input-stream (FileInputStream. ^File file)
_ (.position (.getChannel file-input-stream) ^int start)
reader (io/reader file-input-stream)]
(file-segment-line-seq reader 0 (- end start))))
([^BufferedReader reader bytes-read bytes-max]
(when (not (>= bytes-read bytes-max))
(when-let [line (.readLine reader)]
(let [line-length-bytes (inc (count (.getBytes line)))]
(cons line (lazy-seq (file-segment-line-seq reader
(+ bytes-read line-length-bytes)
bytes-max))))))))
(defn combine-station-stats
"Returns two station stats maps merged."
[{cnt1 :count min1 :minimum max1 :maximum avg1 :average}
{cnt2 :count min2 :minimum max2 :maximum avg2 :average}]
{:count (+ cnt1 cnt2)
:minimum (min min1 min2)
:maximum (max max1 max2)
:average (/ (+ (* cnt1 avg1) (* cnt2 avg2)) (+ cnt1 cnt2))})
(defn parallel-stats
  "Returns the map of station stats keyed by station names. Reads the
  measurements data from `filename`, that is a text file containing
  semicolon-separated values. Uses multiple threads to read the
  file (divided to `n` logical segments) and computes stats for each
  segment, then combines the results. Default number of segments is the
  number of available cores + 2, same as the number of threads used by
  `pmap`.

  Returns an empty map when the file yields no segments (for example,
  an empty file)."
  ([filename]
   (parallel-stats filename (+ 2 (.. Runtime getRuntime availableProcessors))))
  ([filename n]
   (let [file (io/file filename)
         ;; With no initial value, `reduce` invokes the function with
         ;; zero arguments on an empty collection; answer with an empty
         ;; map there rather than failing with an ArityException.
         combine-segment-stats (fn
                                 ([] {})
                                 ([stats1 stats2]
                                  (merge-with combine-station-stats stats1 stats2)))]
     (reduce combine-segment-stats
             ;; Per-segment stats maps, computed in parallel by `pmap`.
             (pmap (fn [segment]
                     (transducing-stats (file-segment-line-seq segment)))
                   (file-segment-seq file n))))))
(comment
;; 80% better than baseline (4.5 min vs 22 min)
;; 0.3 sec
(time (format-stats (parallel-stats "dev/resources/measurements1M.txt")))
;; 263 sec (~4.5 min)
(time (format-stats (parallel-stats "dev/resources/measurements.txt")))
)
Исправление Nervous, :
Надо будет допилить да off-by-one ошибки повыловить
;; using multiple threads to read the file (divided to logical segments) and
;; computing stats for each segment, then combining the results
(def ^:dynamic *end-of-file*
"Integer value returned by a file input stream when end of file is reached." -1)
(def ^:dynamic *end-of-line*
"Integer value that corresponds to the end of line character." 10)
(defn file-segment
"Returns a logical segment of `file`, defined by start and end
positions in bytes. The segment will begin at `start` and end after
newline nearest to `start` + `initial-length` position (inclusive),
i.e., segment always ends between lines (or at the end of file);
`start` defaults to 0; `initial-length` defaults to the length of
the entire file."
([^File file]
(file-segment file 0))
([^File file start]
(file-segment file start (.length file)))
([^File file start initial-length]
(when (> initial-length 0)
(with-open [file-input-stream (FileInputStream. file)]
(let [file-channel (.getChannel file-input-stream)
end-of-line-or-file? #{*end-of-file* *end-of-line*}
at-initial-end (+ start initial-length)
before-initial-end (dec at-initial-end)]
{:file file
:start start
:end (do
(.position file-channel ^int before-initial-end)
(let [initial-end-byte (.read file-input-stream)]
(if (end-of-line-or-file? initial-end-byte)
(min (.position file-channel) (.length file))
(do
(.position file-channel ^int at-initial-end)
(while (not (end-of-line-or-file? (.read file-input-stream))))
(min (.position file-channel) (.length file))))))})))))
(defn file-segment-seq
  "Returns a sequence (lazy after its first element) of at most `n`
  successive logical segments of `file`, beginning at byte position
  `start` (default 0). Each segment is built by `file-segment` with an
  initial length of the remaining byte count divided by the number of
  segments still to produce, and the next segment starts where the
  previous one ended.

  As segments may be expanded to align with line boundaries, there may
  be fewer than `n` of them, especially if segments are small and lines
  are long. Returns nil when `n` is not positive or no bytes remain."
  ([^java.io.File file n]
   (file-segment-seq file 0 n))
  ([^java.io.File file start n]
   (when (pos? n)
     (let [remaining-file-length (- (.length file) start)]
       (when (pos? remaining-file-length)
         ;; Integer division on longs: no int cast, so segments larger
         ;; than Integer/MAX_VALUE bytes are fine. The length is at
         ;; least 1 so that a tail shorter than `n` bytes still yields
         ;; a segment instead of being silently dropped.
         (let [initial-segment-length (max 1 (quot remaining-file-length n))]
           (when-let [segment (file-segment file start initial-segment-length)]
             (cons segment
                   (lazy-seq (file-segment-seq file (:end segment) (dec n)))))))))))
(defn- bounded-line-seq
  "Returns a sequence (lazy after its first element) of lines read from
  `reader` while fewer than `bytes-max` bytes have been consumed,
  starting the count at `bytes-read`. Calls the no-argument fn `on-end`
  once when the byte budget is used up or the reader is exhausted."
  [^BufferedReader reader bytes-read bytes-max on-end]
  (if-let [^String line (when (< bytes-read bytes-max) (.readLine reader))]
    ;; Count bytes in UTF-8, the encoding `io/reader` decodes with by
    ;; default, plus one byte for the line terminator stripped by
    ;; `readLine`. (A two-byte CRLF terminator would be under-counted.)
    (let [line-length-bytes (inc (alength (.getBytes line "UTF-8")))]
      (cons line
            (lazy-seq (bounded-line-seq reader
                                        (+ bytes-read line-length-bytes)
                                        bytes-max
                                        on-end))))
    (do (on-end) nil)))

(defn file-segment-line-seq
  "Returns a sequence (lazy after its first element) of lines from the
  `segment`, a map with keys `:file`, `:start` and `:end`. Reading
  begins at byte `start` and stops once at least `end` - `start` bytes
  have been read, so the line that contains the end position is the
  last one read. (Segments are expected to end at a line boundary, but
  manually created ones may not.) Returns nil when there is nothing to
  read.

  With a segment map, the underlying file is opened here and closed as
  soon as the sequence has been fully realized; a sequence that is
  abandoned part-way leaves the file open.

  With `reader`, `bytes-read` and `bytes-max`, lines are read from the
  given reader, which is left open for the caller to close."
  ([{:keys [file start end]}]
   (let [file-input-stream (FileInputStream. ^File file)]
     (try
       ;; Byte offsets are longs; `FileChannel.position` takes a long.
       (.position (.getChannel file-input-stream) (long start))
       (let [^BufferedReader reader (io/reader file-input-stream)]
         (bounded-line-seq reader 0 (- end start) #(.close reader)))
       (catch Throwable t
         ;; Do not leak the stream if seeking or the first read fails.
         (.close file-input-stream)
         (throw t)))))
  ([^BufferedReader reader bytes-read bytes-max]
   (bounded-line-seq reader bytes-read bytes-max (fn [] nil))))
(defn combine-station-stats
  "Merges two stats maps (keys `:count`, `:minimum`, `:maximum`,
  `:average`) into one: counts are summed, the extremes are taken
  across both maps, and the average is the count-weighted mean of the
  two averages."
  [left right]
  (let [{n1 :count lo1 :minimum hi1 :maximum mean1 :average} left
        {n2 :count lo2 :minimum hi2 :maximum mean2 :average} right
        total (+ n1 n2)
        weighted-sum (+ (* n1 mean1) (* n2 mean2))]
    {:count total
     :minimum (min lo1 lo2)
     :maximum (max hi1 hi2)
     :average (/ weighted-sum total)}))
(defn parallel-stats
  "Returns the map of station stats keyed by station names. Reads the
  measurements data from `filename`, that is a text file containing
  semicolon-separated values. Uses multiple threads to read the
  file (divided into `n` logical segments) and computes stats for each
  segment, then combines the results. Default number of segments is the
  number of available cores + 2, same as the number of threads used by
  `pmap`. Returns an empty map when the file yields no segments."
  ([filename]
   (parallel-stats filename (+ 2 (.. Runtime getRuntime availableProcessors))))
  ([filename n]
   (let [file (io/file filename)]
     (reduce (fn
               ;; Zero-arity case: `reduce` calls it when there are no
               ;; segments at all (e.g. an empty file), instead of
               ;; failing with an ArityException.
               ([] {})
               ([stats1 stats2]
                (merge-with combine-station-stats stats1 stats2)))
             (pmap (fn [segment]
                     (transducing-stats (file-segment-line-seq segment)))
                   (file-segment-seq file n))))))
(comment
  ;; Timings noted by the author:
  ;; 80% better than baseline (4.5 min vs 22 min)

  ;; 0.3 sec
  (time (-> "dev/resources/measurements1M.txt" parallel-stats format-stats))

  ;; 263 sec (~4.5 min)
  (time (-> "dev/resources/measurements.txt" parallel-stats format-stats))
  )
Исходная версия Nervous, :
Надо будет допилить да off-by-one ошибки повыловить
;; using multiple threads to read the file (divided to logical segments) and
;; compute stats for each segment, then combining the results
(def ^:dynamic *end-of-file*
  "Sentinel integer that a file input stream's `read` returns once the
  end of the file has been reached."
  -1)

(def ^:dynamic *end-of-line*
  "Integer code of the end-of-line character (line feed)."
  10)
(defn file-segment
  "Returns a logical segment of `file` as a map with keys `:file`,
  `:start` and `:end`, where `:start` and `:end` are byte positions.

  The segment begins at `start`. Its tentative end is `start` +
  `initial-length`; if the byte just before that position is not an
  end-of-line byte, the end is moved forward past the next end-of-line
  byte (or to the end of file, whichever comes first). `:end` never
  exceeds the file length, so a segment always ends between lines or
  at the end of the file.

  `start` defaults to 0; `initial-length` defaults to the length of
  the entire file. Returns nil when `initial-length` is not positive."
  ([^File file]
   (file-segment file 0))
  ([^File file start]
   (file-segment file start (.length file)))
  ([^File file start initial-length]
   (when (pos? initial-length)
     (with-open [file-input-stream (FileInputStream. file)]
       (let [file-channel (.getChannel file-input-stream)
             file-length (.length file)
             end-of-line-or-file? #{*end-of-file* *end-of-line*}
             at-initial-end (+ start initial-length)]
         ;; Byte offsets are longs (`FileChannel.position` takes a long),
         ;; so coerce explicitly instead of hinting them as ints.
         (.position file-channel (long (dec at-initial-end)))
         ;; Inspect the last byte of the tentative segment. If it is
         ;; neither a newline nor EOF, that read has left the channel
         ;; exactly at `at-initial-end`, so scanning simply continues
         ;; from there until a newline or EOF is consumed.
         (when-not (end-of-line-or-file? (.read file-input-stream))
           (while (not (end-of-line-or-file? (.read file-input-stream)))))
         {:file file
          :start start
          ;; The channel may have been positioned past the end of the
          ;; file, hence the clamp to the file length.
          :end (min (.position file-channel) file-length)})))))
(defn file-segment-seq
  "Returns a sequence (lazy after its first element) of at most `n`
  successive logical segments of `file`, beginning at byte position
  `start` (default 0). Each segment is built by `file-segment` with an
  initial length of the remaining byte count divided by the number of
  segments still to produce, and the next segment starts where the
  previous one ended.

  As segments may be expanded to align with line boundaries, there may
  be fewer than `n` of them, especially if segments are small and lines
  are long. Returns nil when `n` is not positive or no bytes remain."
  ([^java.io.File file n]
   (file-segment-seq file 0 n))
  ([^java.io.File file start n]
   (when (pos? n)
     (let [remaining-file-length (- (.length file) start)]
       (when (pos? remaining-file-length)
         ;; Integer division on longs: no int cast, so segments larger
         ;; than Integer/MAX_VALUE bytes are fine. The length is at
         ;; least 1 so that a tail shorter than `n` bytes still yields
         ;; a segment instead of being silently dropped.
         (let [initial-segment-length (max 1 (quot remaining-file-length n))]
           (when-let [segment (file-segment file start initial-segment-length)]
             (cons segment
                   (lazy-seq (file-segment-seq file (:end segment) (dec n)))))))))))
(defn- bounded-line-seq
  "Returns a sequence (lazy after its first element) of lines read from
  `reader` while fewer than `bytes-max` bytes have been consumed,
  starting the count at `bytes-read`. Calls the no-argument fn `on-end`
  once when the byte budget is used up or the reader is exhausted."
  [^BufferedReader reader bytes-read bytes-max on-end]
  (if-let [^String line (when (< bytes-read bytes-max) (.readLine reader))]
    ;; Count bytes in UTF-8, the encoding `io/reader` decodes with by
    ;; default, plus one byte for the line terminator stripped by
    ;; `readLine`. (A two-byte CRLF terminator would be under-counted.)
    (let [line-length-bytes (inc (alength (.getBytes line "UTF-8")))]
      (cons line
            (lazy-seq (bounded-line-seq reader
                                        (+ bytes-read line-length-bytes)
                                        bytes-max
                                        on-end))))
    (do (on-end) nil)))

(defn file-segment-line-seq
  "Returns a sequence (lazy after its first element) of lines from the
  `segment`, a map with keys `:file`, `:start` and `:end`. Reading
  begins at byte `start` and stops once at least `end` - `start` bytes
  have been read, so the line that contains the end position is the
  last one read. (Segments are expected to end at a line boundary, but
  manually created ones may not.) Returns nil when there is nothing to
  read.

  With a segment map, the underlying file is opened here and closed as
  soon as the sequence has been fully realized; a sequence that is
  abandoned part-way leaves the file open.

  With `reader`, `bytes-read` and `bytes-max`, lines are read from the
  given reader, which is left open for the caller to close."
  ([{:keys [file start end]}]
   (let [file-input-stream (FileInputStream. ^File file)]
     (try
       ;; Byte offsets are longs; `FileChannel.position` takes a long.
       (.position (.getChannel file-input-stream) (long start))
       (let [^BufferedReader reader (io/reader file-input-stream)]
         (bounded-line-seq reader 0 (- end start) #(.close reader)))
       (catch Throwable t
         ;; Do not leak the stream if seeking or the first read fails.
         (.close file-input-stream)
         (throw t)))))
  ([^BufferedReader reader bytes-read bytes-max]
   (bounded-line-seq reader bytes-read bytes-max (fn [] nil))))
(defn combine-station-stats
  "Merges two stats maps (keys `:count`, `:minimum`, `:maximum`,
  `:average`) into one: counts are summed, the extremes are taken
  across both maps, and the average is the count-weighted mean of the
  two averages."
  [left right]
  (let [{n1 :count lo1 :minimum hi1 :maximum mean1 :average} left
        {n2 :count lo2 :minimum hi2 :maximum mean2 :average} right
        total (+ n1 n2)
        weighted-sum (+ (* n1 mean1) (* n2 mean2))]
    {:count total
     :minimum (min lo1 lo2)
     :maximum (max hi1 hi2)
     :average (/ weighted-sum total)}))
(defn parallel-stats
  "Returns the map of station stats keyed by station names. Reads the
  measurements data from `filename`, that is a text file containing
  semicolon-separated values. Uses multiple threads to read the
  file (divided into `n` logical segments) and computes stats for each
  segment, then combines the results. Default number of segments is the
  number of available cores + 2, same as the number of threads used by
  `pmap`. Returns an empty map when the file yields no segments."
  ([filename]
   (parallel-stats filename (+ 2 (.. Runtime getRuntime availableProcessors))))
  ([filename n]
   (let [file (io/file filename)]
     (reduce (fn
               ;; Zero-arity case: `reduce` calls it when there are no
               ;; segments at all (e.g. an empty file), instead of
               ;; failing with an ArityException.
               ([] {})
               ([stats1 stats2]
                (merge-with combine-station-stats stats1 stats2)))
             (pmap (fn [segment]
                     (transducing-stats (file-segment-line-seq segment)))
                   (file-segment-seq file n))))))
(comment
  ;; Timings noted by the author:
  ;; 80% better than baseline (4.5 min vs 22 min)

  ;; 0.3 sec
  (time (-> "dev/resources/measurements1M.txt" parallel-stats format-stats))

  ;; 263 sec (~4.5 min)
  (time (-> "dev/resources/measurements.txt" parallel-stats format-stats))
  )