
Pipe Observables

An essential part of Reactive Programming consists of piping Observables.

Piping lets us transform values through a series of steps, so we can build complex dataflows in a few lines of code.

For example, we may think of one pipe that lets only odd numbers through, and another that converts incoming numbers to strings.

Such a function is called an ObservablePipe.
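The two pipes mentioned above can be sketched by hand as follows. This is only an illustration: the type aliases are declared locally (assumed to match the function shapes used in this chapter), and `onlyOdd`, `numberToString` and `numbers$` are hypothetical names, not functions exported by @lirx/core.

```typescript
// Local type aliases, assumed here so the sketch is self-contained.
type IUnsubscribe = () => void;
type IObserver<GValue> = (value: GValue) => void;
type IObservable<GValue> = (emit: IObserver<GValue>) => IUnsubscribe;

// Hypothetical pipe: lets only odd numbers through.
const onlyOdd = (subscribe: IObservable<number>): IObservable<number> => {
  return (emit: IObserver<number>): IUnsubscribe => {
    return subscribe((value: number): void => {
      if (value % 2 !== 0) {
        emit(value);
      }
    });
  };
};

// Hypothetical pipe: converts each incoming number to a string.
const numberToString = (subscribe: IObservable<number>): IObservable<string> => {
  return (emit: IObserver<string>): IUnsubscribe => {
    return subscribe((value: number): void => {
      emit(String(value));
    });
  };
};

// Hypothetical source that synchronously emits 1 to 5 on subscription.
const numbers$: IObservable<number> = (emit) => {
  for (const value of [1, 2, 3, 4, 5]) {
    emit(value);
  }
  return () => {};
};

// Collect what the piped Observable emits.
const received: string[] = [];
const unsubscribe = numberToString(onlyOdd(numbers$))((value) => {
  received.push(value);
});
unsubscribe();

console.log(received); // ['1', '3', '5']
```

Each pipe only wraps the subscription of the Observable it receives, so nothing runs until the final Observable is subscribed to.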

Definition of an ObservablePipe

An ObservablePipe is a function that transforms or modifies the data of an Observable. It usually subscribes to the provided Observable, performs some operations on the incoming values, and may emit these values or new ones.

Here's the type of an ObservablePipe:

interface IObservablePipe<GIn, GOut> {
  (subscribe: IObservable<GIn>): IObservable<GOut>;
}

An ObservablePipe is a function that accepts an Observable as input (the subscribe argument), and returns another Observable as output. The two are related by an algorithm defined inside the ObservablePipe.

As an example, here's a handmade ObservablePipe that emits a value only when it differs from the previously received one:

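const distinct = <GValue>(subscribe: IObservable<GValue>): IObservable<GValue> => {
  // the returned function is the output Observable
  return (emit: IObserver<GValue>): IUnsubscribe => {
    // last emitted value; it starts as undefined, so a first value of undefined is skipped
    let previousValue: GValue | undefined;
    // subscribe to the input Observable and forward its unsubscribe function
    return subscribe((value: GValue): void => {
      if (value !== previousValue) {
        previousValue = value;
        emit(value);
      }
    });
  };
};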

Then we may use our new ObservablePipe like this:

const distinctValues$ = distinct(observable);
const unsubscribe = distinctValues$(observer);
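To make the usage above concrete, here is a self-contained sketch that can be run as-is. The type aliases are declared locally, `distinct` is repeated so the block stands alone, and the `observable` and `observer` shown are hypothetical stand-ins (a synchronous source and an observer that collects values into an array).

```typescript
// Local type aliases, assumed here so the sketch is self-contained.
type IUnsubscribe = () => void;
type IObserver<GValue> = (value: GValue) => void;
type IObservable<GValue> = (emit: IObserver<GValue>) => IUnsubscribe;

// Same pipe as above, repeated so this block runs on its own.
const distinct = <GValue>(subscribe: IObservable<GValue>): IObservable<GValue> => {
  return (emit: IObserver<GValue>): IUnsubscribe => {
    let previousValue: GValue | undefined;
    return subscribe((value: GValue): void => {
      if (value !== previousValue) {
        previousValue = value;
        emit(value);
      }
    });
  };
};

// Hypothetical source: emits a fixed list of values synchronously on subscription.
const observable: IObservable<number> = (emit) => {
  for (const value of [1, 1, 2, 2, 2, 3, 1]) {
    emit(value);
  }
  return () => {};
};

// Hypothetical observer: stores every value it receives.
const received: number[] = [];
const observer: IObserver<number> = (value) => {
  received.push(value);
};

const distinctValues$ = distinct(observable);
const unsubscribe = distinctValues$(observer);
unsubscribe();

console.log(received); // [1, 2, 3, 1]
```

Note that the final `1` is emitted again: this pipe only compares each value with the one immediately before it, not with every value seen so far.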

If this looks too complex, don't worry: we won't have to write any of them from scratch. @lirx/core comes with plenty of ObservablePipes right out of the box.

Chaining many ObservablePipes

ObservablePipes are made to be chained together to create complex dataflows.

An ObservablePipe can be called directly like any ordinary function (distinct(observable)), but in practice many of them tend to be nested inside one another, and the result quickly becomes unreadable (op4(op3(op2(op1(obs))))).

For that reason, we will use the pipe$$ function, which accomplishes the same thing while being much easier to read:

pipe$$(obs, [
  op1,
  op2,
  op3,
  op4,
]);

Conceptually, this function simply does:

[op1, op2, op3, op4].reduce((value, fnc) => fnc(value), obs);
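The reduce above can be checked with a small runnable sketch. Everything here is an assumption made for illustration: the local type aliases, the `pipeSketch` helper (a simplified stand-in for pipe$$ that restricts every pipe to the same value type, not the library's implementation), and the `addOne` and `double` pipes.

```typescript
// Local type aliases, assumed here so the sketch is self-contained.
type IUnsubscribe = () => void;
type IObserver<GValue> = (value: GValue) => void;
type IObservable<GValue> = (emit: IObserver<GValue>) => IUnsubscribe;
type IObservablePipe<GIn, GOut> = (subscribe: IObservable<GIn>) => IObservable<GOut>;

// Hypothetical, simplified stand-in for pipe$$:
// applies each pipe to the result of the previous one, left to right.
const pipeSketch = <GValue>(
  obs: IObservable<GValue>,
  pipes: readonly IObservablePipe<GValue, GValue>[],
): IObservable<GValue> => {
  return pipes.reduce<IObservable<GValue>>((value, fnc) => fnc(value), obs);
};

// Two hypothetical pipes used for the demonstration.
const addOne: IObservablePipe<number, number> = (subscribe) => {
  return (emit) => subscribe((value) => emit(value + 1));
};
const double: IObservablePipe<number, number> = (subscribe) => {
  return (emit) => subscribe((value) => emit(value * 2));
};

// Hypothetical source that synchronously emits 1, 2, 3.
const source$: IObservable<number> = (emit) => {
  for (const value of [1, 2, 3]) {
    emit(value);
  }
  return () => {};
};

// Piped form.
const piped: number[] = [];
pipeSketch(source$, [addOne, double])((value) => {
  piped.push(value);
});

// Equivalent nested form.
const nested: number[] = [];
double(addOne(source$))((value) => {
  nested.push(value);
});

console.log(piped); // [4, 6, 8]
console.log(nested); // [4, 6, 8]
```

The pipes are listed in the order the values flow through them, which is the reverse of the reading order of the nested calls.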

Moreover, we can also group several ObservablePipes together using pipe$$$ (less frequently used).
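As a rough sketch of what grouping means, the helper below combines a list of ObservablePipes into a single ObservablePipe that can be reused on any Observable. `groupPipes` is a hypothetical name written for this illustration and says nothing about the exact signature of pipe$$$; the type aliases, `addOne`, `double` and `source$` are likewise assumptions.

```typescript
// Local type aliases, assumed here so the sketch is self-contained.
type IUnsubscribe = () => void;
type IObserver<GValue> = (value: GValue) => void;
type IObservable<GValue> = (emit: IObserver<GValue>) => IUnsubscribe;
type IObservablePipe<GIn, GOut> = (subscribe: IObservable<GIn>) => IObservable<GOut>;

// Hypothetical helper: turns a list of pipes into one pipe.
const groupPipes = <GValue>(
  pipes: readonly IObservablePipe<GValue, GValue>[],
): IObservablePipe<GValue, GValue> => {
  return (subscribe: IObservable<GValue>): IObservable<GValue> => {
    return pipes.reduce<IObservable<GValue>>((value, fnc) => fnc(value), subscribe);
  };
};

// Two hypothetical pipes used for the demonstration.
const addOne: IObservablePipe<number, number> = (subscribe) => {
  return (emit) => subscribe((value) => emit(value + 1));
};
const double: IObservablePipe<number, number> = (subscribe) => {
  return (emit) => subscribe((value) => emit(value * 2));
};

// The grouped pipe is itself an ObservablePipe.
const addOneThenDouble = groupPipes([addOne, double]);

// Hypothetical source that synchronously emits 1, 2, 3.
const source$: IObservable<number> = (emit) => {
  for (const value of [1, 2, 3]) {
    emit(value);
  }
  return () => {};
};

const grouped: number[] = [];
addOneThenDouble(source$)((value) => {
  grouped.push(value);
});

console.log(grouped); // [4, 6, 8]
```

Because the grouped result has the same shape as any other ObservablePipe, it can be passed wherever a single pipe is expected.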

In the next chapter, we will see some examples.