MHMeraji
Reputation: 11

Clojure Aleph WebSocket Client Buffers Messages

I'm trying to connect to the Gate.io order_book WebSocket via Aleph. Here is the documentation for the server: https://www.gate.mt/docs/developers/apiv4/ws/en/#limited-level-full-order-book-snapshot Here is the client-side code I wrote:


(ns test.core
  (:require
   [clojure.pprint                                   :as pp]
   [cheshire.core                                    :as cheshire]
   [manifold.stream                                  :as s]
   [manifold.deferred                                :as d]
   [aleph.http                                       :as http]
   [aleph.http.websocket.common                      :as ws-common]
   [aleph.netty :as a.netty]))


(def sample-url "wss://api.gateio.ws/ws/v4/")
(def sample-event (cheshire/generate-string {:time    (int (/ (System/currentTimeMillis) 1000))
                                             :channel "spot.order_book"
                                             :event   "subscribe"
                                             :payload ["BTC_USDT", "5", "100ms"]}))

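;; `sample-socket` is not defined in the posted code; it is assumed here to be
;; the deferred returned by `http/websocket-client` (any options are unknown).
(def sample-socket (http/websocket-client sample-url))

;; send the subscription request and wait until the put is accepted
@(s/put! @sample-socket sample-event)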

;; simulate a slow consumer: block for 10 seconds, then print the message
(defn sample-process [message]
  (Thread/sleep 10000)
  (pp/pprint message))

(defn my-consume [f stream]
  (d/loop []
    (d/chain (s/take! stream ::drained)

             ;; if we got a message, run it through `f`
             (fn [msg]
               (if (identical? ::drained msg)
                 ::drained
                 (f msg)))

             ;; wait for the result from `f` to be realized, and
             ;; recur, unless the stream is already drained
             (fn [result]
               (when-not (identical? ::drained result)
                 (d/recur))))))

(my-consume sample-process @sample-socket) 

The server sends updates every 100 ms, and each message carries a timestamp. In my consumer I wait 10 seconds before each message is processed. Because the buffer size is set to 0 and, according to the documentation, there is no server-side buffering, I expected to see a gap of about 10 seconds between the timestamps of consecutive messages. Instead, the consumer falls further and further behind indefinitely. Can anyone help me find where the messages are being buffered, and how I can control the size of that buffer? BTW, I'm using [aleph "0.7.1"].

Another thing that puzzles me: s/close! does not return when I use Thread/sleep in the consumption function, but without the sleep s/close! works perfectly fine.

Thanks to all in advance.
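To put a number on how far the consumer falls behind, the lag can be printed for every message. This is only a minimal sketch: `lag-ms` and `timed-process` are hypothetical helper names (not part of Aleph or Manifold), and it assumes the message has already been parsed into a map whose `:time` key holds the server timestamp in epoch seconds, which should be checked against the Gate.io documentation.

```clojure
(defn lag-ms
  "Milliseconds between the server timestamp of a parsed message and `now-ms`.
  Assumption: `msg` is a map whose `:time` value is in epoch seconds."
  [msg now-ms]
  (- now-ms (* 1000 (long (:time msg)))))

(defn timed-process
  "Wraps a processing function `f` so that each call first prints the lag
  of the message it is about to handle, then returns the result of `f`."
  [f]
  (fn [msg]
    (println "lag (ms):" (lag-ms msg (System/currentTimeMillis)))
    (f msg)))

;; Fixed inputs, so the result does not depend on the wall clock
(lag-ms {:time 1700000000} 1700000012345) ;; => 12345
```

With the code above, the raw text frame would first have to be parsed, for example with `(cheshire/parse-string message true)`, before being handed to the wrapped function. If the printed lag grows by roughly 10 seconds per message, the messages are being queued somewhere between the server and `s/take!`.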

Messages shouldn't get buffered, or at least there should be some configuration for the buffer that is being used.
