Pipe Observables
An essential part of Reactive Programming consists of piping our Observables.
Piping lets us transform values through a series of steps, so we can build complex dataflows in a few lines of code.
For example, we may think of a pipe that keeps only odd numbers, and another that converts incoming numbers to strings.
Such a function is called an ObservablePipe.
Definition of an ObservablePipe
An ObservablePipe is a function that transforms the data flowing through an Observable. It usually subscribes to the provided Observable, performs some operations on the incoming values, and may then emit those values or new ones.
Here's the type of an ObservablePipe:
interface IObservablePipe<GIn, GOut> {
  (subscribe: IObservable<GIn>): IObservable<GOut>;
}
An ObservablePipe is a function that accepts an Observable as input (the subscribe argument) and returns another Observable as output. The two are linked by an algorithm defined inside the ObservablePipe.
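To make the definition concrete, here is a minimal sketch of the two pipes mentioned in the introduction: one that keeps only odd numbers, and one that converts numbers to strings. The type aliases are simplified stand-ins inferred from how the types are used on this page, and keepOdd, numberToString and numbers$ are hypothetical names chosen for illustration; none of them are part of @lirx/core.

```typescript
// Simplified stand-ins for the library's types (assumption for this sketch).
type IObserver<GValue> = (value: GValue) => void;
type IUnsubscribe = () => void;
type IObservable<GValue> = (emit: IObserver<GValue>) => IUnsubscribe;

// Hypothetical pipe: lets only odd numbers through.
const keepOdd = (subscribe: IObservable<number>): IObservable<number> => {
  return (emit: IObserver<number>): IUnsubscribe => {
    return subscribe((value: number): void => {
      if (value % 2 !== 0) {
        emit(value);
      }
    });
  };
};

// Hypothetical pipe: converts each incoming number to a string.
const numberToString = (subscribe: IObservable<number>): IObservable<string> => {
  return (emit: IObserver<string>): IUnsubscribe => {
    return subscribe((value: number): void => {
      emit(String(value));
    });
  };
};

// Hypothetical source: emits 1 to 5 synchronously on subscription.
const numbers$: IObservable<number> = (emit: IObserver<number>): IUnsubscribe => {
  for (let i = 1; i <= 5; i++) {
    emit(i);
  }
  return (): void => {};
};

// Apply both pipes by nesting the calls, then subscribe.
const received: string[] = [];
const unsubscribe = numberToString(keepOdd(numbers$))((value: string): void => {
  received.push(value);
});
unsubscribe();
```

With this source, received contains the strings '1', '3' and '5'.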
As an example, here's a handmade ObservablePipe that emits a received value only when it differs from the previous one:
Short version:
const distinct = <GValue>(subscribe: IObservable<GValue>): IObservable<GValue> => {
  return (emit: IObserver<GValue>): IUnsubscribe => {
    let previousValue: GValue;
    return subscribe((value: GValue): void => {
      if (value !== previousValue) {
        previousValue = value;
        emit(value);
      }
    });
  };
};
With comments:
const distinct = <GValue>(subscribe: IObservable<GValue>): IObservable<GValue> => {
  // returns a new Observable
  return (emit: IObserver<GValue>): IUnsubscribe => {
    // defines a previous value
    let previousValue: GValue;
    // subscribes to the provided Observable
    return subscribe((value: GValue) => {
      // if the received value differs from the previous one
      if (value !== previousValue) {
        // we replace previousValue with the new value
        previousValue = value;
        // and we emit the value
        emit(value);
      }
    });
  };
};
Then we may use our new ObservablePipe like this:
const distinctValues$ = distinct(observable);
const unsubscribe = distinctValues$(observer);
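For a version that runs on its own, the sketch below wires distinct to a hand-written source. The type aliases are simplified stand-ins inferred from the signatures above, and the source emitting 1, 1, 2, 2, 2, 3, 1 is a hypothetical example rather than anything provided by the library.

```typescript
// Simplified stand-ins for the library's types (assumption for this sketch).
type IObserver<GValue> = (value: GValue) => void;
type IUnsubscribe = () => void;
type IObservable<GValue> = (emit: IObserver<GValue>) => IUnsubscribe;

// The distinct pipe from above, repeated so the sketch is self-contained.
const distinct = <GValue>(subscribe: IObservable<GValue>): IObservable<GValue> => {
  return (emit: IObserver<GValue>): IUnsubscribe => {
    let previousValue: GValue;
    return subscribe((value: GValue): void => {
      if (value !== previousValue) {
        previousValue = value;
        emit(value);
      }
    });
  };
};

// Hypothetical source: emits a fixed list of numbers synchronously.
const observable: IObservable<number> = (emit: IObserver<number>): IUnsubscribe => {
  [1, 1, 2, 2, 2, 3, 1].forEach((value: number): void => emit(value));
  return (): void => {};
};

// Hypothetical observer: stores every value it receives.
const received: number[] = [];
const observer: IObserver<number> = (value: number): void => {
  received.push(value);
};

const distinctValues$ = distinct(observable);
const unsubscribe = distinctValues$(observer);
unsubscribe();
```

Here received ends up as [1, 2, 3, 1]: only consecutive duplicates are dropped, so the final 1 is emitted again because it differs from the 3 before it.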
If this looks too complex, don't worry: we won't have to write any of them from scratch.
@lirx/core comes with plenty of ObservablePipes, right out of the box.
Chaining many ObservablePipes
ObservablePipes are made to be chained together to create complex dataflows.
An ObservablePipe can be called directly like any ordinary function (distinct(observable)), but in practice many of them tend to be nested together, and the result quickly becomes unreadable (op4(op3(op2(op1(obs))))).
For that reason, we will use the function pipe$$, which accomplishes the same thing while being much easier to read:
pipe$$(obs, [
  op1,
  op2,
  op3,
  op4,
]);
This function simply does:
[op1, op2, op3, op4].reduce((value, fnc) => fnc(value), obs);
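The reduction above can be tried out with a small hand-written stand-in. The sketch below assumes simplified type aliases, restricts every pipe to the same value type to keep the typing simple, and uses hypothetical helpers (pipeSketch, mapSketch, collect); it is not the library's implementation of pipe$$.

```typescript
// Simplified stand-ins for the library's types (assumption for this sketch).
type IObserver<GValue> = (value: GValue) => void;
type IUnsubscribe = () => void;
type IObservable<GValue> = (emit: IObserver<GValue>) => IUnsubscribe;
type IObservablePipe<GIn, GOut> = (subscribe: IObservable<GIn>) => IObservable<GOut>;

// Hypothetical helper: applies each pipe in order, starting from the source.
const pipeSketch = <GValue>(
  source: IObservable<GValue>,
  pipes: readonly IObservablePipe<GValue, GValue>[],
): IObservable<GValue> => {
  return pipes.reduce<IObservable<GValue>>((value, fnc) => fnc(value), source);
};

// Hypothetical helper: builds a pipe that maps each value through fn.
const mapSketch = <GIn, GOut>(fn: (value: GIn) => GOut): IObservablePipe<GIn, GOut> => {
  return (subscribe: IObservable<GIn>): IObservable<GOut> => {
    return (emit: IObserver<GOut>): IUnsubscribe => {
      return subscribe((value: GIn): void => emit(fn(value)));
    };
  };
};

// Hypothetical helper: subscribes, gathers synchronous values, unsubscribes.
const collect = <GValue>(source: IObservable<GValue>): GValue[] => {
  const values: GValue[] = [];
  const unsubscribe = source((value: GValue): void => {
    values.push(value);
  });
  unsubscribe();
  return values;
};

const addOne = mapSketch((value: number): number => value + 1);
const double = mapSketch((value: number): number => value * 2);

// Hypothetical source: emits 1, 2, 3 synchronously.
const obs: IObservable<number> = (emit: IObserver<number>): IUnsubscribe => {
  [1, 2, 3].forEach((value: number): void => emit(value));
  return (): void => {};
};

// Both forms apply addOne first, then double.
const piped = collect(pipeSketch(obs, [addOne, double]));
const nested = collect(double(addOne(obs)));
```

Both piped and nested contain [4, 6, 8], which shows that the array form applies the pipes from left to right, exactly like the nested calls read from the inside out.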
We can also group our ObservablePipes using pipe$$$ (less frequently used).
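The signature of pipe$$$ is not shown on this page. Assuming that grouping means combining several pipes into a single reusable ObservablePipe, the idea can be sketched as follows. The helper groupPipesSketch and the other names are hypothetical, the type aliases are simplified stand-ins, and every pipe shares one value type to keep the typing simple.

```typescript
// Simplified stand-ins for the library's types (assumption for this sketch).
type IObserver<GValue> = (value: GValue) => void;
type IUnsubscribe = () => void;
type IObservable<GValue> = (emit: IObserver<GValue>) => IUnsubscribe;
type IObservablePipe<GIn, GOut> = (subscribe: IObservable<GIn>) => IObservable<GOut>;

// Hypothetical helper (not the library's pipe$$$): combines several pipes
// into one pipe that applies them from left to right.
const groupPipesSketch = <GValue>(
  pipes: readonly IObservablePipe<GValue, GValue>[],
): IObservablePipe<GValue, GValue> => {
  return (subscribe: IObservable<GValue>): IObservable<GValue> => {
    return pipes.reduce<IObservable<GValue>>((value, fnc) => fnc(value), subscribe);
  };
};

// Hypothetical pipe: lets only strictly positive numbers through.
const keepPositive: IObservablePipe<number, number> = (subscribe) => (emit) =>
  subscribe((value: number): void => {
    if (value > 0) {
      emit(value);
    }
  });

// Hypothetical pipe: doubles each number.
const double: IObservablePipe<number, number> = (subscribe) => (emit) =>
  subscribe((value: number): void => emit(value * 2));

// Hypothetical source factory: emits the given values synchronously.
const fromArray = <GValue>(values: readonly GValue[]): IObservable<GValue> => {
  return (emit: IObserver<GValue>): IUnsubscribe => {
    values.forEach((value: GValue): void => emit(value));
    return (): void => {};
  };
};

// Hypothetical helper: subscribes, gathers synchronous values, unsubscribes.
const collect = <GValue>(source: IObservable<GValue>): GValue[] => {
  const values: GValue[] = [];
  const unsubscribe = source((value: GValue): void => {
    values.push(value);
  });
  unsubscribe();
  return values;
};

// The grouped pipe is an ordinary ObservablePipe, reusable on any source.
const positiveDoubled = groupPipesSketch<number>([keepPositive, double]);
const resultA = collect(positiveDoubled(fromArray([-1, 2, 3])));
const resultB = collect(positiveDoubled(fromArray([5, -5])));
```

Here resultA is [4, 6] and resultB is [10]. The point of grouping in this sketch is that a recurring sequence of pipes gets a name and can be applied to several Observables without repeating the list.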
In the next chapter, we will see some examples.