Reputation: 191
I have written a ByteString parser using the Attoparsec library:
import qualified Data.ByteString.Char8 as B
import qualified Data.Attoparsec.ByteString.Char8 as P
parseComplex :: P.Parser Complex
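(The body of parseComplex isn't important here; as a stand-in, assuming purely for illustration that Complex is Data.Complex.Complex Double and each token looks like "1.5+2.3i", it could be something like:

import Data.Complex (Complex(..))

-- Illustrative stand-in only; the real type and token format may differ.
parseComplex :: P.Parser (Complex Double)
parseComplex = do
    re <- P.signed P.double   -- real part, e.g. "1.5"
    im <- P.signed P.double   -- signed imaginary part, e.g. "+2.3"
    _  <- P.char 'i'
    return (re :+ im)

)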
My intention was to use this to parse large (> 5 GB) files, so the implementation applies the parser lazily:
import Control.Applicative (liftA)
import qualified Data.ByteString.Lazy.Char8 as LB
import qualified Data.Attoparsec.ByteString.Lazy as LP

extr :: LP.Result a -> a

main = do
    rawData <- liftA LB.words (LB.readFile "/mnt/hgfs/outputs/out.txt")
    let formatedData = map (extr . LP.parse parseComplex) rawData
    ...
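Here extr just unwraps the lazy Result; one plausible body for it (elided above) is:

extr :: LP.Result a -> a
extr (LP.Done _ r)     = r
extr (LP.Fail _ _ msg) = error msg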
Executing this on a test file, compiled with -O2 and run with the -s RTS flag, I see:
3,509,019,048 bytes allocated in the heap
2,086,240 bytes copied during GC
58,256 bytes maximum residency (30 sample(s))
126,240 bytes maximum slop
2 MB total memory in use (0 MB lost due to fragmentation)
Tot time (elapsed) Avg pause Max pause
Gen 0 6737 colls, 0 par 0.03s 0.03s 0.0000s 0.0001s
Gen 1 30 colls, 0 par 0.00s 0.00s 0.0001s 0.0002s
INIT time 0.00s ( 0.00s elapsed)
MUT time 0.83s ( 0.83s elapsed)
GC time 0.04s ( 0.04s elapsed)
EXIT time 0.00s ( 0.00s elapsed)
Total time 0.87s ( 0.86s elapsed)
%GC time 4.3% (4.3% elapsed)
Alloc rate 4,251,154,493 bytes per MUT second
Productivity 95.6% of total user, 95.8% of total elapsed
Since I am mapping a function over a list of independent elements, I thought this code might benefit from parallelization. I had never done anything of the sort in Haskell before, but messing around with the Control.Monad.Par library, I wrote a simple, naive, static partitioning function that I thought would run my parsing in parallel:
import Control.Monad.Par
import Control.DeepSeq (force)

parseMap :: [LB.ByteString] -> [Complex]
parseMap x = runPar $ do
    let (as, bs) = force $ splitAt (length x `div` 2) x
    a <- spawnP $ map (extr . LP.parse parseComplex) as
    b <- spawnP $ map (extr . LP.parse parseComplex) bs
    c <- get a
    d <- get b
    return $ c ++ d
I was not expecting too much from this function, but the parallel performance was much worse than the sequential computation. Here are the main function and the results, compiled with -O2 -threaded -rtsopts and executed with +RTS -s -N2:
main = do
    rawData <- liftA LB.words (LB.readFile "/mnt/hgfs/outputs/out.txt")
    let formatedData = parseMap rawData
    ...
3,641,068,984 bytes allocated in the heap
356,490,472 bytes copied during GC
82,325,144 bytes maximum residency (10 sample(s))
14,182,712 bytes maximum slop
253 MB total memory in use (0 MB lost due to fragmentation)
Tot time (elapsed) Avg pause Max pause
Gen 0 4704 colls, 4704 par 0.50s 0.25s 0.0001s 0.0006s
Gen 1 10 colls, 9 par 0.57s 0.29s 0.0295s 0.1064s
Parallel GC work balance: 19.77% (serial 0%, perfect 100%)
TASKS: 4 (1 bound, 3 peak workers (3 total), using -N2)
SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)
INIT time 0.00s ( 0.00s elapsed)
MUT time 1.11s ( 0.72s elapsed)
GC time 1.07s ( 0.54s elapsed)
EXIT time 0.02s ( 0.02s elapsed)
Total time 2.20s ( 1.28s elapsed)
Alloc rate 3,278,811,516 bytes per MUT second
Productivity 51.2% of total user, 88.4% of total elapsed
gc_alloc_block_sync: 149514
whitehole_spin: 0
gen[0].sync: 0
gen[1].sync: 32
As you can see, there seems to be a lot of garbage-collector activity in the parallel case, and the loads are pretty poorly balanced. I profiled the execution using ThreadScope and got the following:
[ThreadScope profile image]
I can see very clearly that the garbage collector running on HEC 1 is interrupting the computation on HEC 2. Moreover, HEC 1 clearly has less work assigned than HEC 2. As a test, I attempted to adjust the relative sizes of the two split lists to re-balance the loads, but I saw no perceivable difference in the behavior of the program after doing so. I've also tried running this on different-sized inputs, with larger minimum heap allocations, and with the parMap function included in the Control.Monad.Par library, but those efforts had no effect on the outcome.
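For reference, the parMap variant I tried is essentially a one-liner (assuming, as spawnP already requires, that Complex has an NFData instance):

parseMap' :: [LB.ByteString] -> [Complex]
parseMap' = runPar . parMap (extr . LP.parse parseComplex)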
I'm assuming that there is a space leak somewhere, maybe from the let (as, bs) = ... assignment, because memory usage is so much higher in the parallel case. Is this the problem? If so, how should I go about resolving it?
EDIT: Splitting the input data manually as suggested, I am now seeing some small improvements in the timings. For a 6m-point input file, I split it into two 3m-point files and into three 2m-point files, then reran the code using 2 and 3 cores respectively. Rough timings as follows:
1 Core: 6.5s
2 Core: 5.7s
3 Core: 4.5s
The new ThreadScope profile looks like this:
[ThreadScope profile image]
The strange behavior towards the beginning is gone, but it looks to me like there are still some load-balancing issues.
Upvotes: 4
Views: 149
Reputation: 52029
First of all, I would suggest referencing your Code Review posting (link) to give people more background info on what you are trying to do.
Your basic problem is that you are forcing Haskell to read the entire file into memory with length x. What you want to do instead is stream in the results so that as little of the file as possible is in memory at any one time.
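In particular, the first thing your parseMap does is

    let (as, bs) = force $ splitAt (length x `div` 2) x

and length x cannot return until LB.words has produced every word of the file, so the entire file gets read in (and, thanks to force, retained) before any parsing starts.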
What you have is a typical map-reduce computation, so to split the workload into two parts, my recommendation is:

1. Determine the size of the input file.
2. Spawn two parallel computations, one for each half of the file.
3. Have each computation reduce its half to a compact summary value, e.g. a Vector Int.
4. Arrange for each computation to read just its own half of the file.

Of course, the "middle" of the file is the beginning of a line which is close to the middle of the file.
The tricky part is step 4, so to simplify things let's assume the input file has already been split into two separate files, part1 and part2. Then your computation could look like this:
main = do
    content1 <- LB.readFile "part1"
    content2 <- LB.readFile "part2"
    let v = runPar $ do
            a <- spawnP $ computeVector content1
            b <- spawnP $ computeVector content2
            vec1 <- get a
            vec2 <- get b
            -- combine vec1 and vec2
            let vec3 = ...vec1 + vec2...
            return vec3
    ...
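I've left computeVector undefined; the shape I have in mind is a streaming reduction of one half into a compact, strict summary. Purely as a sketch (assuming your Complex is Data.Complex.Complex Double; the vector of magnitudes is a stand-in for whatever reduction you actually need):

import Data.Complex (magnitude)
import qualified Data.Vector.Unboxed as U

-- Reduce one half of the input to an unboxed vector; unboxed vectors
-- are strict, so consumed ByteString chunks can be GC'd as we go.
computeVector :: LB.ByteString -> U.Vector Double
computeVector =
    U.fromList . map (magnitude . extr . LP.parse parseComplex) . LB.words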
You should try this approach out and determine what the speedup is. If it looks good, then we can figure out how to virtually split a file into multiple parts without actually copying the data.
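To give a rough idea of what that might look like (untested; middleLine and readHalves are names I am inventing for this sketch): seek to the byte midpoint, skip ahead to the next line boundary, and hand each computation its own handle-backed lazy stream:

import System.IO
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as LB

-- Offset of the first full line at or after the byte midpoint.
-- (Assumes the midpoint does not fall inside the final line.)
middleLine :: FilePath -> IO Integer
middleLine path = withFile path ReadMode $ \h -> do
    size <- hFileSize h
    hSeek h AbsoluteSeek (size `div` 2)
    _ <- B.hGetLine h          -- discard the partial line we landed in
    hTell h

-- Two independent handles, so each half streams lazily on its own.
readHalves :: FilePath -> IO (LB.ByteString, LB.ByteString)
readHalves path = do
    mid <- middleLine path
    h1  <- openFile path ReadMode
    h2  <- openFile path ReadMode
    hSeek h2 AbsoluteSeek mid
    whole <- LB.hGetContents h1
    back  <- LB.hGetContents h2
    return (LB.take (fromIntegral mid) whole, back)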
Note - I haven't actually run this, so I don't know if there are quirks w.r.t. lazy-IO and the Par monad, but this idea in some form should work.
Upvotes: 4