Reputation: 1444
I'm working with a data set that is too big to fit into memory and therefore would like to use a stream to process the data. However, I'm finding that the stream isn't limiting the amount of memory used as I expected. The sample code illustrates the problem and will run out of memory if the memory limit is set to 128mb. I know I could up the memory limit, but with the sort of big data set that I want to use this won't be an option. How should I keep the memory use down?
#lang racket
(struct prospects (src pos num max-num)
#:methods gen:stream
[(define (stream-empty? stream)
(equal? (prospects-num stream) (prospects-max-num stream)))
;
(define (stream-first stream)
(list-ref (prospects-src stream) (prospects-pos stream)))
;
(define (stream-rest stream)
(let ([next-pos (add1 (prospects-pos stream))]
[src (prospects-src stream)]
[next-num (add1 (prospects-num stream))]
[max-num (prospects-max-num stream)])
(if (< next-pos (length src))
(prospects src next-pos next-num max-num)
(prospects src 0 next-num max-num))))])
(define (make-prospects src num)
(prospects src 0 0 num))
(define (calc-stats prospects)
(let ([just-a (stream-filter
(λ (p) (equal? "a" (vector-ref p 0)))
prospects)]
[just-b (stream-filter
(λ (p) (equal? "b" (vector-ref p 0)))
prospects)])
;
(let ([num-a (stream-length just-a)]
[num-b (stream-length just-b)]
[sum-ref1-a (for/sum ([p (in-stream just-a)])
(vector-ref p 1))]
[sum-ref1-b (for/sum ([p (in-stream just-b)])
(vector-ref p 1))])
;
#|
; Have also tried with stream-fold instead of for/sum as below:
[sum-ref1-a (stream-fold
(λ (acc p) (+ acc (vector-ref p 1)))
0 just-a)]
[sum-ref1-b (stream-fold
(λ (acc p) (+ acc (vector-ref p 1)))
0 just-b)])
|#
;
(list num-a num-b sum-ref1-a sum-ref1-b))))
;================================
; Main
;================================
(define num-prospects 800000)
(define raw-prospects (list #("a" 2 2 5 4 5 6 2 4 2 45 6 2 4 5 6 3 4 5 2)
#("b" 1 3 5 2 4 3 2 4 5 34 3 4 5 3 2 4 5 6 3)))
(calc-stats (make-prospects raw-prospects num-prospects))
Note: This program was created just to demonstrate the problem; the real stream would access a database to bring in the data.
Upvotes: 2
Views: 372
Reputation: 2540
You are running out of memory because you are hanging onto the head of the stream while traversing it. The GC can't collect anything because since you have a pointer to the head, every element of the stream is still reachable.
To demonstrate, with this stream:
(define strm (make-prospects raw-prospects num-prospects))
this blows up:
(define just-a (stream-filter (λ (p) (equal? "a" (vector-ref p 0))) strm))
(stream-length just-a)
while this is fine:
(stream-length (stream-filter (λ (p) (equal? "a" (vector-ref p 0))) strm))
Upvotes: 1
Reputation: 16250
I upvoted Chris' excellent answer and suggest you pick it to mark as accepted.
However, what would use the least memory for a data set that doesn't fit into RAM? Probably something like the following pseudo code:
(require db)
(define dbc <open a db connection>)
(define just-a (query-value dbc "SELECT Count(*) FROM t WHERE n = $1" "a"))
(define just-b (query-value dbc "SELECT Count(*) FROM t WHERE n = $1" "b"))
Why this is a somewhat smartypants answer:
Ostensibly you asked about using Racket streams to handle things that don't fit in memory.
If you need more complex aggregations than count (or sum/min/max), you'll need to write more complex SQL queries, and probably want to make them stored procedures on the server.
Why it isn't necessarily smartypants:
You did mention your real use case involved a database. ;)
A speciality of a DB server and SQL is large data sets that don't fit in memory. Taking advantage of the server often will beat something DB-ish re-implemented in a general-purpose language (as well as probably being friendlier to other uses/users of the same DB server).
Upvotes: 3
Reputation: 223003
The main problem in your code was that you were trying to make multiple passes through the stream. (Each call to stream-length
is one pass, and each of your calls to for/sum
(or stream-fold
, for that matter) is another pass.) This means that you had to materialise the whole stream without allowing the earlier stream elements to be garbage-collected.
Here's a modification of your code to make only one pass. Note that I made num-prospects
to be 8,000,000 in my version, since even the multi-pass version didn't run out of memory on my system with only 800,000:
#lang racket
(require srfi/41)
(define (make-prospects src num)
(stream-take num (apply stream-constant src)))
(define (calc-stats prospects)
(define default (const '(0 . 0)))
(define ht (for/fold ([ht #hash()])
([p (in-stream prospects)])
(hash-update ht (vector-ref p 0)
(λ (v)
(cons (add1 (car v))
(+ (cdr v) (vector-ref p 1))))
default)))
(define stats-a (hash-ref ht "a" default))
(define stats-b (hash-ref ht "b" default))
(list (car stats-a) (car stats-b) (cdr stats-a) (cdr stats-b)))
;================================
; Main
;================================
(define num-prospects 8000000)
(define raw-prospects '(#("a" 2 2 5 4 5 6 2 4 2 45 6 2 4 5 6 3 4 5 2)
#("b" 1 3 5 2 4 3 2 4 5 34 3 4 5 3 2 4 5 6 3)))
(calc-stats (make-prospects raw-prospects num-prospects))
I should clarify that the use of srfi/41
is simply to enable writing a more-efficient version of make-prospects
(though, the reference implementation of stream-constant
isn't very efficient, but still more efficient than what your prospects
stream generator did); calc-stats
doesn't use it.
Upvotes: 3