Sunday, 8 May 2016

Iterators and Observers in asynchronous javascript - part 3

Observable of Observables, concatAll, mergeAll, and SwitchLatest


Suppose we have an array of array as given below.

[
    [1], 
    [2, 3],
    [],
    [4]
]
If we want to flatten this two-dimenstional array in to a single-dimensional array, we can use the function concatAll on it. 
[[1], [2, 3], [], [4]].concatAll() will give us [1, 2, 3, 4] which is a new array with all the elments of the two-dimensional array excluding the array with no elements. 

In the same way we can apply concatAll() on an Observable of Observalbles. Observable is data + time i.e., the data arrives over time. Suppose we use the notation given below for an Observable of Observables.

-----------------time----------->
{
    ...{1},
    .......{2................................3},
    ...............{},
    ........................{4}
}.concatAll()

When we call concatAll() on the Observable of Observables above we get a flattened one-dimensional Observable given below. 
{...1....2................................3..4}

The Observable that we called concatAll() is a cold Observalbe and the data inside it is not retrieved from it untill we call forEach on it during the operation of flattening. This means that when we flatten the first inner observable with one item we begin to wait and then we flatten the first element (which is 2) of the second inner Observable and wait until the last element (which is 3) of the second inner Observable arrive. We do not jump and start forEach on the third Observabel although it arrives while we are waiting for the last element (which is 3) in the second inner Observable. We just put that third inner Observable in some buffer and continue waiting for the last element of the second inner Observable. Once forEnd succeeds pushing data from the second inner Observable only then it goes to the next Observable i.e., the third inner Observable. 

When we call forEach on the third inner Observable, the onComplete is fired as it has no element and therefore this Observable does not make in to the single-dimensional Observable. We continue with the next Observable. So the order of arrival of an Observable is preserved in the final single-dimensional Observable and that is why the resultatnt single-dimensional Observable is elongated and has more length showing the waiting time in the buffer for the those Observables that had to wait. 

There are Observables that emit data all the time regardless of that data is consumed or not and you use forEach on the Observable or not. For example an Observable that has data about mouse movement emits data irrespevtive of the fact that you have hooked a handler to capture those events or not for it. You will only start getting those event objects when you hook an event listner to that event. So you must call back the "hot" Observable to get data out of it but it emits data all the time even if you don't use forEach on it. Such Observables are not "cold" and they are called "hot" Observables.

We get a subscription object that has a dispose method for each Observable so that the consumer can call that to stop getting any more data. If we call dispose method on the outer Observable all listening to inner Observables will also stop. And we get the resultant flattened Observable with the data already listened to until that time. Similarly if an error occurs in the inner Observable [2........3] when we read data out of it. Suppose the error occurs after we have read 2 but not 3 then the stream would stop at this point. Remember that an Observable pushes data continuously unless the data finishes or some error occurs or the consumer calls the dispose method to inform the producer that it will not have any more data.

The method dispose does not trigger the method onComplete. If a consumer A sends dispose to the producer the producer makes sure not to send any data to consumer A but it will continue to send data to other consumers. When the producer has finished sending all the data then it will inform other consumers that are still listening that the data has finished but not consumer A.

The function mergeAll behaves differently. It works on the policy of first come first serve unlike concatAll. So in mergeAll() data is not emitted on the basis of the order of arrival of the collections inside rather we emit that data first which arrives first irrespective of which collection it belongs to and we do not buffer collections in a queue.
-----------------time----------->
{
    ...{1},
    .......{2................................3},
    ...............{},
    {........................4}
}.mergeAll()

will give
{...1....2..................4..............3}

In switchLatest we always switch to the latest source of data by sending to the last but one source a message that we don't want to consume any more data from you and we do that by calling subscription.dispose() on the last but one source of data. So the final flattened Observer for the Observer of Observers given above will be {...1....2..................4}.

-----------------time----------->
{
    ...{1},
    .......{2................................3},
    ...............{},
    {........................4}
}.switchLatest()

will give
{...1....2..................4}

We switch to the latest Observable as soon as we get an Observable even if the latest Observable is empty stream and has no data. We do not wait for the arival of data in the latest Observable in order to unsubscribe from the last but one stream of data. Its job is to listen to one Observable at a time. As soon a new inner Observable arrives in the outer Observable it switches to the latest calling dispose method on the Observer it already was listening to.

we can demonstrate how switchLater() can replace takeUntil() plus concatAll() in a code where both do the same thing but with the use of switchLater() the code becomes shorter. First we write the code without using these functions.Note that the code below has a bunch of callbacks and state variables which we later on do not use when we write the code in alternative ways.

