Angular 8 Observables
Observables provide support for passing messages between publishers and subscribers in our application. An observable can deliver multiple values of any type (literals, messages, or events), depending on the context.
To receive those values, we pass the observable an observer. An observer is an object that defines callback methods to handle the three types of notification that an observable can send:
- Next: Required. The handler for each delivered value. It is called zero or more times after execution starts.
- Error: Optional. The handler for an error notification. An error stops the execution of the observable instance.
- Complete: Optional. The handler for the execution-complete notification. Delayed values can continue to be delivered to the next handler after execution is complete.
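The three handlers can be sketched as a plain object. The snippet below is a minimal illustration that does not use RxJS: the SimpleObserver interface and the deliver helper are hypothetical names introduced here only to show when each handler runs.

```typescript
// Hypothetical shape of an observer: only next is required.
interface SimpleObserver<T> {
  next: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
}

// Hypothetical helper standing in for an observable execution.
// It delivers each value to next, then either reports the given
// failure through error or signals completion, never both.
function deliver<T>(
  values: T[],
  observer: SimpleObserver<T>,
  failure?: unknown
): void {
  for (const value of values) {
    observer.next(value);
  }
  if (failure !== undefined) {
    if (observer.error) {
      observer.error(failure);
    }
    return;
  }
  if (observer.complete) {
    observer.complete();
  }
}

const received: string[] = [];

// A run that ends normally: three values, then complete.
deliver([1, 2, 3], {
  next: (value) => received.push(`next: ${value}`),
  error: (err) => received.push(`error: ${String(err)}`),
  complete: () => received.push('complete'),
});

// A run that fails: one value, then error (complete is not called).
deliver(
  [4],
  {
    next: (value) => received.push(`next: ${value}`),
    error: (err) => received.push(`error: ${String(err)}`),
    complete: () => received.push('complete'),
  },
  'failed'
);

console.log(received);
```

Leaving out error or complete in the object passed to deliver simply means that notification is ignored, which mirrors the optional handlers in the list above.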
The API for receiving values is the same whether the values are delivered synchronously or asynchronously. Because setup and teardown logic are both handled by the observable, our application code only needs to subscribe to consume values and, when it is done, unsubscribe.
Basic usage and terms
As a publisher, we create an Observable instance that defines a subscriber function. This is the function that is executed when a consumer calls the subscribe() method.
To execute the observable we have created and begin receiving notifications, we call its subscribe() method, passing an observer. This is a JavaScript object that defines the handlers for the notifications we receive. The subscribe() call returns a Subscription object that has an unsubscribe() method, which we call to stop receiving notifications.
Example:
Observe geolocation updates
import { Observable } from 'rxjs';

// Create an Observable that will start listening to geolocation updates
// when a consumer subscribes.
const locations = new Observable((observer) => {
  let watchId;
  if ('geolocation' in navigator) {
    // Forward positions and errors to the observer. The arrow functions
    // keep next() and error() bound to the observer when they are called.
    watchId = navigator.geolocation.watchPosition(
      (position) => observer.next(position),
      (err) => observer.error(err)
    );
  } else {
    observer.error('Geolocation not available');
  }
  // Stop watching the position when the consumer unsubscribes.
  return {
    unsubscribe() {
      navigator.geolocation.clearWatch(watchId);
    }
  };
});

// Call subscribe() to start listening for updates.
const locationsSubscription = locations.subscribe({
  next(position) {
    console.log('Current Position: ', position);
  },
  error(msg) {
    console.log('Error Getting Location: ', msg);
  }
});

// Stop listening for location updates after 10 seconds.
setTimeout(() => {
  locationsSubscription.unsubscribe();
}, 10000);
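Because the geolocation example needs a browser, the subscribe and unsubscribe lifecycle can also be sketched without any dependencies. The MiniObservable class below is a hypothetical, heavily simplified stand-in for the RxJS Observable (it is not the library's implementation); it only shows that the subscriber function runs when subscribe() is called and that the returned object's unsubscribe() runs the teardown logic.

```typescript
// Hypothetical, heavily simplified stand-ins for the RxJS types.
interface MiniObserver<T> {
  next: (value: T) => void;
}

interface MiniSubscription {
  unsubscribe: () => void;
}

type MiniSubscriberFn<T> = (observer: MiniObserver<T>) => MiniSubscription;

class MiniObservable<T> {
  // The subscriber function is stored, not executed, at construction time.
  private readonly subscriberFn: MiniSubscriberFn<T>;

  constructor(subscriberFn: MiniSubscriberFn<T>) {
    this.subscriberFn = subscriberFn;
  }

  // Each subscribe() call runs the subscriber function once.
  subscribe(observer: MiniObserver<T>): MiniSubscription {
    return this.subscriberFn(observer);
  }
}

const log: string[] = [];

const ticks = new MiniObservable<number>((observer) => {
  log.push('setup');
  observer.next(1);
  observer.next(2);
  return {
    unsubscribe() {
      log.push('teardown');
    },
  };
});

// Nothing has run yet: creating the observable does not execute it.
log.push('before subscribe');

const subscription = ticks.subscribe({
  next: (value) => log.push(`value ${value}`),
});

// Stop receiving notifications and run the teardown logic.
subscription.unsubscribe();

console.log(log);
```

The recorded order (setup only after subscribe, teardown only after unsubscribe) is the point of the sketch; the real Observable adds error and completion handling on top of this.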
Creating observables
Use the Observable constructor to create an observable stream of any type. The constructor takes as its argument the subscriber function to run when the observable's subscribe() method executes. A subscriber function receives an observer object and can publish values to the observer's next() method.
For example:
Create observable with constructor
// This function runs when subscribe() is called.
function sequenceSubscriber(observer) {
  // Synchronously deliver 1, 2, and 3, then complete.
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
  // Nothing to clean up, because all values were delivered synchronously.
  return {unsubscribe() {}};
}

// Create a new Observable that will deliver the above sequence.
const sequence = new Observable(sequenceSubscriber);

// Execute the Observable and print each notification.
sequence.subscribe({
  next(num) { console.log(num); },
  complete() { console.log('Finished sequence'); }
});

// Logs:
// 1
// 2
// 3
// Finished sequence
To take this example a little further, we can create an observable which publishes events.
Example:
Create with custom fromEvent function
function fromEvent(target, eventName) {
  return new Observable((observer) => {
    const handler = (e) => observer.next(e);
    // Add the event handler to the target
    target.addEventListener(eventName, handler);
    return () => {
      // Detach the event handler from the target
      target.removeEventListener(eventName, handler);
    };
  });
}
Now we can use this function to create an observable that publishes keydown events:
Use custom fromEvent function
const ESC_KEY = 27;
const nameInput = document.getElementById('name') as HTMLInputElement;

// Clear the input whenever the Escape key is pressed.
const subscription = fromEvent(nameInput, 'keydown')
  .subscribe((e: KeyboardEvent) => {
    if (e.keyCode === ESC_KEY) {
      nameInput.value = '';
    }
  });
Multicasting
A typical observable creates a new, independent execution for each subscribed observer. When an observer subscribes, the observable wires up an event handler and delivers values to that observer. When a second observer subscribes, the observable wires up a new event handler and delivers values to the second observer in a separate execution.
Sometimes, instead of starting an independent execution for every subscriber, we want each subscription to get the same values, even if values have already started to emit.
Let's see an example that counts from 1 to 3, with a one-second delay before each number is emitted.
Create a delayed sequence
function sequenceSubscriber(observer) {
  const seq = [1, 2, 3];
  let timeoutId;

  // Run through the array of numbers, emitting one value
  // per second until the end of the array is reached.
  function doSequence(arr, idx) {
    timeoutId = setTimeout(() => {
      observer.next(arr[idx]);
      if (idx === arr.length - 1) {
        observer.complete();
      } else {
        doSequence(arr, ++idx);
      }
    }, 1000);
  }

  doSequence(seq, 0);

  // Unsubscribing cancels the pending timer, which stops the sequence.
  return {
    unsubscribe() {
      clearTimeout(timeoutId);
    }
  };
}

// Create a new Observable that will deliver the above sequence.
const sequence = new Observable(sequenceSubscriber);

sequence.subscribe({
  next(num) { console.log(num); },
  complete() { console.log('Finished sequence'); }
});

// Logs:
// (at 1 second): 1
// (at 2 seconds): 2
// (at 3 seconds): 3
// (at 3 seconds): Finished sequence
Notice that if we subscribe twice, there will be two separate streams, each emitting values every second. It looks something like this:
Two subscriptions
// Subscribe starts the clock, and the first value is emitted after 1 second.
sequence.subscribe({
  next(num) { console.log('1st subscribe: ' + num); },
  complete() { console.log('1st sequence finished.'); }
});

// After 1/2 second, subscribe again.
setTimeout(() => {
  sequence.subscribe({
    next(num) { console.log('2nd subscribe: ' + num); },
    complete() { console.log('2nd sequence finished.'); }
  });
}, 500);

// Logs:
// (at 1 second): 1st subscribe: 1
// (at 1.5 seconds): 2nd subscribe: 1
// (at 2 seconds): 1st subscribe: 2
// (at 2.5 seconds): 2nd subscribe: 2
// (at 3 seconds): 1st subscribe: 3
// (at 3 seconds): 1st sequence finished.
// (at 3.5 seconds): 2nd subscribe: 3
// (at 3.5 seconds): 2nd sequence finished.
Changing the observable to be multicasting could look something like this:
Create a multicast Subscriber
function multicastSequenceSubscriber() {
  const seq = [1, 2, 3];
  // Keep track of each observer (one for every active subscription).
  const observers = [];
  let timeoutId;

  // Return the subscriber function (runs when subscribe()
  // function is invoked)
  return (observer) => {
    observers.push(observer);
    // When this is the first subscription, start the sequence.
    if (observers.length === 1) {
      timeoutId = doSequence({
        next(val) {
          // Iterate through the observers and notify all subscriptions.
          observers.forEach(obs => obs.next(val));
        },
        complete() {
          // Notify all complete callbacks (on a copy of the array).
          observers.slice(0).forEach(obs => obs.complete());
        }
      }, seq, 0);
    }

    return {
      unsubscribe() {
        // Remove this observer from the observers array.
        observers.splice(observers.indexOf(observer), 1);
        // If there are no more listeners, do the cleanup.
        if (observers.length === 0) {
          clearTimeout(timeoutId);
        }
      }
    };
  };
}

// Run through the array of numbers, emitting one value
// per second until the end of the array is reached.
function doSequence(observer, arr, idx) {
  return setTimeout(() => {
    observer.next(arr[idx]);
    if (idx === arr.length - 1) {
      observer.complete();
    } else {
      doSequence(observer, arr, ++idx);
    }
  }, 1000);
}

// Create a new Observable that will deliver the above sequence.
const multicastSequence = new Observable(multicastSequenceSubscriber());

// Subscribe starts the clock, and begins to emit after 1 second
multicastSequence.subscribe({
  next(num) { console.log('1st subscribe: ' + num); },
  complete() { console.log('1st sequence finished.'); }
});

// After 1 1/2 seconds, subscribe again (should "miss" the first value).
setTimeout(() => {
  multicastSequence.subscribe({
    next(num) { console.log('2nd subscribe: ' + num); },
    complete() { console.log('2nd sequence finished.'); }
  });
}, 1500);

// Logs:
// (at 1 second): 1st subscribe: 1
// (at 2 seconds): 1st subscribe: 2
// (at 2 seconds): 2nd subscribe: 2
// (at 3 seconds): 1st subscribe: 3
// (at 3 seconds): 1st sequence finished.
// (at 3 seconds): 2nd subscribe: 3
// (at 3 seconds): 2nd sequence finished.
Error handling
Because observables can produce values asynchronously, try/catch will not effectively catch errors. Instead, we handle errors by specifying an error callback on the observer. An observable can either produce values (calling the next callback) or finish, calling either the complete or the error callback. Producing an error or completing also causes the observable to clean up subscriptions and stop producing values.
myObservable.subscribe({
  next(num) { console.log('Next num: ' + num); },
  error(err) { console.log('Received an error: ' + err); }
});
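The rule that an error or completion ends delivery can be illustrated without RxJS. In the sketch below, guardObserver is a hypothetical helper (not an RxJS API) that wraps an observer so that any notification arriving after error or complete is ignored. It only approximates the behavior described above and is not how the library implements it.

```typescript
// Hypothetical observer shape used only for this sketch.
interface PlainObserver<T> {
  next: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
}

// Wrap an observer so that nothing is delivered once it has
// received an error or a completion notification.
function guardObserver<T>(
  observer: PlainObserver<T>
): Required<PlainObserver<T>> {
  let stopped = false;
  return {
    next(value: T) {
      if (!stopped) {
        observer.next(value);
      }
    },
    error(err: unknown) {
      if (stopped) {
        return;
      }
      stopped = true;
      if (observer.error) {
        observer.error(err);
      }
    },
    complete() {
      if (stopped) {
        return;
      }
      stopped = true;
      if (observer.complete) {
        observer.complete();
      }
    },
  };
}

const events: string[] = [];

const guarded = guardObserver<number>({
  next: (num) => events.push('Next num: ' + num),
  error: (err) => events.push('Received an error: ' + String(err)),
  complete: () => events.push('Completed'),
});

guarded.next(1);
guarded.error('boom');
guarded.next(2);    // ignored: the stream already failed
guarded.complete(); // ignored: an error is terminal

console.log(events);
```

Only the first value and the error reach the wrapped observer, which is why cleanup and recovery logic belongs in the error callback rather than in a surrounding try/catch.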