Reputation: 2280
tl;dr how to turn an IReduceInit into a lazy-seq of transformed values
I have a database query which yields a reasonably large dataset for live pivoting on the client (a million or two rows, 25 attributes; no problem for a modern laptop).
My (simplified) stack was to call clojure.java.jdbc to get what I thought was a lazy sequence of result rows, which I could then serialise by passing it out as the body through the ring-json middleware. There was an issue with ring-json building up the whole response string on the heap, but as of 0.5.0 it has an option to stream the response out.
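For reference, enabling that looks something like this (assuming the option name is :stream? and a handler to wrap):

;; assumed sketch of ring-json 0.5.0's streaming option
(def app
  (wrap-json-response handler {:stream? true}))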
I discovered through profiling a couple of failure cases that clojure.java.jdbc actually realises the whole result set in memory before handing it back. No problem! Rather than work with reducible-query in that library, I decided to move to the new next.jdbc.
The key operation in next.jdbc is plan, which returns an IReduceInit that I can use to run a query and get a result set...
(into [] (map :cc_id) (jdbc/plan ds ["select cc_id from organisation where cc_id = '675192'"]))
["675192"]
However this realises the whole result set, and in the above case would give me all the ids upfront and in memory. Not an issue for one, but I usually have many.
The plan IReduceInit is a thing I can reduce if I give a starting value, so I could do the output in the reducing function... (thx @amalloy)
(reduce #(println (:cc_id %2)) [] (jdbc/plan ds ["select cc_id from organisation where cc_id = '675192'"]))
;; 675192
;; => nil
...but ideally I'd like to turn this IReduceInit into a lazy sequence of the values after applying a transform function to them, so I can use them with ring-json and cheshire. I don't see any obvious way of doing that.
Upvotes: 3
Views: 560
Reputation: 2280
There are quite a few reasons why my lazy-seq was a bad idea: even if I guarantee not to hold the head, an exception during result streaming would leave the ResultSet lying around, because the serialisation happens away from the call stack that could clean it up.
The need for laziness is driven by the desire not to realise the whole result in memory; the need for a seq (or something else satisfying coll?) is only so that the middleware will serialise it...
Therefore: make the IReduceInit JSONable directly, and bypass the middleware. If there's an exception during serialisation, control will pass back through the IReduceInit from next.jdbc, which can then clean up meaningfully.
;; reuse this body generator from my patch to ring.middleware.json directly,
;; since the middleware's coll? check will fail for an IReduceInit
;; (assumes (:require [ring.core.protocols :as ring-protocols]
;;                    [cheshire.core :as json]
;;                    [clojure.java.io :as io]))
(defrecord JsonStreamingResponseBody [body options]
  ring-protocols/StreamableResponseBody
  (write-body-to-stream [_ _ output-stream]
    (json/generate-stream body (io/writer output-stream) options)))
;; the year-long yak is shaved in 8 lines by providing a custom serialiser
;; for IReduceInits… (assumes (:import (clojure.lang IReduceInit IPersistentMap)
;;                             (com.fasterxml.jackson.core JsonGenerator)))
(extend-type IReduceInit
  cheshire.generate/JSONable
  (to-json [^IReduceInit results ^JsonGenerator jg]
    (.writeStartArray jg)
    (let [rf (fn [_ ^IPersistentMap m]
               (cheshire.generate/encode-map m jg))]
      (reduce rf nil results))
    (.writeEndArray jg)))
;; at this point I can wrap the result from next.jdbc/plan with ->JsonStreamingResponseBody into the :body of the ring response and it will stream
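For illustration, a handler along those lines might look like this (ds and the query are placeholders, and no cheshire options are passed):

;; sketch: hand the plan straight to the streaming body record
(defn export-handler [_request]
  {:status  200
   :headers {"Content-Type" "application/json"}
   :body    (->JsonStreamingResponseBody
             (jdbc/plan ds ["select * from organisation"])
             nil)})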
It still feels like a lot of work to compose these features; adapter code always makes me worry that I'm missing a simple, idiomatic approach.
Upvotes: 1
Reputation: 715
The IReduceInit enables the JDBC resources to be wound up when the reduce function exits. This is much more predictable than the LazySeq approach, which might NEVER release the JDBC resources.
You could use a BlockingQueue and a future task to populate that queue, like this:
(defn lazywalk-reducible
  "Walks the reducible, buffering up to n values;
  returns a java.lang.Iterable over them."
  [n reducible]
  (reify java.lang.Iterable
    (iterator [_]
      (let [bq        (java.util.concurrent.ArrayBlockingQueue. n)
            finished? (volatile! false)
            ;; the future walks the whole reducible, blocking on put
            ;; whenever the queue is full
            traverser (future (reduce (fn [_ v] (.put bq v)) nil reducible)
                              (vreset! finished? true))]
        (reify java.util.Iterator
          (hasNext [_] (or (false? @finished?) (false? (.isEmpty bq))))
          (next [_] (.take bq)))))))
This will of course create a leak if an iterator is spawned but not followed to its conclusion.
I've not tested it thoroughly, it might have other problems too; but this sort of approach ought to work.
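For what it's worth, a usage sketch, subject to those caveats (shown with an ordinary reducible; with next.jdbc/plan you would also need to realise each row before it leaves the reduction):

;; iterator-seq adapts the Iterable for ordinary seq functions; note that
;; stopping early like this leaks the producer future, as warned above
(take 5 (iterator-seq (.iterator (lazywalk-reducible 8 (eduction (map inc) (range 100))))))
;; => (1 2 3 4 5)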
You could alternatively make it reify clojure.lang.ISeq if a Java Iterable is not good enough for your use case; but then you start to get into head-retention questions, and into how to handle a call to Object first(). That would be quite doable, but I didn't want to overthink this.
Upvotes: 2
Reputation: 1660
Frustrating.
Why can't you do this with plain JDBC, though, without any Clojure layers?
;; pseudocode: json/send and json/end stand in for whatever writes each
;; row to the response stream
(let [statement (.createStatement connection)
      resultset (.executeQuery statement "select ...")]
  (loop []
    (when (.next resultset)
      (let [row [(.getString resultset 1)
                 (.getString resultset 2)
                 ...]]
        (json/send row))
      (recur)))
  (json/end))
Of course, with a ResultSetMetaData you could automate the generation of the row into a function that could cope with anything returned.
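Something like this, say (row-fn is my name, not a library function):

;; sketch: derive the row builder from the ResultSetMetaData, so the
;; column count and labels are discovered rather than hard-coded
(defn row-fn [^java.sql.ResultSet resultset]
  (let [md     (.getMetaData resultset)
        idxs   (range 1 (inc (.getColumnCount md)))
        labels (mapv #(.getColumnLabel md (int %)) idxs)]
    (fn []
      (zipmap labels (map #(.getObject resultset (int %)) idxs)))))

Call it once up front, then invoke the returned function after each .next to read the current row as a map.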
Upvotes: 1
Reputation: 91907
reduce works fine with IReduceInit. An IReduceInit requires an initial value, which you specified when calling .reduce on it but not when using the reduce function; this explains why you saw one working but not the other.
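Concretely (the query is a placeholder):

;; an IReduceInit only knows how to reduce from a supplied initial value;
;; plan's result implements IReduceInit but not IReduce
(reduce conj [] (jdbc/plan ds ["select cc_id from organisation"])) ; works
(reduce conj    (jdbc/plan ds ["select cc_id from organisation"])) ; throws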
However, this won't get you a lazy sequence. Part of reduce's contract is that it eagerly consumes the entire input (we'll ignore reduced, which doesn't change anything meaningful). Your question is a specific case of the more general problem of dynamic scope: the sequence produced by JDBC is only "valid" within some context, and you need to do all your processing in that context, so it can't be lazy. Instead, you generally turn your program inside out: don't use the return value as a sequence, but pass the query engine a function and say, "please call this function with your results". The engine then ensures that the data is valid while it calls that function, and once the function returns it cleans up the data. I don't know about next.jdbc, but with the older clojure.java.jdbc you would use something like db-query-with-resultset for this. You'd pass it some function that could add bytes to a pending HTTP response, and it would call that function many times.
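A sketch of that shape with the older clojure.java.jdbc (db-spec and write-row! are stand-ins for your connection details and for whatever appends to the pending response):

;; inversion of control: pass the processing function in rather than
;; taking a sequence out; the ResultSet is only open while the fn runs
(require '[clojure.java.jdbc :as sql])

(sql/db-query-with-resultset
  db-spec
  ["select cc_id from organisation"]
  (fn [rs]
    (doseq [row (sql/result-set-seq rs)]
      (write-row! row))))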
This is all a bit vague because I don't know what HTTP handler you're using, or what its facilities are for handling streaming responses non-lazily, but this is the general idea you will have to go with if you want to process a dynamic-scoped resource: laziness just isn't an option.
Upvotes: 2