function play(movieId, cancelButton, callback) {
    var movieTicket,
        playError,
        tryFinish = function () {
            if(playError){
                callback(null, playError);
            }else if (movieTicket && player.initialized) {
                callback(null, ticket);
            }
        };//internal function that takes no arguments ends here
    
    cancelButton.addEventListener("click", function () {
        playError = "cancel";
    });
    
    if(!player.initialized){
        player.init(function (error) {
            playError = error;
            tryFinish();
        })
    }
    
    authorizeMovie(movieId, function (error, ticket) {
        playError = error;
        movieTicket = ticket;
        tryFinish();
    });
    
}//function play ends here.
We can write the above code using takeUntil and concatAll as shown below. We write this code
using the following things in mind.

What are the collections/streams that I have?
What collections/streams do I want to have?

How do we combine the available collections/streams and compose them so that we get the
streams/collections that we want to have?

We have a stream when somebody clicks the cancel button. We have anoher collection when the movie is successfully authoized and that is a stream/Observable of one that comes back from the server. Another collection is that the player is initialized. All these things are done asynchronously.
So composing the collections above we can get the collection that we want i.e., movies to play stream. Then we can just forEach over this stream and get the movie played. So the stream that we finally get is a stream of authorized ids and it is a cold stream. When we run forEach on this cold stream only then the movie began to play.
Here is a better way to write the code given above.

var authorizations = 
    player.
        init().
            map( () =>
                playAttempts. 
                    map(movieId => 
                        player.authorize(movieId). 
                            catch(e => Observale.empty). 
                                takeUntil(cancels)). 
                    concatAll()).
            concatAll();
        
        authorizations.forEach(
            license => player.play(license),
            error => showDialog("Sorry, can't play right now.")
        );

In the code above when we call player.init(), an Observable is created which does nothing 
untill we call forEach on it because Observables are lazy. When we forEach on them onlly then
the player gets intialized. If the player gets successfully initialized then method next()
is called on the Observable which emits a value which may be true and then onCompleted is 
called as this Observable emits only one element and is therefore called an Observable of 
one. 

