objmagic

Reputation: 1026

Lwt_stream usage and design concern

I am writing an Lwt-based Twitter API library, and I want to implement the Twitter API's cursor functionality using the Lwt_stream library.

I decided to use Lwt_stream.from and pass it a function f.

Here is what I currently do:

let get_follower ~access_token ~screen_name ?(count=5000) ?(wait=true) () =
  let base_uri = "https://api.twitter.com/1.1/followers/ids.json" in
  let cursor = ref (-1) in
  let f () =
    Client.do_get_request
      ~uri_parameters:
        ["screen_name", screen_name;
         "cursor", (string_of_int (!cursor));
         "count", (string_of_int count)]
      ~uri:(Uri.of_string base_uri)
      ~access_token:access_token
    () >>= fun res ->
    match res with
    | Ok (header, str) -> begin
      match (Yojson.Safe.from_string str |> user_ids_of_yojson) with
      | `Ok uids -> 
        cursor := uids.next_cursor;
        return (Some (Ok uids))
      | `Error msg -> failwith msg 
      end
    | Error e -> return (Some (Error (process_error_exn e))) in
  Lwt_stream.from f

I am not sure whether I should use a ref. I use one because the behavior of f depends on the value it returned previously. Specifically, the cursor used for the next request comes from the current response's next_cursor, and if the cursor is zero, f knows it has reached the end and returns None.

Is using a ref here considered a good design choice? Is there a better way to implement this functionality?

Upvotes: 0

Views: 609

Answers (2)

ivg

Reputation: 35280

Antron gave an excellent answer in my opinion, but I would like to share some practical advice. If I were you, I wouldn't use one big function named f with a ref, mostly because it smells: it is quite unreadable and doesn't scale. I would create a stream with Lwt_stream.create and feed it from a recursive function that runs the main loop, with all the logic separated into helper functions. The main loop function can carry its state as an argument when it recurs, so we don't need ugly references or explicit mutability.

Below is an example of how your code can be restructured. I also didn't find an explicit check for a zero cursor, which should stop the stream (as you mentioned in the text), so I added one. Otherwise the code should work like yours.

I've split it into three functions: make_request is responsible for making the request; parse_response is a pure transformation that deconstructs the answer; and finally, the loop function performs the main loop: it makes requests, parses the responses, and stops on zero.

Note: it looks like you're using the Or_error monad and mixing it with exceptions. This is a bad idea, since it breaks the precondition that a well-formed function returning Or_error.t should never throw an exception.

let get_follower ~access_token ~screen_name ?(count=5000) ?(wait=true) () =
  let base_uri = "https://api.twitter.com/1.1/followers/ids.json" in
  (* Issue one request for the page identified by [cursor]. *)
  let make_request cursor =
    Client.do_get_request
      ~uri:(Uri.of_string base_uri) ~access_token
      ~uri_parameters:[
        "screen_name", screen_name;
        "cursor", string_of_int cursor;
        "count", string_of_int count
      ] () in
  (* Decode a response. The failwith still mixes exceptions with the
     result type, as discussed in the note above. *)
  let parse_response = function
    | Error e -> Error (process_error_exn e)
    | Ok (_header, str) ->
      match Yojson.Safe.from_string str |> user_ids_of_yojson with
      | `Ok uids -> Ok uids
      | `Error msg -> failwith msg in
  let stream,push = Lwt_stream.create () in
  let rec loop cursor =
    make_request cursor >|= parse_response >>= function
    (* A zero next_cursor marks the last page: deliver it, then close. *)
    | Ok {next_cursor=0; _} as r -> push (Some r); push None; return_unit
    | Ok {next_cursor=n; _} as r -> push (Some r); loop n
    (* On error, report it and retry the same cursor; this repeats
       indefinitely if the error persists. *)
    | error -> push (Some error); loop cursor in
  async (fun () -> loop (-1));
  stream
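
On the note about mixing result values with exceptions: here is a minimal sketch of a parse_response that turns a decoding failure into an Error value instead of calling failwith. The error type and decode_user_ids are hypothetical stand-ins (the real decoder is user_ids_of_yojson), and the sketch uses only the standard library so it can be run on its own. Yojson.Safe.from_string can itself raise on malformed input, so a real version would presumably need to catch that as well.

```ocaml
(* Hypothetical stand-ins for the types used in the question. *)
type user_ids = { ids : int list; next_cursor : int }
type error = Http_error of int | Parse_error of string

(* Stand-in for the generated decoder: it reports failure through a
   polymorphic variant rather than raising. It only accepts "ok". *)
let decode_user_ids (body : string) : [ `Ok of user_ids | `Error of string ] =
  if body = "ok" then `Ok { ids = [1; 2; 3]; next_cursor = 0 }
  else `Error ("cannot decode: " ^ body)

(* Every failure becomes an Error value; nothing is raised. *)
let parse_response : (string, int) result -> (user_ids, error) result = function
  | Error status -> Error (Http_error status)
  | Ok body ->
    match decode_user_ids body with
    | `Ok uids -> Ok uids
    | `Error msg -> Error (Parse_error msg)
```

With this shape, the consumer of the stream sees decoding problems as ordinary Error elements and does not need an exception handler around the stream.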

Update

But of course, this implementation eagerly pulls data from the server and pushes it downstream. It is like a pump-and-pipe system: as soon as the function is invoked, the pump is turned on and constantly pours water into the system. You can add a tap by using a bounded push, which can be obtained with Lwt_stream.create_bounded. But you will still have some amount of prefetching (or, back to the pipe analogy, some kind of extension tank). Usually this is not bad, as it removes some latency, but sometimes it is not what you want. In that case, the only choice left is to use an explicit reference cell to control your loop (and build the stream using Lwt_stream.from).

Another approach would be to change the interface; maybe you're trying to pack too much into one abstraction. Maybe returning follower list Lwt.t or even follower Lwt.t list would be better. Or you can even create an abstract type follower that hides a thread and returns a follower list, and lift its accessors into _ Lwt.t, e.g.,

 module Follower : sig
   type t
   val name : t -> string Lwt.t
   ...
 end 

In that case, this interface can be quite usable with the Lwt_list module.
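
Going back to the bounded push mentioned in the update, a rough sketch of how it could look. The page type and fetch_page are hypothetical stand-ins for the Twitter call (three fake pages, the last with next_cursor = 0); only Lwt_stream.create_bounded and its push object come from Lwt.

```ocaml
(* Requires the lwt library. *)
open Lwt.Infix

(* Hypothetical page type and fetcher standing in for the real request. *)
type page = { ids : int list; next_cursor : int }

let fetch_page cursor =
  let n = if cursor = -1 then 1 else cursor in
  Lwt.return
    { ids = [n * 10; n * 10 + 1];
      next_cursor = (if n >= 3 then 0 else n + 1) }

let followers () =
  (* At most one page may wait in the queue for the consumer. *)
  let stream, push = Lwt_stream.create_bounded 1 in
  let rec loop cursor =
    fetch_page cursor >>= fun page ->
    (* push#push blocks while the queue is full, which throttles the loop. *)
    push#push page >>= fun () ->
    if page.next_cursor = 0 then (push#close; Lwt.return_unit)
    else loop page.next_cursor
  in
  Lwt.async (fun () -> loop (-1));
  stream
```

The loop still runs ahead of the consumer: one page sits in the queue and a second one has already been fetched while push#push is blocked, which is the prefetching described above.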

Upvotes: 2

antron

Reputation: 3847

Since f takes unit and produces different results each time, as you've said, it will have to depend on some state, as I think you realize. It already does that by depending on the results of I/O, and I think this is what is making the question of the ref non-trivial to answer. Otherwise, the answer would be yes, it is necessary (see point (2) below).

I think there are two main possibilities for eliminating the syntactic ref, but neither of them makes sense.

  1. Hide these state bits somehow inside the I/O f is already doing, i.e. on the peer across the API. This doesn't seem possible, in fact it looks like the bits have to be on the client for the API to work at all.
  2. Hide these state bits somewhere else on the client. But there will still be something spiritually equivalent to a ref somewhere in the client, in one way or another. You could write or use some kind of wrapper that factors out this ref, but unless that wrapper is useful in many places, it would only risk making it less obvious that there is extra client-side state here. I would prefer to keep the state as local and as close to its usage as possible. Since f already necessarily does "dirty" things, f is the right place.

So, in summary, I would say keep the ref as it is. It is common for streams to be stateful anyway.
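
For illustration, a minimal sketch of keeping the ref local to the function passed to Lwt_stream.from, including the end-of-stream case. The page type and fetch_page are hypothetical stand-ins for the real request; the option inside the ref is just one way to remember that the last page has already been delivered.

```ocaml
(* Requires the lwt library. *)
open Lwt.Infix

(* Hypothetical page type and fetcher standing in for the real request. *)
type page = { ids : int list; next_cursor : int }

let fetch_page cursor =
  let n = if cursor = -1 then 1 else cursor in
  Lwt.return
    { ids = [n * 10; n * 10 + 1];
      next_cursor = (if n >= 3 then 0 else n + 1) }

let followers () =
  (* Some c: the next cursor to request. None: the last page was delivered. *)
  let cursor = ref (Some (-1)) in
  let next () =
    match !cursor with
    | None -> Lwt.return_none
    | Some c ->
      fetch_page c >|= fun page ->
      cursor := (if page.next_cursor = 0 then None else Some page.next_cursor);
      Some page
  in
  Lwt_stream.from next
```

Here a request is only made when the consumer asks for the next element, and the state never leaves the closure.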


The above discussion assumes that you have to use Lwt_stream. If not, you can provide an alternative interface whose type looks something like get_follower : cursor:int -> ... -> (results * cursor) Lwt.t and let the caller worry about state if it has to. This would probably be what I'd try first.
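
A rough sketch of what that interface could look like, with the caller threading the cursor. The body of get_follower is a hypothetical stand-in that fakes three pages, and all_followers is only an example caller, not part of the proposed interface.

```ocaml
(* Requires the lwt library. *)
open Lwt.Infix

(* One request per call: returns the ids of this page and the next cursor.
   Hypothetical stand-in for the real request. *)
let get_follower ~cursor () : (int list * int) Lwt.t =
  let n = if cursor = -1 then 1 else cursor in
  Lwt.return ([n * 10; n * 10 + 1], (if n >= 3 then 0 else n + 1))

(* Example caller: the cursor is an ordinary loop argument, no ref needed. *)
let all_followers () =
  let rec loop cursor acc =
    get_follower ~cursor () >>= fun (ids, next) ->
    let acc = List.rev_append ids acc in
    if next = 0 then Lwt.return (List.rev acc) else loop next acc
  in
  loop (-1) []
```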

EDIT

There is of course a disadvantage to that, which is the possibility of writing the calling code to supply the wrong cursor. You could hide the cursor by returning a partially applied function, but the calling code might be written to call the wrong function. Without linear types, the stream approach is safer if you are worried about these possibilities, so it has its advantages.
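
To make the partially applied function idea concrete, here is a sketch in which each page carries a closure for fetching the following one. The record type, raw_fetch, and first_page are all hypothetical names. The caller can no longer pass a wrong cursor, but nothing prevents it from calling a stale next function twice.

```ocaml
(* Requires the lwt library. *)
open Lwt.Infix

type page = {
  ids : int list;
  next : (unit -> page Lwt.t) option;  (* None once the last page is reached *)
}

(* Hypothetical raw request: ids of this page and the next cursor. *)
let raw_fetch cursor =
  let n = if cursor = -1 then 1 else cursor in
  Lwt.return ([n * 10; n * 10 + 1], (if n >= 3 then 0 else n + 1))

(* The cursor is captured in the closure and never exposed to the caller. *)
let rec fetch cursor =
  raw_fetch cursor >|= fun (ids, next_cursor) ->
  let next =
    if next_cursor = 0 then None else Some (fun () -> fetch next_cursor) in
  { ids; next }

let first_page () = fetch (-1)
```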

Upvotes: 1
