Use case
A client sends subscribe messages over a WebSocket connection (assume that a client may send many subscribe/unsubscribe messages per second). Each subscribe message contains a topic that corresponds to one stream. Each stream is represented as an Enumerator, and we want to keep it that way (so we do not want to use imperative operations such as channel.push()). So when a subscribe message is received, the server has to add the corresponding Enumerator to the WebSocket output Iteratee (that is, the WebSocket stream towards the client).
Solution
To multiplex several Enumerators on demand into the WebSocket output Iteratee, we can use Concurrent.patchPanel. It builds a special Enumerator into which other Enumerators can be patched with patchIn. So all we have to do is return this patch panel Enumerator as the WebSocket output and then patchIn a new Enumerator whenever we want to.
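A minimal sketch of this first attempt, assuming the Play 2.x iteratees library (play.api.libs.iteratee) and plain String messages. The body of streamForThisTopic is a placeholder invented for illustration; a real stream would come from the application.

```scala
import play.api.libs.iteratee.{Concurrent, Enumerator}
import scala.concurrent.ExecutionContext.Implicits.global

object PatchPanelSketch {
  // Placeholder stream for one topic (assumption, for illustration only).
  def streamForThisTopic(topic: String): Enumerator[String] =
    Enumerator(s"$topic: first", s"$topic: second")

  // `out` is the Enumerator handed to the WebSocket as its output.
  // The callback receives the patcher once `out` is applied to an Iteratee.
  val out: Enumerator[String] =
    Concurrent.patchPanel[String] { patcher =>
      // patchIn returns false if the patch panel is already closed.
      patcher.patchIn(streamForThisTopic("news"))
    }
}
```

In this form the patcher is only visible inside the callback, so streams can only be patched in from there.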
Notice, however, that the patcher is only obtained through a callback, so we cannot access it directly when we handle messages from the client. This callback is called whenever the Enumerator "out" is applied to an Iteratee; in our case that happens only once, with the WebSocket output Iteratee. So how can we react to input events and patchIn Enumerators? One option is to turn the callback into a Future.
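The callback-to-Future idea can be sketched as follows, again assuming Play 2.x iteratees and String messages, with each incoming message treated as a topic name. The names channels and patcherPromise, and the body of streamForThisTopic, are hypothetical; the returned (in, out) pair is the shape a WebSocket.using[String] handler would return.

```scala
import play.api.libs.iteratee.{Concurrent, Enumerator, Iteratee}
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global

object SubscribeSketch {
  // Placeholder lookup of the stream behind a topic (assumption).
  def streamForThisTopic(topic: String): Enumerator[String] =
    Enumerator(s"update for $topic")

  // Builds the (in, out) pair for one WebSocket connection.
  def channels(): (Iteratee[String, Unit], Enumerator[String]) = {
    // Completed when `out` is applied to the WebSocket output Iteratee.
    val patcherPromise = Promise[Concurrent.PatchPanel[String]]()
    val patcher: Future[Concurrent.PatchPanel[String]] = patcherPromise.future

    val out = Concurrent.patchPanel[String] { p =>
      // trySuccess keeps this safe even if `out` were applied more than once.
      patcherPromise.trySuccess(p)
    }

    // Each subscribe message patches the matching stream into `out`
    // as soon as the patcher is available.
    val in = Iteratee.foreach[String] { topic =>
      patcher.foreach(_.patchIn(streamForThisTopic(topic)))
    }

    (in, out)
  }
}
```

Because the patcher lives in a Future, messages that arrive before the output is connected are not lost: their patchIn calls simply run once the Future completes.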
There may be a better way to do this; if so, please post a comment. Still, this works, and all of it is purely non-blocking and immutable, with all the advantages that brings.
Don't forget about Concurrent.broadcast
Concurrent.patchPanel handles the "N Enumerators -> 1 Iteratee" problem (and lets you do it on demand). To handle the "1 Enumerator -> N Iteratees" problem, use Concurrent.broadcast to get a new Enumerator optimized for broadcasting, like this:
val (sharedStream, _) = Concurrent.broadcast(stream)
In the example above, each streamForThisTopic that you patchIn should be a sharedStream, since several clients may subscribe to the same topic.
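One way to arrange this, as a sketch only: build one broadcast Enumerator per topic up front and have streamForThisTopic return the shared instance. The topic list and sourceFor are invented for illustration, and a fixed set of topics is an assumption; Play 2.x iteratees are assumed as before.

```scala
import play.api.libs.iteratee.{Concurrent, Enumerator}
import scala.concurrent.ExecutionContext.Implicits.global

object SharedStreamsSketch {
  // Placeholder source stream for a topic (assumption).
  private def sourceFor(topic: String): Enumerator[String] =
    Enumerator(s"$topic: hello")

  // Hypothetical fixed set of topics known at startup.
  private val topics = Seq("news", "sports")

  // One broadcast Enumerator per topic, created once and shared by all clients.
  private val sharedStreams: Map[String, Enumerator[String]] =
    topics.map { topic =>
      val (sharedStream, _) = Concurrent.broadcast(sourceFor(topic))
      topic -> sharedStream
    }.toMap

  // Unknown topics get an empty stream in this sketch.
  def streamForThisTopic(topic: String): Enumerator[String] =
    sharedStreams.getOrElse(topic, Enumerator.empty[String])
}
```

Every client that subscribes to "news" then patches in the same Enumerator instead of starting its own copy of the source.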
Last but not least, you can combine this with built-in Enumeratees such as Concurrent.buffer and Concurrent.dropInputIfNotReady, so that one client consuming too slowly does not block the stream for all clients (by default, a sharedStream waits until all clients have consumed chunk N before sending chunk N+1).
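As a rough sketch, either Enumeratee can be applied per client with the &> operator before the stream is patched in. The buffer size of 100 and the 50 ms timeout are arbitrary values chosen for illustration, and the helper names buffered and lossy are hypothetical; Play 2.x iteratees are assumed.

```scala
import play.api.libs.iteratee.{Concurrent, Enumerator}
import scala.concurrent.ExecutionContext.Implicits.global

object SlowClientSketch {
  // Buffers up to 100 chunks for this client, so a brief slowdown
  // does not immediately hold back the shared stream.
  def buffered(shared: Enumerator[String]): Enumerator[String] =
    shared &> Concurrent.buffer[String](100)

  // Drops a chunk if this client is not ready to accept it within 50 ms,
  // trading completeness for not delaying the other clients.
  def lossy(shared: Enumerator[String]): Enumerator[String] =
    shared &> Concurrent.dropInputIfNotReady[String](50)
}
```

Which one fits depends on the data: buffering keeps every chunk for as long as the buffer has room, while dropping suits streams where only recent values matter.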