Reputation: 2249
I have an F# class with a bunch of sequences. The class contains a simple next() method that returns the next element in the current sequence. Once all elements of the current sequence have been returned, it shifts to the next sequence instead. The class holds a pointer that tracks which element is next and which sequence it is returning from.
I'm currently limited to exposing only the next() method.
Some of the upstream classes will use my class (the same object instance) from different threads. This will make the pointer go out of sync, since each thread should start from scratch. I know this isn't ideal, but it is what I have to work with at the moment.
Example:
Thread 1 next(): return elem. A
Thread 1 next(): return elem. B
Thread 2 next(): return elem. A
Thread 1 next(): return elem. C
Thread 2 next(): return elem. B
Is there a way of keeping track of the pointer for each thread?
I've been thinking about using Threading.Thread.CurrentThread.ManagedThreadId as the key in a Map, and then returning the pointer for that key (and updating it there accordingly). I'm a bit concerned about the thread safety of this Map if two threads update their state at once.
I hope someone can provide me with some thoughts on how to get this to work.
Upvotes: 2
Views: 205
Reputation: 6510
This can be accomplished by using a MailboxProcessor to manage the state, then using a class to abstract the MailboxProcessor from the consumer. If you share an instance across multiple threads, they will see each other's updates in a thread-safe way. If you use a dedicated instance for each thread, they will only see their own updates. The code for that would look something like this:
// Add whatever other commands you need
type private SequenceMessage = Next of AsyncReplyChannel<int>

type IntSequence() =
    // The agent owns the state; messages are processed one at a time
    let agent =
        MailboxProcessor<SequenceMessage>.Start
        <| fun inbox ->
            let rec loop state =
                async {
                    let! message = inbox.Receive()
                    // Add other matches as required
                    match message with
                    | Next channel ->
                        let newState = state + 1
                        channel.Reply(newState)
                        return! loop newState
                }
            loop 0

    let next () =
        agent.PostAndReply <| fun reply -> Next reply
    let asyncNext () =
        agent.PostAndAsyncReply <| fun reply -> Next reply

    member __.Next () = next ()
    member __.AsyncNext () = asyncNext ()
Then, to use it in a way that each thread sees the updates from every other thread, you would do something equivalent to this:
// To share state across multiple threads, use the same instance
let sequence = IntSequence()
[1..10]
|> List.map (fun _ -> sequence.AsyncNext())
|> Async.Parallel
|> Async.RunSynchronously
|> Array.iter (fun i -> printfn "%d" i)
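Which prints something like the following (the requests are posted in parallel, so the order of the numbers can vary between runs):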
1
2
3
4
5
6
7
8
9
10
And to use it in a way where each thread only sees its own updates, you would just change the previous example to something like this:
// To use a dedicated state for each thread, create a new instance
[1..10]
|> List.map (fun _ -> IntSequence())
|> List.map (fun sequence -> sequence.AsyncNext())
|> Async.Parallel
|> Async.RunSynchronously
|> Array.iter (fun i -> printfn "%d" i)
Which prints:
1
1
1
1
1
1
1
1
1
1
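If the consumers must share one instance but each thread still needs its own position, the same agent idea can be combined with the ManagedThreadId key from the question. The following is only a minimal sketch under that assumption: PerThreadMessage, NextFor and PerThreadIntSequence are hypothetical names, and the integer counter stands in for the real sequence pointer. The agent keeps a Map from thread id to that thread's last value, and because it handles one message at a time the Map is never updated concurrently.

```fsharp
open System.Threading

// Hypothetical message type: the caller's thread id travels with the request.
type private PerThreadMessage = NextFor of int * AsyncReplyChannel<int>

type PerThreadIntSequence() =
    let agent =
        MailboxProcessor<PerThreadMessage>.Start(fun inbox ->
            // state maps a managed thread id to the last value returned to it
            let rec loop (state: Map<int, int>) =
                async {
                    let! message = inbox.Receive()
                    match message with
                    | NextFor (threadId, channel) ->
                        // Unknown threads start from scratch (0)
                        let previous =
                            state |> Map.tryFind threadId |> Option.defaultValue 0
                        let current = previous + 1
                        channel.Reply current
                        return! loop (Map.add threadId current state)
                }
            loop Map.empty)

    // The id is captured on the calling thread, before the message is posted.
    member __.Next () =
        let threadId = Thread.CurrentThread.ManagedThreadId
        agent.PostAndReply(fun reply -> NextFor (threadId, reply))

// Usage: two threads share one instance, yet each counts 1, 2, 3 on its own.
let perThread = PerThreadIntSequence()

let threads =
    [ "Thread 1"; "Thread 2" ]
    |> List.map (fun name ->
        Thread(ThreadStart(fun () ->
            for _ in 1..3 do
                printfn "%s: %d" name (perThread.Next()))))

threads |> List.iter (fun t -> t.Start())
threads |> List.iter (fun t -> t.Join())
```

Two caveats apply to this sketch. It only offers a blocking Next, because an async workflow can hop between thread-pool threads and the captured id would then no longer identify the caller. Managed thread ids can also be reused once a thread has ended, so a long-running process may need some way to clear stale entries.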
Upvotes: 2