
IObservablePipe

Types

interface IObservablePipe<GIn, GOut> {
  (subscribe: IObservable<GIn>): IObservable<GOut>;
}

Definition

An ObservablePipe is a function that receives an Observable and returns a new Observable. For each value it receives, it performs some operation on it, and may emit something else.

It is similar to an ObserverPipe but works with Observables instead.
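
As a rough illustration of this definition, the sketch below builds a small pipe by hand. The three type aliases are assumptions inferred from the example on this page, not the library's actual declarations, and fromValues and toLabel are hypothetical names used only here.

```typescript
// Assumed minimal type shapes, inferred from the example on this page.
type IObserver<GValue> = (value: GValue) => void;
type IUnsubscribe = () => void;
type IObservable<GValue> = (emit: IObserver<GValue>) => IUnsubscribe;

interface IObservablePipe<GIn, GOut> {
  (subscribe: IObservable<GIn>): IObservable<GOut>;
}

// Hypothetical source: emits the given values synchronously on subscription.
const fromValues = <GValue>(...values: GValue[]): IObservable<GValue> => {
  return (emit: IObserver<GValue>): IUnsubscribe => {
    for (const value of values) {
      emit(value);
    }
    return (): void => {};
  };
};

// Hypothetical pipe: receives numbers (GIn) and emits string labels (GOut).
const toLabel: IObservablePipe<number, string> = (
  subscribe: IObservable<number>,
): IObservable<string> => {
  return (emit: IObserver<string>): IUnsubscribe => {
    return subscribe((value: number): void => {
      emit(`#${value}`);
    });
  };
};

const labels: string[] = [];
const unsubscribe: IUnsubscribe = toLabel(fromValues(1, 2, 3))((label: string): void => {
  labels.push(label);
});
unsubscribe();
// labels now contains '#1', '#2', '#3'
```

The pipe never touches the values itself until the returned Observable is subscribed to; it only describes how values coming from the received Observable should be transformed.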

In a way, an ObservablePipe is both a lazy push destination (the returned Observable) and a lazy push source (the received Observable).
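
The laziness on both sides can be observed with a small sketch. It assumes the same minimal type shapes as the example on this page; source and double are hypothetical names, and the counter exists only to make the subscription visible.

```typescript
// Assumed minimal type shapes, inferred from the example on this page.
type IObserver<GValue> = (value: GValue) => void;
type IUnsubscribe = () => void;
type IObservable<GValue> = (emit: IObserver<GValue>) => IUnsubscribe;

let sourceSubscriptions = 0;

// Hypothetical source that counts how many times it is subscribed to.
const source: IObservable<number> = (emit: IObserver<number>): IUnsubscribe => {
  sourceSubscriptions++;
  emit(1);
  emit(2);
  return (): void => {};
};

// Hypothetical pipe that doubles each received value.
const double = (subscribe: IObservable<number>): IObservable<number> => {
  return (emit: IObserver<number>): IUnsubscribe => {
    return subscribe((value: number): void => {
      emit(value * 2);
    });
  };
};

// Applying the pipe only builds a new Observable: the source is not subscribed yet.
const doubled: IObservable<number> = double(source);
const subscriptionsAfterPiping: number = sourceSubscriptions; // 0

// Subscribing to the returned Observable is what subscribes to the source.
const received: number[] = [];
const unsubscribe: IUnsubscribe = doubled((value: number): void => {
  received.push(value);
});
unsubscribe();
const subscriptionsAfterSubscribing: number = sourceSubscriptions; // 1
```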

This is equivalent to RxJS's pipeable operators (only the pipeable ones, not the creation operators).

info

A best practice consists of suffixing the names of your ObservablePipes with $$. It's a convenient way to distinguish them from other variables. Example: const oddFilter$$ = filter$$$(x => x % 2 === 1)
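
To make the naming convention concrete, here is a minimal sketch. The filter$$$ below is a local stand-in written for this illustration, not the library's implementation, and the type aliases are assumptions inferred from the example on this page; numbers is a hypothetical source.

```typescript
// Assumed minimal type shapes, inferred from the example on this page.
type IObserver<GValue> = (value: GValue) => void;
type IUnsubscribe = () => void;
type IObservable<GValue> = (emit: IObserver<GValue>) => IUnsubscribe;
type IObservablePipe<GIn, GOut> = (subscribe: IObservable<GIn>) => IObservable<GOut>;

// Local stand-in for a filter$$$-style factory: it takes a predicate
// and returns an ObservablePipe that only re-emits matching values.
const filter$$$ = <GValue>(
  predicate: (value: GValue) => boolean,
): IObservablePipe<GValue, GValue> => {
  return (subscribe: IObservable<GValue>): IObservable<GValue> => {
    return (emit: IObserver<GValue>): IUnsubscribe => {
      return subscribe((value: GValue): void => {
        if (predicate(value)) {
          emit(value);
        }
      });
    };
  };
};

// The $$ suffix marks this variable as an ObservablePipe.
const oddFilter$$: IObservablePipe<number, number> = filter$$$(
  (x: number): boolean => x % 2 === 1,
);

// Hypothetical source emitting 1 to 5 synchronously.
const numbers: IObservable<number> = (emit: IObserver<number>): IUnsubscribe => {
  for (const value of [1, 2, 3, 4, 5]) {
    emit(value);
  }
  return (): void => {};
};

const odds: number[] = [];
const unsubscribe: IUnsubscribe = oddFilter$$(numbers)((value: number): void => {
  odds.push(value);
});
unsubscribe();
// odds now contains 1, 3, 5
```

With the suffix in place, a call such as oddFilter$$(numbers) reads unambiguously as "apply a pipe to an Observable".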

Example

ObservablePipe that re-emits a received value only when it differs from the previous one

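const distinct = <GValue>(subscribe: IObservable<GValue>): IObservable<GValue> => {
  return (emit: IObserver<GValue>): IUnsubscribe => {
    // Created per subscription, so each subscriber tracks its own previous value.
    let previousValue: GValue;
    // Subscribe to the received Observable and forward its unsubscribe function.
    return subscribe((value: GValue): void => {
      // Re-emit only when the value differs from the last emitted one.
      if (value !== previousValue) {
        previousValue = value;
        emit(value);
      }
    });
  };
};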

const subscribe: IObservable<number> = distinct(of(1, 2, 2, 3));

const unsubscribe: IUnsubscribe = subscribe((value: number) => {
  console.log('value:', value);
});

Output:

value: 1
value: 2
value: 3

RxJS equivalent

of(1, 2, 2, 3)
  .pipe(
    distinctUntilChanged(),
  )
  .subscribe((value: number) => {
    console.log('value:', value);
  });