Reputation: 702
I am implementing a Haskell program which compares each line of a file with every other line in the file. For simplicity, let's assume the data structure represented by one line is just an Int, and my algorithm is the squared distance. I would implement this as follows:
--My operation
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)
combineDistances :: [Int] -> Int
combineDistances = sum
--Applying my operation simply on a file
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
    fileContents <- readFile path
    return $ allDistances $ map read $ lines fileContents
  where
    allDistances (x:xs) = allDistances xs + sum (map (distance x) xs)
    allDistances _      = 0
--Test file generation
createTestFile :: Int -> FilePath -> IO ()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
  where
    infiniteList :: Int -> Int -> [Int]
    infiniteList i j = (i + j) : infiniteList j (i + j)
Unfortunately the complete file will be kept in memory. To prevent possible out-of-memory exceptions on very big files, I would like to seek the file cursor back to the beginning of the file at each recursion of allDistances.
In the book "Real World Haskell" an implementation of MapReduce is given, along with a function to split a file into chunks (chapter 24, available here). I modified the chunking function so that, instead of dividing the complete file into chunks, it returns as many chunks as there are lines, with each chunk representing one element of

tails . lines . readFile
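For example, a quick GHCi check of the intended shape, using a hypothetical three-line file whose contents are "1\n2\n3\n":

ghci> import Data.List (tails)
ghci> tails (lines "1\n2\n3\n")
[["1","2","3"],["2","3"],["3"],[]]

So there is one chunk per line, and each chunk runs from the start of that line to the end of the file.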
The full implementation (together with the previous code region) is:
import qualified Data.ByteString.Lazy.Char8 as Lazy
import Control.Exception (bracket,finally)
import Control.Monad(forM,liftM)
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq (NFData)
import Data.Int (Int64)
import System.IO
--Applying my operation using mapreduce on a very big file
sumOfDistancesOnFile :: FilePath -> IO Int
sumOfDistancesOnFile path = chunkedFileOperation chunkByLinesTails distancesUsingMapReduce path
distancesUsingMapReduce :: [Lazy.ByteString] -> Int
distancesUsingMapReduce = mapReduce rpar (distancesFirstToTail . lexer)
                                    rpar combineDistances
  where
    lexer :: Lazy.ByteString -> [Int]
    lexer chunk = map (read . Lazy.unpack) (Lazy.lines chunk)

    distancesOneToMany :: Int -> [Int] -> Int
    distancesOneToMany one many = combineDistances $ map (distance one) many

    distancesFirstToTail :: [Int] -> Int
    distancesFirstToTail s =
        if not (null s)
            then distancesOneToMany (head s) (tail s)
            else 0
--The mapreduce algorithm
mapReduce :: Strategy b    -- evaluation strategy for mapping
          -> (a -> b)      -- map function
          -> Strategy c    -- evaluation strategy for reduction
          -> ([b] -> c)    -- reduce function
          -> [a]           -- list to map over
          -> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
    mapResult `pseq` reduceResult
  where
    mapResult    = parMap mapStrat mapFunc input
    reduceResult = reduceFunc mapResult `using` reduceStrat
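For intuition, a tiny usage sketch on an in-memory list (hypothetical data, unrelated to the distance program):

-- Maps (sum . map (*2)) over the sublists in parallel, giving [6,14],
-- then reduces with sum, yielding 20.
example :: Int
example = mapReduce rpar (sum . map (*2)) rseq sum [[1,2],[3,4]]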
--Working with (file)chunks:
data ChunkSpec = CS {
      chunkOffset :: !Int64
    , chunkLength :: !Int64
    } deriving (Eq, Show)
chunkedFileOperation :: (NFData a) =>
       (FilePath -> IO [ChunkSpec])
    -> ([Lazy.ByteString] -> a)
    -> FilePath
    -> IO a
chunkedFileOperation chunkCreator funcOnChunks path = do
    (chunks, handles) <- chunkedRead chunkCreator path
    let r = funcOnChunks chunks
    (rdeepseq r `seq` return r) `finally` mapM_ hClose handles
chunkedRead :: (FilePath -> IO [ChunkSpec])
            -> FilePath
            -> IO ([Lazy.ByteString], [Handle])
chunkedRead chunkCreator path = do
    chunks <- chunkCreator path
    liftM unzip . forM chunks $ \spec -> do
        h <- openFile path ReadMode
        hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
        chunk <- Lazy.take (chunkLength spec) `liftM` Lazy.hGetContents h
        return (chunk, h)
-- returns set of chunks representing tails . lines . readFile
chunkByLinesTails :: FilePath -> IO [ChunkSpec]
chunkByLinesTails path = do
    bracket (openFile path ReadMode) hClose $ \h -> do
        totalSize <- fromIntegral `liftM` hFileSize h
        let chunkSize = 1
            findChunks offset = do
                let newOffset = offset + chunkSize
                hSeek h AbsoluteSeek (fromIntegral newOffset)
                let findNewline lineSeekOffset = do
                        eof <- hIsEOF h
                        if eof
                            then return [CS offset (totalSize - offset)]
                            else do
                                bytes <- Lazy.hGet h 4096
                                case Lazy.elemIndex '\n' bytes of
                                    Just n -> do
                                        nextChunks <- findChunks (lineSeekOffset + n + 1)
                                        return (CS offset (totalSize - offset) : nextChunks)
                                    Nothing -> findNewline (lineSeekOffset + Lazy.length bytes)
                findNewline newOffset
        findChunks 0
Unfortunately, on a bigger file (for example 2000 lines) the MapReduce version throws an exception:
* Exception: getCurrentDirectory: resource exhausted (Too many open files)
I'm a bit ashamed that I'm not able to debug the program myself, but I only know how to debug Java/C# code, and I also don't know how the file chunking and reading could be properly tested. I expect the problem is not in the mapReduce function itself, as a similar version without mapReduce also throws this exception. In that attempt I had chunkedFileOperation accept both the operation for one chunk and the 'reduce' function, which it applied directly.
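(For what it's worth, a minimal sketch of how the chunking could at least be inspected by hand; testChunks is a hypothetical helper, not part of the program above:)

-- Hypothetical helper: generate a small test file and dump its ChunkSpecs,
-- so the offsets and lengths can be checked against the file contents by eye.
testChunks :: Int -> FilePath -> IO ()
testChunks n path = do
    createTestFile n path
    chunks <- chunkByLinesTails path
    mapM_ print chunks
-- e.g. testChunks 5 "test.txt"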
Btw, I'm running
HaskellPlatform 2011.2.0 on Mac OS X 10.6.7 (Snow Leopard)
with the following packages:
bytestring 0.9.1.10
parallel 3.1.0.1
and I qualify as a self-educated beginner/fresh Haskell programmer
Upvotes: 3
Views: 546
Reputation: 137987
You're using lazy I/O, so those files opened with readFile aren't being closed in a timely fashion. You'll need to think of a solution that explicitly closes the files regularly (e.g. via strict I/O, or iteratee-based I/O).
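For example, a minimal sketch of the strict-I/O route, applied to the small-file version above (this still reads the whole file into memory at once, but the handle is closed as soon as the read completes; sumOfDistancesStrict is a hypothetical name):

import qualified Data.ByteString.Char8 as Strict

-- Strict.readFile reads the entire file eagerly and closes the handle
-- immediately, instead of keeping it open until the last byte is demanded.
sumOfDistancesStrict :: FilePath -> IO Int
sumOfDistancesStrict path = do
    contents <- Strict.readFile path
    return $ allDistances $ map (read . Strict.unpack) $ Strict.lines contents
  where
    allDistances (x:xs) = allDistances xs + sum (map (distance x) xs)
    allDistances _      = 0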
Upvotes: 4
Reputation: 363797
This error means exactly what it says: your process has too many files open. The OS imposes a limit on the number of files (or directories) a process can hold open simultaneously. See your ulimit(1) manpage and/or limit the number of simultaneous mappers.
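For example, one way to stay under the limit is to give up the parallel map and reduce one chunk at a time, reopening and closing the file per chunk (a hypothetical sequential variant, reusing chunkByLinesTails from the question):

-- Hypothetical sequential variant: at most one extra handle is open at a
-- time, because each chunk is read and fully reduced inside withFile.
chunkedFoldSequential :: (Lazy.ByteString -> Int) -> FilePath -> IO Int
chunkedFoldSequential f path = do
    chunks <- chunkByLinesTails path
    sums <- forM chunks $ \spec ->
        withFile path ReadMode $ \h -> do
            hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
            bytes <- Lazy.hGet h (fromIntegral (chunkLength spec))
            return $! f bytes
    return (sum sums)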
Upvotes: 0