Deivit


clojure Riemann project collectd

I am trying to set up an apparently simple custom configuration using Riemann and collectd. Basically, I'd like to calculate the ratio between two streams. To do that I tried something like the following (as in the Riemann API project suggestion here):

(project [(service "cache-miss")
          (service "cache-all")]
  ;; divide the latest cache-miss metric by the latest cache-all metric
  (smap folds/quotient
    (with :service "ratio"
      index)))

This apparently works, but after a while I noticed that some of the results were miscalculated. After some debugging I ended up with the following configuration to see what's happening and print the values:

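(project [(service "cache-miss")
          (service "cache-all")]
  (fn [[miss all]]
    (if (or (nil? miss) (nil? all))
      nil
      ;; NOTE: inside a plain fn, where only builds a stream that is never
      ;; called, so this time predicate is never checked; the println forms
      ;; are evaluated while building it, i.e. on every call of this fn.
      (where (= (:time miss) (:time all))
        ;; print the time stamps
        (println (:time all))
        (println (:time miss))
        ;; separator to tell each call apart
        (println "NEW LINE")))))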

My surprise is that each time I get new data from collectd (every 10 seconds), the function I created is executed twice, as if it were reusing previously unused data. Moreover, it seems to ignore my time equality constraint in the (where (= :time....) clause entirely. The problem is that I end up dividing metrics with different time stamps. Below is some output of the previous code:

1445606294
1445606294
NEW LINE -- First time I get data
1445606304
1445606294
NEW LINE
1445606304
1445606304
NEW LINE -- Second time I get data
1445606314
1445606304
NEW LINE
1445606314
1445606314
NEW LINE -- Third time I get data
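To make the pattern above easier to follow, here is a simplified plain-Clojure model of how project appears to behave: it remembers the most recent event matching each predicate and, whenever a matching event arrives, passes on the whole vector of latest events. The function project-sim is hypothetical (not Riemann's source), and it assumes the stream emits only on events that match one of the predicates; the sample events are made up to mirror the time stamps printed above.

```clojure
(defn project-sim
  "Hypothetical model of Riemann's project: remembers the latest event
  matching each predicate and, every time an event matches at least one
  predicate, emits a vector of those latest events (nil for a slot that
  has not been filled yet)."
  [preds events]
  (:emitted
   (reduce (fn [{:keys [latest] :as state} e]
             ;; indices of the predicates this event satisfies
             (let [hits (keep-indexed (fn [i p] (when (p e) i)) preds)]
               (if (empty? hits)
                 state
                 (let [latest' (reduce #(assoc %1 %2 e) latest hits)]
                   (-> state
                       (assoc :latest latest')
                       (update :emitted conj latest'))))))
           {:latest  (vec (repeat (count preds) nil))
            :emitted []}
           events)))

;; Predicates in the same order as the project call: [miss all]
(def preds [#(= "cache-miss" (:service %))
            #(= "cache-all" (:service %))])

;; Two collectd rounds; in each round "cache-all" arrives before "cache-miss"
(def events [{:service "cache-all"  :time 1445606294}
             {:service "cache-miss" :time 1445606294}
             {:service "cache-all"  :time 1445606304}
             {:service "cache-miss" :time 1445606304}])

;; Time stamps of [miss all] in every emitted vector
(map #(mapv :time %) (project-sim preds events))
;; => ([nil 1445606294] [1445606294 1445606294]
;;     [1445606294 1445606304] [1445606304 1445606304])
```

In this model, once both slots are filled every incoming event produces one vector, so each round yields two calls, and the first call of a round pairs a fresh cache-all event with the previous round's cache-miss event, which is the same mix of time stamps seen in the output above.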

Can anyone give me a hint on how to get the data formatted as I expected? I assume there is something I am not understanding about the "project" function, or about how incoming data is processed in Riemann.

Thanks in advance!

Updated

I managed to solve my problem, although I still don't have a clear idea of how it works. I am now receiving two different streams from the collectd tail plugin (from nginx logs), and I compute the quotient between them as follows:

(where (or (service "nginx/counter-cacheHit") (service "nginx/counter-cacheAll"))
    (coalesce
        ;; divide the two metrics, rename the result and scale it to a percentage
        (smap folds/quotient (with :service "cacheHit" (scale 100 index)))))

I have tested it extensively and so far it produces the right results. However, I still don't understand several things. First, how is it that coalesce only returns data after both events have been processed? Collectd sends the events of both streams every two seconds with the same time stamp. Using "project" instead of "coalesce" resulted in two separate executions of smap every two seconds (one for each event), whereas coalesce results in a single execution of smap with both events carrying the same time stamp, which is exactly what I wanted.

Finally, I don't know what determines which stream becomes the numerator and which the denominator. Is it because of the order of the "or" clauses in the "where" clause?
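A hedged way to think about the numerator question: the Riemann docs describe folds/quotient as dividing the metric of the first event it receives by the metrics of the events that follow, so the numerator is whichever event comes first in the sequence handed to smap. With project, that order follows the predicate vector; the order coalesce produces is not modelled here. The quotient-sketch function below is a hypothetical stand-in written to illustrate that rule, not the real riemann.folds/quotient (which may treat nil or zero metrics differently).

```clojure
(defn quotient-sketch
  "Hypothetical stand-in for riemann.folds/quotient: returns the first
  event with its :metric divided by the :metric of every following event."
  [events]
  (let [[head & tail] events]
    (assoc head :metric (reduce / (:metric head) (map :metric tail)))))

;; miss first: 5 / 20
(quotient-sketch [{:service "cache-miss" :metric 5}
                  {:service "cache-all"  :metric 20}])
;; => {:service "cache-miss", :metric 1/4}

;; same events, swapped order: 20 / 5
(quotient-sketch [{:service "cache-all"  :metric 20}
                  {:service "cache-miss" :metric 5}])
;; => {:service "cache-all", :metric 4}
```

Swapping the two events flips the ratio, so whatever decides the order in which the events reach smap also decides which stream ends up as the numerator.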

Anyway, with some black magic behind it, I managed to solve my problem ;^)

Thank you all!


Answers (1)

Arthur Ulfeldt


Taking ratios between streams that were moving at different rates didn't work out for me. I have since settled on calculating ratios and rates within a fixed time interval or a moving time interval. This way you capture a consistent snapshot of the events in a time block and calculate over that. Here is some elided code that compares the rate at which a service receives events with the rate at which it forwards them:

(moving-time-window 30 ;; seconds
  (smap (fn [events]
          ;; count how many events of each kind fell inside the window
          (let [in  (->> events
                         (filter #(= (:service %) "event-received"))
                         count)
                out (->> events
                         (filter #(= (:service %) "event-sent"))
                         count)
                ;; forwarded / received, or 0 if nothing was received
                flow-rate (float (if (> in 0) (/ out in) 0))]
            {:service "flow rate"
             :metric flow-rate
             :host "All"
             :state (if (< flow-rate 0.99) "WARNING" "OK")
             :time (:time (last events))
             :ttl default-interval}))
        (tag ["some" "tags" "here"] index)
        ;; alert at most once per hour when a production flow rate drops below 0.9
        (where (and
                (< (:metric event) 0.9)
                (= (:environment event) "production"))
               (throttle 1 3600 send-to-slack))))

This takes a 30-second window of events, calculates the ratio for that block and emits an event containing that ratio as its metric. Then, if the metric is bad, it alerts me on Slack.
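The windowed calculation can also be tried outside Riemann. Below is a plain-Clojure sketch, assuming moving-time-window keeps the events whose :time lies within the last n seconds of the newest time seen (roughly how its docs describe it; the exact boundary handling here is an assumption). window-sim and flow-rate are hypothetical helpers that mirror the stream above, not Riemann functions, and the sample data is made up.

```clojure
(defn window-sim
  "Hypothetical model of moving-time-window: tracks the newest :time seen
  and, whenever an event falls inside the last n seconds, emits every
  buffered event newer than (newest - n). Boundary handling is an
  assumption and may differ from Riemann's."
  [n events]
  (:emitted
   (reduce (fn [{:keys [buffer horizon emitted]} e]
             (let [t        (:time e)
                   horizon' (if horizon (max horizon t) t)
                   inside?  #(> (:time %) (- horizon' n))
                   buffer'  (filterv inside? (conj buffer e))]
               {:buffer  buffer'
                :horizon horizon'
                ;; only emit when the new event itself lands in the window
                :emitted (if (inside? e) (conj emitted buffer') emitted)}))
           {:buffer [] :horizon nil :emitted []}
           events)))

(defn flow-rate
  "Ratio of \"event-sent\" to \"event-received\" events in one window,
  0.0 when nothing was received (mirrors the smap function above)."
  [events]
  (let [in  (count (filter #(= (:service %) "event-received") events))
        out (count (filter #(= (:service %) "event-sent") events))]
    (float (if (> in 0) (/ out in) 0))))

;; Made-up sample: 4 received and 3 sent events over 4 seconds
(def sample
  [{:service "event-received" :time 1}
   {:service "event-sent"     :time 1}
   {:service "event-received" :time 2}
   {:service "event-received" :time 3}
   {:service "event-sent"     :time 3}
   {:service "event-received" :time 4}
   {:service "event-sent"     :time 4}])

(flow-rate (last (window-sim 30 sample)))  ;; => 0.75
(flow-rate (last (window-sim 2 sample)))   ;; => 1.0
```

Shrinking the window from 30 to 2 seconds drops the older events and changes the ratio; either way, both counts always come from the same block of time, which is what keeps the ratio consistent.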
