Skip to main content

Notification

As you may have noticed so far, the Observables don't have any state like complete or error. They are just raw streams of data.

However, some streams have a final state, and we need a way to represent them with Observables.

For example, an HTTP request finishes with a success state (and an associated Response's Body) or an error state (with an associated error) if it failed.

@lirx/core uses Notifications as a state replacement for Observables.

Definition of a Notification

interface INotification<GName extends string, GValue> {
readonly name: GName;
readonly value: GValue;
}

A Notification is simply a tuple composed of a name and a value.

You may create one using a plain object or prefer the function createNotification.

The three essentials Notifications are:

These notifications will be passed as values for our Observables.

An example, is worth a thousand words.

Example

Let's create an Observable based on a Promise:

  • If the Promise fulfills, we will send a next Notification containing the Promise's result, followed by a complete one.
  • If the Promise rejects, we will send an error Notification containing the Promise's error.

We will start by defining the type of value that our Observable will emit:

type IObservableFromPromiseNotifications<GValue> =
| INextNotification<GValue>
| ICompleteNotification
| IErrorNotification
;

It's just a union of different kind of notifications.

Then, we will handle the Promise's result and emit the corresponding Notifications:

promise.then(
(value: GValue) => {
emit(createNextNotification<GValue>(value));
emit(createCompleteNotification());
},
(error: any) => {
emit(createErrorNotification<any>(error));
},
);

Let's wrap everything in a function:

function fromPromise<GValue>(
promise: Promise<GValue>,
): IObservable<IObservableFromPromiseNotifications<GValue>> {
type GNotificationsUnion = IObservableFromPromiseNotifications<GValue>;
return (emit: IObserver<GNotificationsUnion>): IUnsubscribe => {
let running: boolean = true;
promise.then(
(value: GValue) => {
if (running) {
emit(createNextNotification<GValue>(value));
}
if (running) {
emit(createCompleteNotification());
}
},
(error: any) => {
if (running) {
emit(createErrorNotification<any>(error));
}
},
);
return (): void => {
running = false;
};
};
}

And voilà ! We've created an Observable from a Promise 🎊.

info

You may import and use immediately fromPromise and fromPromiseFactory from @lirx/core, instead of re-implementing it.


Now we have to consume our Observable:

const subscribe = fromPromise(Promise.resolve(5));

subscribe((notification: IObservableFromPromiseNotifications<number>) => {
switch (notification.name) {
case 'next':
console.log('next', notification.value);
break;
case 'complete':
console.log('resolved');
break;
case 'error':
console.log('rejected', notification.value);
break;
}
});

We simply switch on the incoming Notification's name.

We may also use the function defaultNotificationObserver if we prefer a shorter syntax:

subscribe(
defaultNotificationObserver(
/* next */ (value: number) => {
console.log('next', value);
},
/* complete */ () => {
console.log('resolved');
},
/* error */ (error: any) => {
console.log('rejected', error);
},
),
);

Observables based on Notifications

There are plenty of Observables and ObservablePipes based on Notifications on the reference page.

For example, we could create an Observable from an HTTP request, and then display the result:

const request$ = pipe$$(fromFetch(`https://www.w3.org/TR/PNG/iso_8859-1.txt`), [
fulfilled$$$((response: Response): IObservable<IFromPromiseObservableNotifications<string>> => {
if (response.ok) {
return fromPromise(response.text());
} else {
return throwError(createNetworkError());
}
}),
]);

toPromiseLast(request$)
.then((text: string) => {
console.log(text);
});

😰 Yes, in this case, using Promises is easier and shorter.

However, with Observables we may create far more powerful data flow - like streaming the response body:

const request$ = pipe$$(fromFetch(`https://www.w3.org/TR/PNG/iso_8859-1.txt`), [
fulfilled$$$((response: Response): IObservable<IFromPromiseObservableNotifications<string>> => {
if (response.ok) {
return fromReadableStream<Uint8Array>(response.body);
} else {
return throwError(createNetworkErrorFromResponse(response));
}
}),
]);
request$(
defaultNotificationObserver(
/* next */ (chunk: Uint8Array) => {
console.log('chunk', chunk);
},
/* complete */ () => {
console.log('done !');
},
/* error */ (error: any) => {
console.log('something wrong append', error);
},
),
);

It's something that can't be done with Promises, but can be done very easily with Observables.

Observables are extremely powerful. When you'll master them, we guaranty you'll do some magic.

Comparison with RxJS

Why @lirx/core's Observables are not based on 'next', 'complete' and 'error' events likes RxJS ?

This choice is due to a simple observation: most Observables don't have or need a final state.

They just send raw values, without ever finishing until unsubscribed (ex: fromEventTarget, interval, etc.).

So complete or error states are unnecessary in the majority of use cases.

However, the RxJS's Observables are build exclusively on these three "events". It results in tortuous interactions of the data in the pipes, elaborated state management, and complex algorithms. All of these impacts negatively RxJS: it means more lines of code in the library, resulting in a bigger bundle size and decreased performances.

Finally, the Observables should not be restricted to these 3 "events": we may easily imagine an XHR Observable emitting upload-progress and download-progress events. Something impossible with RxJS.

These use cases are perfectly handled with @lirx/core, with even faster performances. Thanks to the Notifications' approach.