Lawrence Woodman

Reputation: 1444

How to limit memory use when using a stream

I'm working with a data set that is too big to fit into memory, so I would like to process it with a stream. However, the stream isn't limiting memory use as I expected. The sample code below illustrates the problem: it runs out of memory when the memory limit is set to 128 MB. I know I could raise the memory limit, but that won't be an option with the size of data set I want to process. How should I keep memory use down?

#lang racket

(struct prospects (src pos num max-num)
  #:methods gen:stream
  [(define (stream-empty? stream)
     (equal? (prospects-num stream) (prospects-max-num stream)))
   ;
   (define (stream-first stream)
     (list-ref (prospects-src stream) (prospects-pos stream)))
   ;
   (define (stream-rest stream)
     (let ([next-pos (add1 (prospects-pos stream))]
           [src (prospects-src stream)]
           [next-num (add1 (prospects-num stream))]
           [max-num (prospects-max-num stream)])
       (if (< next-pos (length src))
           (prospects src next-pos next-num max-num)
           (prospects src 0 next-num max-num))))])

(define (make-prospects src num)
  (prospects src 0 0 num))

(define (calc-stats prospects)
  (let ([just-a (stream-filter
                 (λ (p) (equal? "a" (vector-ref p 0)))
                 prospects)]
        [just-b (stream-filter
                 (λ (p) (equal? "b" (vector-ref p 0)))
                 prospects)])
    ;
    (let ([num-a (stream-length just-a)]
          [num-b (stream-length just-b)]
          [sum-ref1-a (for/sum ([p (in-stream just-a)])
                        (vector-ref p 1))]
          [sum-ref1-b (for/sum ([p (in-stream just-b)])
                        (vector-ref p 1))])
      ;
      #|
          ; Have also tried with stream-fold instead of for/sum as below:
          [sum-ref1-a (stream-fold
                       (λ (acc p) (+ acc (vector-ref p 1)))
                       0 just-a)]
          [sum-ref1-b (stream-fold
                       (λ (acc p) (+ acc (vector-ref p 1)))
                       0 just-b)])
      |#
      ;      
      (list num-a num-b sum-ref1-a sum-ref1-b))))

;================================
;           Main
;================================
(define num-prospects 800000)

(define raw-prospects (list #("a" 2 2 5 4 5 6 2 4 2 45 6 2 4 5 6 3 4 5 2)
                            #("b" 1 3 5 2 4 3 2 4 5 34 3 4 5 3 2 4 5 6 3)))

(calc-stats (make-prospects raw-prospects num-prospects))

Note: This program was created just to demonstrate the problem; the real stream would access a database to bring in the data.

Upvotes: 2

Views: 372

Answers (3)

stchang

Reputation: 2540

You are running out of memory because you are hanging onto the head of the stream while traversing it. Because you still hold a pointer to the head, every element of the stream remains reachable, so the GC can't collect any of them.

To demonstrate, with this stream:

(define strm (make-prospects raw-prospects num-prospects))

this blows up:

(define just-a (stream-filter (λ (p) (equal? "a" (vector-ref p 0))) strm))
(stream-length just-a)

while this is fine:

(stream-length (stream-filter (λ (p) (equal? "a" (vector-ref p 0))) strm))
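If you need several passes, one way to avoid keeping a head around is to rebuild the filtered stream for each pass. The following is only a sketch under that assumption: fresh-filtered is a hypothetical helper (not part of the question's code), the generator is a condensed copy of the one in the question, and calc-stats takes the element count instead of a stream. The trade-off is that the source is read once per pass, which may be costly if it is backed by a database.

```racket
#lang racket

;; Condensed copy of the generator from the question.
(struct prospects (src pos num max-num)
  #:methods gen:stream
  [(define (stream-empty? s)
     (= (prospects-num s) (prospects-max-num s)))
   (define (stream-first s)
     (list-ref (prospects-src s) (prospects-pos s)))
   (define (stream-rest s)
     (define src (prospects-src s))
     (define next-pos (add1 (prospects-pos s)))
     (prospects src
                (if (< next-pos (length src)) next-pos 0)
                (add1 (prospects-num s))
                (prospects-max-num s)))])

(define raw-prospects
  (list #("a" 2 2 5 4 5 6 2 4 2 45 6 2 4 5 6 3 4 5 2)
        #("b" 1 3 5 2 4 3 2 4 5 34 3 4 5 3 2 4 5 6 3)))

;; Hypothetical helper: builds a brand-new filtered stream on every call,
;; so no variable outside the current pass refers to its head.
(define (fresh-filtered tag n)
  (stream-filter (λ (p) (equal? tag (vector-ref p 0)))
                 (prospects raw-prospects 0 0 n)))

;; Four passes, each over its own freshly built stream.
(define (calc-stats n)
  (list (stream-length (fresh-filtered "a" n))
        (stream-length (fresh-filtered "b" n))
        (for/sum ([p (in-stream (fresh-filtered "a" n))])
          (vector-ref p 1))
        (for/sum ([p (in-stream (fresh-filtered "b" n))])
          (vector-ref p 1))))

(calc-stats 1000) ; → '(500 500 1000 500)
```

Since nothing is bound to the filtered streams, already-visited elements should become collectable as each pass moves forward.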

Upvotes: 1

Greg Hendershott

Reputation: 16250

I upvoted Chris' excellent answer and suggest you pick it to mark as accepted.

However, what would use the least memory for a data set that doesn't fit into RAM? Probably something like the following pseudo code:

(require db)
(define dbc <open a db connection>)
(define just-a (query-value dbc "SELECT Count(*) FROM t WHERE n = $1" "a"))
(define just-b (query-value dbc "SELECT Count(*) FROM t WHERE n = $1" "b"))

Why this is a somewhat smartypants answer:

  • Ostensibly you asked about using Racket streams to handle things that don't fit in memory.

  • If you need more complex aggregations than count (or sum/min/max), you'll need to write more complex SQL queries, and probably want to make them stored procedures on the server.

Why it isn't necessarily smartypants:

  • You did mention your real use case involved a database. ;)

  • A speciality of a DB server and SQL is large data sets that don't fit in memory. Taking advantage of the server often will beat something DB-ish re-implemented in a general-purpose language (as well as probably being friendlier to other uses/users of the same DB server).
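For the counts and sums in the question, a single grouped query may be enough. This is only a sketch with several assumptions of mine: an in-memory SQLite database stands in for the real server, the table t has a category column n (as in the pseudocode above) plus a hypothetical value column v, and the sample rows are made up for illustration.

```racket
#lang racket
(require db)

;; Assumption: in-memory SQLite replaces the real database connection.
(define dbc (sqlite3-connect #:database 'memory))

;; Hypothetical schema: n is the category, v is the value to sum.
(query-exec dbc "CREATE TABLE t (n TEXT, v INTEGER)")
(for ([row (in-list '(("a" 2) ("b" 1) ("a" 2) ("b" 1)))])
  (query-exec dbc "INSERT INTO t (n, v) VALUES ($1, $2)"
              (first row) (second row)))

;; One query returns the count and the sum for every category,
;; so only one small row per category ever reaches Racket.
(define stats
  (for/hash ([row (in-list
                   (query-rows
                    dbc
                    "SELECT n, COUNT(*), SUM(v) FROM t GROUP BY n ORDER BY n"))])
    (values (vector-ref row 0)
            (list (vector-ref row 1) (vector-ref row 2)))))

(disconnect dbc)

(hash-ref stats "a") ; → '(2 4)  count and sum for "a"
(hash-ref stats "b") ; → '(2 2)  count and sum for "b"
```

The aggregation happens inside the database engine, so memory use on the Racket side does not grow with the size of the table.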

Upvotes: 3

C. K. Young

Reputation: 223003

The main problem in your code was that you were trying to make multiple passes through the stream. (Each call to stream-length is one pass, and each of your calls to for/sum (or stream-fold, for that matter) is another pass.) This means that you had to materialise the whole stream without allowing the earlier stream elements to be garbage-collected.

Here's a modification of your code that makes only one pass. Note that I set num-prospects to 8,000,000 in my version, since even the multi-pass version didn't run out of memory on my system with only 800,000:

#lang racket
(require srfi/41)

(define (make-prospects src num)
  (stream-take num (apply stream-constant src)))

(define (calc-stats prospects)
  (define default (const '(0 . 0)))
  (define ht (for/fold ([ht #hash()])
                       ([p (in-stream prospects)])
               (hash-update ht (vector-ref p 0)
                            (λ (v)
                              (cons (add1 (car v))
                                    (+ (cdr v) (vector-ref p 1))))
                            default)))
  (define stats-a (hash-ref ht "a" default))
  (define stats-b (hash-ref ht "b" default))
  (list (car stats-a) (car stats-b) (cdr stats-a) (cdr stats-b)))

;================================
;           Main
;================================
(define num-prospects 8000000)

(define raw-prospects '(#("a" 2 2 5 4 5 6 2 4 2 45 6 2 4 5 6 3 4 5 2)
                        #("b" 1 3 5 2 4 3 2 4 5 34 3 4 5 3 2 4 5 6 3)))

(calc-stats (make-prospects raw-prospects num-prospects))

I should clarify that srfi/41 is used only to write a more efficient make-prospects; calc-stats doesn't use it. (The reference implementation of stream-constant isn't very efficient either, but it is still more efficient than your prospects stream generator.)
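If you would rather not depend on srfi/41 at all, the same single pass can probably be written with plain sequences, which are not memoized. This is a sketch under the assumption that a sequence is an acceptable substitute for the stream; here calc-stats takes the source list and the count directly, which differs from the signature used above.

```racket
#lang racket

(define raw-prospects
  '(#("a" 2 2 5 4 5 6 2 4 2 45 6 2 4 5 6 3 4 5 2)
    #("b" 1 3 5 2 4 3 2 4 5 34 3 4 5 3 2 4 5 6 3)))

;; in-cycle repeats the source list forever; the parallel in-range
;; clause stops the loop after num elements.
(define (calc-stats src num)
  (define-values (num-a num-b sum-a sum-b)
    (for/fold ([num-a 0] [num-b 0] [sum-a 0] [sum-b 0])
              ([p (in-cycle (in-list src))]
               [i (in-range num)])
      (define tag (vector-ref p 0))
      (define val (vector-ref p 1))
      (cond
        [(equal? tag "a") (values (add1 num-a) num-b (+ sum-a val) sum-b)]
        [(equal? tag "b") (values num-a (add1 num-b) sum-a (+ sum-b val))]
        ;; Any other tag leaves the accumulators unchanged.
        [else (values num-a num-b sum-a sum-b)])))
  (list num-a num-b sum-a sum-b))

(calc-stats raw-prospects 1000) ; → '(500 500 1000 500)
```

Only the four accumulators stay live between iterations, so memory use should remain flat regardless of num.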

Upvotes: 3