playAttempts. line shows that we listen to all the attempts made by the user to play. After
the player has been initialized successfully then we start listening to all the clicks and
make Observables from all those clicks using the method
Observable.fromEvent = function(playButton, click){//......see part 2..}

The map function works on a single Observable in those Observables created above. Each click
by the user is a click on a movie with some movieId and that is passed to the map function
one by one. This mapping function checks if this user is authorized to watch thie movie with
this id. So the map function returns a stream of decryption keys, one decryption key for 
each movieId that is authorized to this user. So for each movieId we substitute that movieId 
with a network request to get back a decryption key. Remember Observable is lazy so we do not
send the request to server at this point but we make the Observable replacing the movieId with
a request to the server for decryption key. So we are three levels deep at this point in the 
code. For every initialized player (even if it is one player that is actually initialized) we
make another stream of playAttempts and then for all the playAttmepts we make another Observable
for each movieId replacing it with a network request for decryption key. So we have to make
two concatAll to get a one-dimensional Observable as one call to concatAll can flatten only
two levels of Observables i.e., n numbers of Observables embedded one level deep inside 
another Observable and once we get the flattened Observable then we take the next Observable
and concatAll it with this one.

.catch() passes an error to a function and that function can optionally return an Observable
to resume from.  Observale.empty() is an Observable that has nothing to emit so when onNext()
is called on it, it sends onComplete directly. The line catch(e => Observale.empty) is just
like a try/catch with empty body of the catch block. We are only catching errors on attempts
to authorization for movieIds. We are not catching errors on player initialization in the 
code here. So if the catch in the code above gets an error the whole stream will not be 
stopped rather the catch will pass the error to a function and that funciton might return 
an Observable which can be concatenated with the existing stream and the error will not 
bubble up all the way. 

 


Saturday, 7 May 2016

Iterators and Observers in asynchronous javascript - part 2

Iterators and Observers in asynchronous javascript


You can think of Observable as a DOM event where you pass three handlers instead of one handler.


  1.  Step 1: The first handler is for getting the next data. It is equivalent to calling the next() method on an iterator to consume the next data. But in the iterator pattern it is the consumer that is the in-charge of the game. The consumer decides when to consume more data by calling the next() method. But in the Observable here it is the producer of the data that is in-charge and decides when to push new data to the consumers.
  2. Step 2: The second handler is optional and handles a siutation when an error occurs. The Observable pushes information to consumers that an error has occurred so that consumers know what is going on. It is equivalent to the phenomena of throwing error in the iterator pattern where the producer throws error when the consumer calls next() mehtod AND there is an error. But the consumer's action of calling the next() method causes the throwing of the error by the prouducer, so the consumer is incharge in the iterator pattern. Here in the Observable it is the producer who is inchage and decides to inform the consumers that an error has occured without any action on the part of consumers.
  3. Step 3: The thrid step is that the producer informs the consumer that it has no more data for the consumers. The producer is the in-charge and pushes out the information to the consumers that it has no more data for them. It is equivalent to the operation in the iterator pattern when the method next() is called and the producer sends the done: true to indicate that it has no more data. But in the iterator method it is the consumer whose calling of next() on the iterator causes this sending of done: true to indicate that the end of data. In the Observable it is the producer who pushes the information without any action on the part of consumers. This is an optional handler in the Observable.

 
    //Here is an Observable
    //The function fromEvent has been used rom the RxJs library to make the Observable.
    //element could be a button, a link, the document itself or any other element in the DOM.

var mouseMoves = Observable.fromEvent(element, "mouseMove");

 Using the forEach on the Observable the same way we do it on an array but array is blocking and we don't want blocking. In an Observable data arrives over time and with the arrival of the data the forEach works on the freshly arrived data.

//subscribe
    var subscription = mouseMoves.forEach({
        //step 1. next data
        event => console.log(event), /* event is the argument that the function gets whereas the function's body is on the right hands side of =>.*/
        //step 2. error occurs - optional step
        error => console.error(error), /* error is the argument that the function gets whereas the function's body is on the right hand side of =>.*/
        //step 3. data transfer completed - no more data - optional step
        () => console.log("done. No more data.") /* No argument to the function is represented by () and the body of the function handles what to do.*/
    }
    );
     
 If you want to consume the data inside an Observable then you have to make an object Observer with three handlers. Then hand the Observer object to the Observable and the Observable will push data into the Observer object and by extension your call backs in the Observer object.

 In the code below we are passing an object to the Observable mouseMoves. In th object we have name: value pairs for all the three steps discussed earlier. In the code above we just passed three functions to the Observable mouseMoves and not an object.

var subscription = mouseMoves.forEach({
   
      onNext: event => console.log(event),
      onError: error => console.loge(error),
      onCompleted: () => console.log("done")
   
  });
  //unsubscribe
  subscription.dispose();

/*******************************************************************************/
//Here is the method fromEvent

/* Converting Events to Observables. */
//fromEvent is a static method in the Observable.
Observable.fromEvent = function(dom, eventName){
    //returning Observable object
    return{
        forEach: function(observer){
            /* observer is an object with three things 1. onNext to push more data 2. onError to push error message 3. onCompleted to push "done" to consumers.*/
            var handler = (e) => observer.onNext(e);
            dom.addEventListener(eventName, handler);
            //returning Subscription object
            return{
                dispose: function(){
                    dom.removeEventListener(eventName, handler);
                }
            };
        }
    };
}

Iterators and Observers in asynchronous javascript - part 1


Functions map, filter, forEach, takeUntil and concatAll

Suppose we have an array [1, 2, 3] and we want to add 1 to all its elements, we can use the function map to do so.

[1, 2, 3].map( x => x + 1 );

This gives a new array [2, 3, 4]. Remember function map does not change the original array. It transforms the original array and makes a new array with new elements.

Now if we want to filter an array we can use the function filter(criteria for filtering the array).
[1, 2, 3, 4].filter(x => x > 2);

This gives a new array as filter does not change the original array. It makes a new array and copies only those elements in that array that passes the test to make it in to the new array. After applying the filter above we get [3, 4].

Here is another example.

var getOnlyTopRatedMovies = userName =>
userName.playLists.
    map(playList =>
        playList.videos.
            filter(video => video.rating === 5.0)
           ).concatAll();
         
getOnlyTopRatedMovies(userName).
forEach(movie =>
    console.log(movie));


concatAll will flatten the two-dimensional array in to a one-dimensional array. concatAll only works on Observable of Observables and not on a single Observable. We don't want to use concatAll on an Observable that has infinite Observables as we will not be able to flatten them as the data inside the infinite Observable never ends. An analogy of such a situation would be traffic situation where there are three lanes and one lane is under repair so we are left with two lanes. We can't use a strategy that involves allowing traffic from one lane first until all the vehicles are finished and then allowing vehicles from the other lane because arrival of the vehicles on the lanes never finish and will create a blocking situation for one of the lanes. We have a liste of generes and then we have a list of all the films in that genre.



For each userName (user) we have an array representing all the generes allowed to that user. Then wihin each genre we have list of movies belonging
to that genre. So it makes a two-dimensional array. In the script above we loop on those genres and one by one and pass a list of movies in a genre
to the operator map to make a new array and copy those film names in to the new array that pass the criteria described in the function inside the
operator filter. So we get a two-dimensional array of all films that have been rated 5.0 and next will the function concatAll make a one-dimensional
array out of it. Next the forEach function will print them to the console one by one.

None of the functions used above alter the original array. map, filter, concatAll, forEach does not change the array they work on. They just
make new copies.

In the code below we are using the function takeUntil instead of filter but its function is the same as that of the filter.


Mouse Drags Collection

This code creates  a stream of all the mouseDrags events that occur on an elment in the DOM. We pass that DOM element to the function getElmentDrags.

A mouse drag is a series of events that happen between a mouse down and a mouse up. We will show how we can compose simple events to make new and more complex events using methods like filter, concatAll, map, takeUntil etc.

//elmt could be any elment e.g., a button , a picture etc.
var getElmentDrags = elmt => 
elmt.mouseDowns.
/* map functin is all about replacing. We want to replace mouseDown events
with all the mouseMoves event that occur between the mouseDown event and mouseup event.
    */
map(mouseDown =>
/* For each mouseDown we are detecting the mouseMoves on the document level
  untill the event mouseup occurs. For each mouseDown we are going to put
  in the stream a collection of all the mouseMoves events untill the event
  mouseUps occurs. So we are taking each item (mouseDown) in a collection (mouseDowns) and replacing it with another collection (mouseMoves) in the stream.
*/
document.mouseMoves. 
takeUntil(document.mouseUps)). 
/* So now we have a two-dimensional array as we an array of arrays that have
mouseMoves events between mouseDown and mouseUps events. We can flattern       this two--dimensional array in to a single-dimensional array using concatAll.
*/
concatAll();

getElmentDrags(image).forEach(pos => image.position = pos);

/* Here in forEach we are now consuming the data that we created in the collection above and
doing something wit that data. Here we are moving the position of the image so that
the image actually drags around.
       */

Here is the complete code for Mouse Drags Collection.
var getElementDrags = elmt => {
 elmt.mouseDowns = Observable.fromEvent(elmt, 'mousedown');
 elmt.mouseUps = Observable.fromEvent(elmt, 'mouseup');
 elmt.mouseMoves = Observable.fromEvent(elmt, 'mousemove');
 return elmt.mouseDowns.
  map(mouseDown =>
   document.mouseMoves.
    takeUntil(document.mouseUps)).
  concatAll();
};
//using forEach we now consume the data generated above and stored in the flattened Observable created by concatAll.
getElementDrags(image).forEach(pos => image.position = pos);

//Here is the method fromEvent

/* Converting Events to Observables. */
//fromEvent is a static method in the Observable.
Observable.fromEvent = function(dom, eventName){
    //returning Observable object
    return{
        forEach: function(observer){
            /* observer is an object with three things 1. onNext to push more data 2. onError to push error message 3. onCompleted to push "done" to consumers.*/
            var handler = (e) => observer.onNext(e);
            dom.addEventListener(eventName, handler);
            //returning Subscription object
            return{
                dispose: function(){
                    dom.removeEventListener(eventName, handler);
                }
            };
        }
    };
}

The takeUntil works like a filter as we are reducing the number of mouseMoves events by
recording only those events that are between mouseDown and mouseUps events. So takeUntil
recudces the number of items in a collection just like filter does.

We can diagrammatically show the takeUntill as below. We are using the notation {...1..2.....3}
to denote an Observable which is a stream in which the data arrives over time. In the diagram below {......1......2...........................3} is the source collection from which we want to consume data and {........................4} is the stop collection which means that we do not want to consume more data from Observable {......1......2...........................3} as soon as the Observable {........................4} has some data to be consumed by us.

-------------------------time-------------------->
{......1......2...........................3}.takeUntil(
{........................4})

gives the Observable
 {......1......2........}

The Observable {......1......2...........................3} pushed data and we have a filter takeUntil which consumes data from this Observable until the second Observable {........................4} arrives at which point takeUnitl stops consuming more data from the first Observable. takeUntil() consumed 1 and 2 when 4 arrives and at that point it stops consuming more so 3 does not make it in the newly created Observable created by function takeUntil(). Therefore, the resultant Observable has data 1 and 2 only.

This approach makes the unsubscribing from consuming data manually by calling unsubscribe redundant. Normally we consume data until some event is fired (e.g., mouse is moved etc.) and then we unhook our handler once the event is fired. In the takeUntil approach we do not need to call unsubscribe to unsubscribe from events. Create streams of data that complete when you want them to.  Create new events from existing events and make those new events end when you want them to end. Use takeUntil() to do this.

We do not wait for the onComplete to happen in the stop collection rather we stop consuming data from the source collection as soon as onNext() happen in the stop collection i.e., as soon as the stop collection has some data to be consumed for us and we call onNext() on it to consume that data. At this point the first Observable will call onCompletion to indicate that it is not going to push more data. Since we are not interested int he data from the stop collection, therefore, at this point method dispose() will be called on the stop collection.

If the stop collection or the source collection hits an error then the outer Observable (i.e., the Observable that we are creating) will end with that error. We always forward along the errors.

Note: concatAll does not exist on an array in javascript but you can write the function yourself.