tldr

Reputation: 12112

Elixir: Observables

Elixir streams provide iterables, but I couldn't find any information on observables (Google was no help here). I'd greatly appreciate it if someone could point me to resources on the subject.

Upvotes: 2

Views: 1157

Answers (3)

mendrugory

Reputation: 1235

I have built a PoC of a Pub-Sub system where I have followed a kind of "Observable Pattern": http://mendrugory.weebly.com/blog/pub-sub-system-in-elixir.

In order to keep the state (which processes have to be informed), I have used an Agent.
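To illustrate the idea, here is a minimal sketch of keeping the subscriber list in an Agent. It is not the code from the linked blog post; the module name `PubSubSketch` and the `{:event, event}` message shape are assumptions made up for this example.

```elixir
defmodule PubSubSketch do
  # Hypothetical module: the Agent's state is the list of subscriber pids.

  def start_link do
    Agent.start_link(fn -> [] end)
  end

  # Register a process that should be informed of new events.
  def subscribe(registry, pid) when is_pid(pid) do
    Agent.update(registry, fn subscribers -> [pid | subscribers] end)
  end

  # Send the event to every registered process as {:event, event}.
  def publish(registry, event) do
    registry
    |> Agent.get(fn subscribers -> subscribers end)
    |> Enum.each(fn pid -> send(pid, {:event, event}) end)
  end
end

{:ok, registry} = PubSubSketch.start_link()
:ok = PubSubSketch.subscribe(registry, self())
:ok = PubSubSketch.publish(registry, "hello")

receive do
  {:event, message} -> IO.puts("Got #{message}")
after
  1_000 -> IO.puts("No event received")
end
```

A real system would probably also monitor subscribers and drop them when they exit; that is left out here to keep the sketch short.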

Upvotes: 0

Alexei Sholik

Reputation: 7471

Streams in Elixir are abstractions over function composition. In the end, all you get is a function that, when called, loops over the input stream and transforms it.
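A small sketch of what that means in practice, using only the standard Stream and Enum modules (the variable names are just for illustration): composing stream functions only describes the work, and nothing runs until an Enum function consumes the result.

```elixir
# Composing Stream functions builds a lazy description; nothing runs yet.
doubled_multiples_of_four =
  1..5
  |> Stream.map(&(&1 * 2))
  |> Stream.filter(&(rem(&1, 4) == 0))

# Only a consumer such as Enum.to_list/1 pulls the elements through.
result = Enum.to_list(doubled_multiples_of_four)
IO.inspect(result)
```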

In order to build stateful streams like the example in Twitter4j (buffering new Twitter statuses during one second and dispatching them all in one list), you'll need to use building blocks that can hold state. In Elixir, it is common to encapsulate state in processes.

The example might look like this:

tweetsPerSecond =
  twitterStream 
  |> SS.buffer({1, :second}) 
  |> SS.map(&length(&1))

SS.subscribe(tweetsPerSecond, fn n -> IO.puts "Got #{n} tweets in the last second" end)
SS.subscribe(tweetsPerSecond, fn _n -> IO.puts "Second subscriber" end)

SS is a new module we need to write to implement the observable functionality. The core idea (as far as I understand it) is being able to subscribe to a stream without modifying it.

In order for this to work, the twitterStream itself should be a process emitting events for others to consume. You can't use Stream in this case because it has "blocking pull" semantics, i.e. you won't be able to interrupt waiting on the next element in a stream after some fixed amount of time has elapsed.
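As a rough sketch of such a process, here is a GenServer that buffers pushed events and dispatches them to subscribers as one list per interval. This is not the SS module from the example above; `BufferedEmitter`, its functions, and the `{:batch, events}` message shape are all assumptions made for illustration.

```elixir
defmodule BufferedEmitter do
  # Hypothetical sketch: collects pushed events and, once per interval,
  # sends the collected batch to every subscriber as {:batch, events}.
  use GenServer

  def start_link(interval_ms), do: GenServer.start_link(__MODULE__, interval_ms)
  def subscribe(emitter, pid), do: GenServer.call(emitter, {:subscribe, pid})
  def push(emitter, event), do: GenServer.cast(emitter, {:push, event})

  @impl true
  def init(interval_ms) do
    # Schedule the first flush; the timer is independent of incoming events.
    Process.send_after(self(), :flush, interval_ms)
    {:ok, %{interval: interval_ms, subscribers: [], buffer: []}}
  end

  @impl true
  def handle_call({:subscribe, pid}, _from, state) do
    {:reply, :ok, %{state | subscribers: [pid | state.subscribers]}}
  end

  @impl true
  def handle_cast({:push, event}, state) do
    {:noreply, %{state | buffer: [event | state.buffer]}}
  end

  @impl true
  def handle_info(:flush, state) do
    # Events were prepended, so reverse to restore arrival order.
    batch = Enum.reverse(state.buffer)
    Enum.each(state.subscribers, &send(&1, {:batch, batch}))
    Process.send_after(self(), :flush, state.interval)
    {:noreply, %{state | buffer: []}}
  end
end

{:ok, emitter} = BufferedEmitter.start_link(100)
:ok = BufferedEmitter.subscribe(emitter, self())
Enum.each(1..3, &BufferedEmitter.push(emitter, &1))

receive do
  {:batch, events} -> IO.puts("Got #{length(events)} events in the last interval")
after
  1_000 -> IO.puts("No batch received")
end
```

Because the flush is driven by a timer message rather than by pulling the next element, the process can dispatch a batch (possibly an empty one) even when no new events arrive, which is the part a plain Stream cannot do.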

To achieve the equivalent functionality in Elixir, take a look at the GenEvent module. It provides the ability to emit and subscribe to events. There is no stream-like interface for it though, not that I'm aware of.

Upvotes: 5

bitwalker

Reputation: 9251

You can combine Stream and Enum to write observable-style code. Here's an example of an echo server written in observable fashion:

IO.stream(:stdio, :line) 
|> Stream.map(&String.upcase/1)
|> Enum.each(&IO.write/1)

Basically, for each line you send to standard input, it will be converted to uppercase and then printed back to standard output. This is a simple example, but the point is that all you need to compose an observable is already available via Stream and Enum.
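If you want to try the same pipeline without typing into a terminal, one option is to swap standard input for an in-memory device. This is only a sketch: the use of StringIO and the sample input are assumptions for illustration, and the result is collected into a list instead of being written out.

```elixir
# An in-memory IO device standing in for :stdio.
{:ok, input} = StringIO.open("hello\nworld\n")

lines =
  input
  |> IO.stream(:line)
  |> Stream.map(&String.upcase/1)
  |> Enum.to_list()

IO.inspect(lines)
```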

Upvotes: 6
