Observable of Observables, concatAll, mergeAll, and switchLatest
Suppose we have an array of arrays as given below.
[
[1],
[2, 3],
[],
[4]
]
If we want to flatten this two-dimensional array into a one-dimensional array, we can call the function concatAll on it.
[[1], [2, 3], [], [4]].concatAll() gives us [1, 2, 3, 4], a new array with all the elements of the two-dimensional array. The inner array with no elements contributes nothing.
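JavaScript arrays do not ship with a concatAll method, so the call above assumes that one has been defined. A minimal sketch of such a helper, written here as a standalone function (its name and shape are assumptions made for illustration), might look like this:

```javascript
// Hypothetical helper: flattens an array of arrays by exactly one level.
function concatAll(arrays) {
  const result = [];
  for (const inner of arrays) {
    for (const item of inner) {
      result.push(item);
    }
  }
  return result;
}

const flattened = concatAll([[1], [2, 3], [], [4]]);
console.log(flattened); // [ 1, 2, 3, 4 ]
```

The built-in Array.prototype.flat() performs the same one-level flattening in modern JavaScript engines.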
In the same way we can apply concatAll() to an Observable of Observables. An Observable is data plus time, i.e., the data arrives over time. Suppose we use the notation given below for an Observable of Observables.
-----------------time----------->
{
...{1},
.......{2................................3},
...............{},
........................{4}
}.concatAll()
When we call concatAll() on the Observable of Observables above, we get the flattened one-dimensional Observable given below.
{...1....2................................3..4}
The inner Observables here are cold: the data inside an inner Observable is not retrieved until we call forEach on it during flattening. This means that after we flatten the first inner Observable, which has one item, we begin to wait. Then we flatten the first element (2) of the second inner Observable and wait until its last element (3) arrives. We do not jump ahead and call forEach on the third Observable, although it arrives while we are waiting for 3. We just put the third inner Observable in a buffer and keep waiting for the last element of the second inner Observable. Only once forEach has finished pushing the data from the second inner Observable do we move on to the next one, i.e., the third inner Observable.
When we call forEach on the third inner Observable, onCompleted fires immediately because it has no elements, so it contributes nothing to the one-dimensional Observable. We continue with the next Observable. The order in which the inner Observables arrive is therefore preserved in the final one-dimensional Observable. That is also why the resulting Observable is longer in time: the extra length is the time that the buffered Observables spent waiting.
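The buffering described above can be sketched without any library. The names manualSource, of, and concatAll below are hypothetical helpers written for this illustration, not the Rx implementation; emitting by hand stands in for the passage of time, and disposal of inner sources is left out to keep the sketch short.

```javascript
// A source we drive by hand; nothing is delivered until forEach is called.
function manualSource() {
  let observer = null;
  return {
    forEach(onNext, onError = () => {}, onCompleted = () => {}) {
      observer = { onNext, onError, onCompleted };
      return { dispose() { observer = null; } };
    },
    emit(value) { if (observer) observer.onNext(value); },
    complete() { if (observer) observer.onCompleted(); },
  };
}

// A cold source that pushes its values only when forEach is called.
function of(...values) {
  return {
    forEach(onNext, onError = () => {}, onCompleted = () => {}) {
      values.forEach(value => onNext(value));
      onCompleted();
      return { dispose() {} };
    },
  };
}

function concatAll(outer) {
  return {
    forEach(onNext, onError = () => {}, onCompleted = () => {}) {
      const buffer = [];     // inner sources waiting for their turn
      let listening = false; // are we inside an inner source right now?
      let outerDone = false;
      const listenToNext = () => {
        if (buffer.length === 0) {
          listening = false;
          if (outerDone) onCompleted();
          return;
        }
        listening = true;
        // Move on only when the current inner source completes.
        buffer.shift().forEach(onNext, onError, listenToNext);
      };
      return outer.forEach(
        inner => { buffer.push(inner); if (!listening) listenToNext(); },
        onError,
        () => { outerDone = true; if (!listening) onCompleted(); }
      );
    },
  };
}

const outer = manualSource();
const second = manualSource();
const output = [];
concatAll(outer).forEach(value => output.push(value));

outer.emit(of(1));    // {1}: flattened immediately
outer.emit(second);   // {2....3}: we start listening to it
second.emit(2);
outer.emit(of());     // {}: arrives while we wait for 3, so it is buffered
outer.emit(of(4));    // {4}: buffered behind the empty one
const beforeThree = output.slice();
second.emit(3);
second.complete();    // the buffer drains: {} completes, then {4} emits

console.log(beforeThree); // [ 1, 2 ]
console.log(output);      // [ 1, 2, 3, 4 ]
```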
There are Observables that emit data all the time, regardless of whether the data is consumed, i.e., whether or not you call forEach on them. For example, an Observable of mouse movements emits data irrespective of whether you have hooked up a handler to capture those events. You only start receiving the event objects once you attach an event listener. So you must attach a callback to a "hot" Observable to get data out of it, but it emits data all the time even if you never call forEach on it. Such Observables are not "cold"; they are called "hot" Observables.
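The difference between cold and hot sources can be shown with two small objects. This is only a sketch: cold, createHotSource, and the "move-N" event names are made up for the example and merely stand in for real mouse events.

```javascript
const log = [];

// Cold: the producer code runs only when forEach is called.
const cold = {
  forEach(onNext) {
    log.push("cold producer started");
    [1, 2].forEach(value => onNext(value));
  },
};

// Hot: emits whether or not anyone is listening.
function createHotSource() {
  const listeners = new Set();
  return {
    emit(value) { listeners.forEach(listener => listener(value)); },
    forEach(onNext) {
      listeners.add(onNext);
      return { dispose() { listeners.delete(onNext); } };
    },
  };
}

const mouseMoves = createHotSource();
mouseMoves.emit("move-1");   // nobody is listening, so this event is lost
const seen = [];
const subscription = mouseMoves.forEach(event => seen.push(event));
mouseMoves.emit("move-2");   // delivered
subscription.dispose();
mouseMoves.emit("move-3");   // lost again

const logBeforeForEach = log.slice();   // still empty: the cold source has done nothing
cold.forEach(value => log.push(value));

console.log(seen); // [ 'move-2' ]
console.log(log);  // [ 'cold producer started', 1, 2 ]
```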
For each Observable we get a subscription object with a dispose method, which the consumer can call to stop receiving data. If we call dispose on the subscription to the outer Observable, all listening to the inner Observables also stops, and the resulting flattened Observable contains only the data received up to that time. Something similar happens if an error occurs in the inner Observable {2........3} while we read data out of it: if the error occurs after we have read 2 but before 3, the stream stops at that point. Remember that an Observable keeps pushing data until the data finishes, an error occurs, or the consumer calls dispose to tell the producer that it does not want any more data.
Calling dispose does not trigger onCompleted. If consumer A calls dispose, the producer stops sending data to consumer A but continues to send data to its other consumers. When the producer has finished sending all the data, it informs the consumers that are still listening that the data has finished, but not consumer A.
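A small sketch of this behaviour, with a hypothetical createProducer helper and two consumers A and B (all names are assumptions for the example):

```javascript
// Hypothetical producer that pushes to every consumer still subscribed.
function createProducer() {
  const observers = new Set();
  return {
    forEach(onNext, onError = () => {}, onCompleted = () => {}) {
      const observer = { onNext, onCompleted };
      observers.add(observer);
      // dispose only removes this consumer; it does not call onCompleted.
      return { dispose() { observers.delete(observer); } };
    },
    send(value) { observers.forEach(o => o.onNext(value)); },
    finish() {
      observers.forEach(o => o.onCompleted());
      observers.clear();
    },
  };
}

const producer = createProducer();
const seenByA = [];
const seenByB = [];
const subscriptionA = producer.forEach(
  value => seenByA.push(value), undefined, () => seenByA.push("completed"));
producer.forEach(
  value => seenByB.push(value), undefined, () => seenByB.push("completed"));

producer.send(1);
subscriptionA.dispose();   // A asks for no more data
producer.send(2);          // only B receives this
producer.finish();         // only B is told that the data has finished

console.log(seenByA); // [ 1 ]
console.log(seenByB); // [ 1, 2, 'completed' ]
```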
The function mergeAll behaves differently. Unlike concatAll, it works on a first come, first served policy. In mergeAll(), data is not emitted according to the order in which the inner collections arrived. Instead, whichever data arrives first is emitted first, regardless of which collection it belongs to, and no collections are buffered in a queue.
-----------------time----------->
{
...{1},
.......{2................................3},
...............{},
................{........4}
}.mergeAll()
will give
{...1....2..................4..............3}
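The first come, first served policy of mergeAll can be sketched as follows. The helpers manualSource and mergeAll are hypothetical and written only for this illustration; the order of the emit calls stands in for time.

```javascript
// A source we drive by hand.
function manualSource() {
  let observer = null;
  return {
    forEach(onNext, onError = () => {}, onCompleted = () => {}) {
      observer = { onNext, onError, onCompleted };
      return { dispose() { observer = null; } };
    },
    emit(value) { if (observer) observer.onNext(value); },
    complete() { if (observer) observer.onCompleted(); },
  };
}

function mergeAll(outer) {
  return {
    forEach(onNext, onError = () => {}, onCompleted = () => {}) {
      let pending = 1; // the outer source plus every inner source still running
      const finishOne = () => {
        pending -= 1;
        if (pending === 0) onCompleted();
      };
      return outer.forEach(
        // Listen to every inner source as soon as it arrives; nothing is buffered.
        inner => { pending += 1; inner.forEach(onNext, onError, finishOne); },
        onError,
        finishOne
      );
    },
  };
}

const outer = manualSource();
const first = manualSource();
const second = manualSource();
const third = manualSource();
const fourth = manualSource();
const output = [];
let completed = false;
mergeAll(outer).forEach(value => output.push(value), undefined, () => { completed = true; });

outer.emit(first);  first.emit(1); first.complete();
outer.emit(second); second.emit(2);
outer.emit(third);  third.complete();   // the empty inner source
outer.emit(fourth); fourth.emit(4);     // 4 arrives before 3, so it is emitted first
second.emit(3);
second.complete(); fourth.complete(); outer.complete();

console.log(output);    // [ 1, 2, 4, 3 ]
console.log(completed); // true
```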
In switchLatest we always switch to the latest source of data. We tell the previous source that we do not want to consume any more of its data by calling subscription.dispose() on it. So the final flattened Observable for the Observable of Observables given above will be {...1....2..................4}.
-----------------time----------->
{
...{1},
.......{2................................3},
...............{},
................{........4}
}.switchLatest()
will give
{...1....2..................4}
We switch to the latest Observable as soon as it arrives, even if it is an empty stream with no data. We do not wait for data to arrive in the latest Observable before unsubscribing from the previous stream. The job of switchLatest is to listen to one Observable at a time: as soon as a new inner Observable arrives in the outer Observable, it calls dispose on the subscription to the Observable it was listening to and switches to the new one.
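The switching can be sketched in the same style. manualSource and switchLatest below are hypothetical helpers for illustration only, and completion handling is deliberately left out to keep the sketch short.

```javascript
// A source we drive by hand; dispose makes it stop delivering.
function manualSource() {
  let observer = null;
  return {
    forEach(onNext, onError = () => {}, onCompleted = () => {}) {
      observer = { onNext, onError, onCompleted };
      return { dispose() { observer = null; } };
    },
    emit(value) { if (observer) observer.onNext(value); },
  };
}

function switchLatest(outer) {
  return {
    forEach(onNext, onError = () => {}) {
      let current = null; // subscription to the inner source we listen to now
      const outerSubscription = outer.forEach(
        inner => {
          // Drop the previous source the moment a new one arrives.
          if (current) current.dispose();
          current = inner.forEach(onNext, onError);
        },
        onError
      );
      return {
        dispose() {
          if (current) current.dispose();
          outerSubscription.dispose();
        },
      };
    },
  };
}

const outer = manualSource();
const first = manualSource();
const second = manualSource();
const third = manualSource();   // never emits: the empty inner source
const fourth = manualSource();
const output = [];
switchLatest(outer).forEach(value => output.push(value));

outer.emit(first);  first.emit(1);
outer.emit(second); second.emit(2);
outer.emit(third);   // we switch at once, although third has no data
second.emit(3);      // ignored: second was disposed when third arrived
outer.emit(fourth); fourth.emit(4);

console.log(output); // [ 1, 2, 4 ]
```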
We can demonstrate how switchLatest() can replace takeUntil() plus concatAll() in code where both do the same thing, with the switchLatest() version being shorter. First we write the code without using any of these functions. Note that the code below has a bunch of callbacks and state variables, which disappear when we rewrite it in the alternative ways.
function play(movieId, cancelButton, callback) {
    var movieTicket,
        playError,
        // Internal function that takes no arguments: reports the outcome
        // once there is an error, or once both the ticket and the player are ready.
        tryFinish = function () {
            if (playError) {
                // Error-first, like the other callbacks in this code.
                callback(playError);
            } else if (movieTicket && player.initialized) {
                // Was `ticket`, which is not defined in this scope.
                callback(null, movieTicket);
            }
        };

    cancelButton.addEventListener("click", function () {
        playError = "cancel";
    });

    if (!player.initialized) {
        player.init(function (error) {
            playError = error;
            tryFinish();
        });
    }

    authorizeMovie(movieId, function (error, ticket) {
        playError = error;
        movieTicket = ticket;
        tryFinish();
    });
} // function play ends here
We can write the above code using takeUntil and concatAll as shown below. We write this code with the following questions in mind.
What are the collections/streams that I have?
What collections/streams do I want to have?
How do we combine the available collections/streams and compose them so that we get the streams/collections that we want to have?
We have a stream of clicks on the cancel button. We have another collection for the successful authorization of the movie, which is a stream/Observable of one value that comes back from the server. Another collection is the initialization of the player. All these things happen asynchronously.
So by composing the collections above we can get the collection that we want, i.e., a stream of movies to play. Then we can just forEach over this stream and play each movie. The stream that we finally get is a stream of authorized ids, and it is a cold stream. Only when we run forEach on this cold stream does the movie begin to play.
Here is a better way to write the code given above.
var authorizations =
    player.
        init().
        map(() =>
            playAttempts.
                map(movieId =>
                    player.authorize(movieId).
                        // Swallow authorization errors by resuming with an empty Observable.
                        catch(e => Observable.empty()).
                        // Stop waiting for this authorization when cancel is clicked.
                        takeUntil(cancels)).
                concatAll()).
        concatAll();

authorizations.forEach(
    license => player.play(license),
    error => showDialog("Sorry, can't play right now."));
In the code above, when we call player.init(), an Observable is created which does nothing
until we call forEach on it, because Observables are lazy. Only when we forEach over it does
the player get initialized. If the player is initialized successfully, onNext is called with
a value (which may simply be true), and then onCompleted is called, because this Observable
emits only one element and is therefore called an Observable of one.
The playAttempts line shows that we listen to all the attempts made by the user to play. After
the player has been initialized successfully, we start listening to all the clicks and make
an Observable of those clicks using the method
Observable.fromEvent = function(playButton, click){//......see part 2..}
The map function works on one item of that Observable at a time. Each click by the user is a
click on a movie with some movieId, and these movieIds are passed to the mapping function one
by one. The mapping function checks whether this user is authorized to watch the movie with
this id. So the map produces a stream of decryption keys, one decryption key for each movieId
that is authorized for this user: for each movieId we substitute a network request that
returns a decryption key. Remember that an Observable is lazy, so we do not send the request
to the server at this point; we only build the Observable that replaces the movieId with a
request to the server for the decryption key. So we are three levels deep at this point in
the code. For every initialized player (even though only one player is actually initialized)
we make another stream of playAttempts, and for every playAttempt we make another Observable
that replaces the movieId with a network request for the decryption key. We therefore need
two calls to concatAll to get a one-dimensional Observable, because one call to concatAll
flattens only one level of nesting, i.e., any number of Observables embedded one level deep
inside another Observable. The inner concatAll flattens the authorization requests into one
stream per initialized player, and the outer concatAll flattens that into the final stream.
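The need for two concatAll calls can be seen with plain arrays standing in for Observables.
This is only a sketch: concatAll, initializedPlayers, requestKey, and the movieIds are made-up
stand-ins, and the arrays ignore time entirely.

```javascript
// Hypothetical one-level flattener for arrays.
const concatAll = arrays => arrays.reduce((flat, inner) => flat.concat(inner), []);

const initializedPlayers = ["player"];             // stands in for the Observable of one
const playAttempts = [101, 102];                   // made-up movieIds
const requestKey = movieId => ["key-" + movieId];  // stands in for a request yielding one key

// Without any flattening we are three levels deep.
const unflattened = initializedPlayers.map(() =>
  playAttempts.map(movieId => requestKey(movieId)));

// Same shape as the code above: an inner and an outer concatAll.
const authorizations = concatAll(
  initializedPlayers.map(() =>
    concatAll(playAttempts.map(movieId => requestKey(movieId)))));

console.log(JSON.stringify(unflattened));    // [[["key-101"],["key-102"]]]
console.log(JSON.stringify(authorizations)); // ["key-101","key-102"]
```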
.catch() passes an error to a function, and that function can optionally return an Observable
to resume from. Observable.empty() is an Observable that has nothing to emit, so when we
forEach over it, it calls onCompleted immediately. The line catch(e => Observable.empty()) is
just like a try/catch with an empty catch block. We are only catching errors on attempts to
authorize movieIds. We are not catching errors on player initialization in the code here. So
if the catch in the code above gets an error, the whole stream is not stopped. Instead the
catch passes the error to a function, that function returns an Observable which is
concatenated with the existing stream, and the error does not bubble up all the way.
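The effect of catch(e => Observable.empty()) can be sketched with a tiny Observable class. The
class below is a hypothetical stand-in written for this example, not the Rx implementation,
and failingAuthorize is a made-up request that always fails.

```javascript
class Observable {
  constructor(subscribe) { this.subscribe = subscribe; }
  forEach(onNext, onError = () => {}, onCompleted = () => {}) {
    return this.subscribe({ onNext, onError, onCompleted });
  }
  // Emits nothing and completes as soon as someone calls forEach.
  static empty() {
    return new Observable(observer => { observer.onCompleted(); });
  }
  // On error, resume with the Observable returned by the handler.
  catch(handler) {
    return new Observable(observer =>
      this.forEach(
        value => observer.onNext(value),
        error => handler(error).forEach(
          value => observer.onNext(value),
          err => observer.onError(err),
          () => observer.onCompleted()),
        () => observer.onCompleted()));
  }
}

// Hypothetical authorization request that fails.
const failingAuthorize = new Observable(observer => {
  observer.onError(new Error("not authorized"));
});

const events = [];
failingAuthorize
  .catch(e => Observable.empty())
  .forEach(
    license => events.push(license),
    error => events.push("error"),
    () => events.push("completed"));

console.log(events); // [ 'completed' ]
```

The consumer never sees the error: the failed request simply looks like an authorization
stream that finished without producing a license.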