Reactive Angular : Understanding AsyncSubject, BehaviorSubject and ReplaySubject

04/20/2019 · 3 Min Read · In Angular

To understand various Subjects in RxJS, we first need to know the fundamentals and different aspects of “Reactive Programming”.

What is Reactive Programming in the first place?

“Reactive” is a general programming term for building responsive (fast), event-driven applications, such as UI controllers reacting to mouse events, in which an observable event stream is pushed to subscribers.

In short, reactive code is non-blocking: instead of waiting on an operation, we react to notifications as operations complete or data becomes available.

One of the most important characteristics of Reactive Programming is that it allows us to implement the Push Model of data processing.

In contrast, the Pull Model is implemented by looping through an array, by an Iterable, or by using ES6 generator functions.
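The difference between the two models can be sketched without any library. This is a minimal illustration, not RxJS code: `priceGenerator` and `pushPrices` are hypothetical names invented for this example. In the pull half the consumer asks for each value; in the push half the producer calls the consumer.

```typescript
// Pull model: the consumer decides when to ask for the next value.
function* priceGenerator(): Generator<number> {
  yield 100;
  yield 101;
  yield 102;
}

const pulled: number[] = [];
const iterator = priceGenerator();
let result = iterator.next();      // the consumer explicitly pulls a value
while (!result.done) {
  pulled.push(result.value);
  result = iterator.next();
}

// Push model: the producer decides when to deliver a value.
// `pushPrices` is a hypothetical helper, not part of RxJS.
function pushPrices(onNext: (price: number) => void): void {
  [100, 101, 102].forEach(price => onNext(price));
}

const pushed: number[] = [];
pushPrices(price => pushed.push(price)); // the consumer only supplies a callback

console.log(pulled, pushed);
```

Both arrays end up with the same values. What changes is who controls the timing: the `while` loop in the first half, the producer in the second.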

For a more in-depth look at the reactive programming paradigm, head over to my other post, “Reactive Programming - The best idea from Observer pattern, the Iterator pattern and Functional programming”.

Let's continue with:

  • What are Observables and Observers?
  • Observables vs Promises
  • What are Subjects?
  • Parting Thoughts

What are Observables and Observers?

An observer is an object that handles a data stream pushed by an observable function.

There are two main types of observables: hot and cold.

  • A cold observable starts streaming data when some code invokes a subscribe() function on it.
  • A hot observable streams data as soon as it is created, even if there’s no subscriber interested in the data.
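A small sketch of the distinction, assuming RxJS 6 or later. The hot case is modelled with a plain `Subject`, which is one common way to get hot behavior but not the only one. The names `cold$`, `hot$` and `log` are made up for this example.

```typescript
import { Observable, Subject } from 'rxjs';

const log: string[] = [];

// Cold: the producer function runs once per subscriber, at subscribe time.
const cold$ = new Observable<number>(subscriber => {
  log.push('cold producer started');
  subscriber.next(1);
  subscriber.complete();
});
cold$.subscribe(v => log.push(`cold A: ${v}`));
cold$.subscribe(v => log.push(`cold B: ${v}`));

// Hot (modelled with a Subject): values are produced whether or not
// anyone is subscribed.
const hot$ = new Subject<number>();
hot$.next(1);                                   // nobody is listening, so this value is lost
hot$.subscribe(v => log.push(`hot A: ${v}`));
hot$.next(2);                                   // only this value reaches subscriber A

console.log(log);
```

Each cold subscriber triggers its own run of the producer, while the hot subscriber only sees what is emitted after it subscribed.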

An observable is an object that streams elements from some data source (a socket, an array, UI events) one element at a time. An observable stream knows exactly how to do three things:

  1. Emit the next element from the source towards Observers
  2. Signal an error to Observers, so that errors can be handled gracefully
  3. Send a signal that the streaming is over (that the last element has been served)

To handle these three notifications, an observer object provides up to three callbacks:

  1. A function to handle the next element emitted by the observable
  2. A function to handle errors in the observable
  3. A function to be invoked when the stream of data completes
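As a rough sketch of how these three callbacks look in practice, assuming RxJS 6 or later (`prices$` and `events` are illustrative names):

```typescript
import { Observable } from 'rxjs';

const events: string[] = [];

// An observable that emits two values and then signals completion.
const prices$ = new Observable<number>(subscriber => {
  subscriber.next(100);
  subscriber.next(101);
  subscriber.complete();
  subscriber.next(102); // ignored: nothing is delivered after completion
});

// The observer: an object with up to three callbacks.
prices$.subscribe({
  next: price => events.push(`next: ${price}`),
  error: err => events.push(`error: ${err}`),
  complete: () => events.push('complete'),
});

console.log(events);
```

All three callbacks are optional. A stream ends with either `error` or `complete`, never both.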

In short, an observable is a more advanced version of an iterator. Iterators use the pull model to retrieve the data, whereas observables push the data to subscribers.

Observables vs Promises

Both Promises and Observables provide us with abstractions that help us deal with the asynchronous nature of our applications. However, there are important differences between the two:

  • Observables can define both the setup and teardown aspects of asynchronous behavior.
  • Observables are cancellable.
  • Observables can be retried using one of the retry operators provided by the API, such as retry and retryWhen. On the other hand, Promises require the caller to have access to the original function that returned the promise in order to have a retry capability.
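The points above can be illustrated with a short sketch, assuming RxJS 6 or later with `retry` imported from `rxjs/operators`. The `ticker$` and `flaky$` streams are contrived for this example, and `flaky$` fails on purpose for its first two attempts.

```typescript
import { Observable } from 'rxjs';
import { retry } from 'rxjs/operators';

const trace: string[] = [];

// Setup and teardown live in one place: the function returned
// from the producer runs when the subscription is cancelled.
const ticker$ = new Observable<number>(subscriber => {
  trace.push('setup');
  subscriber.next(1);
  return () => trace.push('teardown');
});
const subscription = ticker$.subscribe(v => trace.push(`value: ${v}`));
subscription.unsubscribe(); // cancels the stream and triggers teardown

// Retry: each retry re-subscribes, so the producer simply runs again.
let attempts = 0;
const flaky$ = new Observable<string>(subscriber => {
  attempts += 1;
  if (attempts < 3) {
    subscriber.error(new Error(`attempt ${attempts} failed`));
  } else {
    subscriber.next('ok');
    subscriber.complete();
  }
});
flaky$.pipe(retry(2)).subscribe({
  next: v => trace.push(`retry result: ${v}`),
  error: err => trace.push(`gave up: ${err.message}`),
});

console.log(trace, attempts);
```

Because an observable is only a description of work until it is subscribed to, `retry` needs nothing beyond the observable itself to run the work again.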

What are Subjects?

A Subject is a special type of Observable that allows values to be multicast to many Observers. Subjects are like EventEmitters.

Every Subject is both an Observable and an Observer. What does that mean? It means that a subject can emit data as well as be subscribed to. You can subscribe to a Subject, and you can call next() to feed it values, as well as error() and complete().

A regular observable does not have the next() method as regular observables are not observers. So that’s the first distinctive capability of a subject: Data emission.
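A minimal sketch of both roles at once, assuming RxJS 6 or later (`stock$` and `received` are illustrative names):

```typescript
import { Subject } from 'rxjs';

const received: string[] = [];
const stock$ = new Subject<number>();

// Observable side: several observers subscribe to the same subject.
stock$.subscribe(price => received.push(`1st Trader: ${price}`));
stock$.subscribe(price => received.push(`2nd Trader: ${price}`));

// Observer side: next() feeds a value in, and that same value is
// multicast to every current subscriber.
stock$.next(100);
stock$.complete();
stock$.next(101); // ignored: the subject has already completed

console.log(received);
```

Unlike the variants discussed next, a plain Subject keeps no value for late subscribers. Anyone subscribing after `next(100)` would have missed it.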

This post covers three Subject variants:

  1. AsyncSubject
  2. BehaviorSubject
  3. ReplaySubject

AsyncSubject

A variant of Subject that works differently from BehaviorSubject and ReplaySubject. Those two replay stored values to a subscriber as soon as it subscribes, whereas an AsyncSubject emits only its last value to all of its observers, and only upon completion.

import { AsyncSubject } from 'rxjs';

export class StockService {
  stockSubject: AsyncSubject<number>;

  constructor() {
    this.stockSubject = new AsyncSubject<number>();
  }

  public performOperation(): void {
    this.stockSubject.next(100); // This value is never received by Observers.
    this.stockSubject.next(101); // This value is never received by Observers.
    this.stockSubject.next(102); // This value is never received by Observers.
    this.stockSubject.subscribe(sharePrice => {
      console.log('1st Trader:', sharePrice);
    });
    this.stockSubject.next(103);
    this.stockSubject.subscribe(sharePrice => {
      console.log('2nd Trader:', sharePrice);
    });
    // Only on completion is the last value (103) pushed to Observers.
    this.stockSubject.complete();
  }
}
And the output:
1st Trader: 103
2nd Trader: 103

BehaviorSubject

A variant of Subject that requires an initial value and emits its current value whenever it is subscribed to.

Behavior subjects are similar to replay subjects, but will re-emit only the last emitted value, or a default value if no value has been previously emitted.

import { BehaviorSubject } from 'rxjs';

export class StockService {
  stockSubject: BehaviorSubject<number>;

  constructor() {
    this.stockSubject = new BehaviorSubject<number>(0); // 0 is the initial value.
  }

  public performOperation(): void {
    // Immediately receives the initial value, as that is the current value.
    this.stockSubject.subscribe(sharePrice => {
      console.log('1st Trader:', sharePrice);
    });
    this.stockSubject.next(100);
    // Does not receive the initial value, as the last emitted value is now 100.
    this.stockSubject.subscribe(sharePrice => {
      console.log('2nd Trader:', sharePrice);
    });
  }
}
And the output:
1st Trader: 0
1st Trader: 100
2nd Trader: 100
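To make the comparison with replay subjects concrete, here is a small sketch, assuming RxJS 6 or later, that puts a `BehaviorSubject` next to a `ReplaySubject` with a buffer of one. The names `behavior$`, `replay$` and `seen` are made up for this example.

```typescript
import { BehaviorSubject, ReplaySubject } from 'rxjs';

const seen: string[] = [];

const behavior$ = new BehaviorSubject<number>(0); // requires an initial value
const replay$ = new ReplaySubject<number>(1);     // buffers the last value, starts empty

// Early subscribers, before anything has been emitted.
behavior$.subscribe(v => seen.push(`behavior early: ${v}`)); // gets the default 0 at once
replay$.subscribe(v => seen.push(`replay early: ${v}`));     // gets nothing yet

behavior$.next(100);
replay$.next(100);

// Late subscribers: both subjects hand over the last emitted value.
behavior$.subscribe(v => seen.push(`behavior late: ${v}`));
replay$.subscribe(v => seen.push(`replay late: ${v}`));

console.log(seen);
```

The two behave the same for late subscribers. They differ only before the first emission, where the BehaviorSubject falls back to its default value.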

ReplaySubject

A variant of Subject that "replays" or emits old values to new subscribers. It buffers a set number of values and will emit those values immediately to any new subscribers in addition to emitting new values to existing subscribers.

import { ReplaySubject } from 'rxjs';

export class StockService {
  stockSubject: ReplaySubject<number>;

  constructor() {
    this.stockSubject = new ReplaySubject<number>(2); // Number of values to be buffered.
  }

  public performOperation(): void {
    this.stockSubject.next(100);
    this.stockSubject.next(101);
    this.stockSubject.next(102);
    // Replays the last 2 buffered values (101, 102), then receives 103 live.
    this.stockSubject.subscribe(sharePrice => {
      console.log('1st Trader:', sharePrice);
    });
    this.stockSubject.next(103);
    // Replays the last 2 buffered values only (102, 103).
    this.stockSubject.subscribe(sharePrice => {
      console.log('2nd Trader:', sharePrice);
    });
  }
}
And the output:
1st Trader: 101
1st Trader: 102
1st Trader: 103
2nd Trader: 102
2nd Trader: 103

Parting Thoughts

  • You have learned what an Observable is: an object that emits or publishes values over time and asynchronously.
  • You learned about cold and hot Observables: a hot Observable starts publishing values when it is created, even before it has any subscribers, whereas a cold one starts only when it is subscribed to.
  • You have also learned about special types of Observables such as Subject, AsyncSubject, BehaviorSubject and ReplaySubject.

Hope you find this post useful. Please share your thoughts in the comment section.

I’d be happy to talk! If you liked this post, please share it and leave a comment.




