function defaultNotificationObserver<GValue>(
  next: IObserver<GValue> | undefined,
  complete?: IObserver<void> | undefined,
  error?: IObserver<unknown> | undefined,
  other?: IObserver<IGenericNotification> | undefined,
): IObserver<IDefaultInNotificationsUnion<GValue>>


Creates an Observer that splits incoming Notifications across 4 distinct Observers:

  • next: if a next Notification is received, this Observer is called with the Notification's value.
  • complete: if a complete Notification is received, this Observer is called without any value.
  • error: if an error Notification is received, this Observer is called with the Notification's value (the error). If this Observer is undefined and an error Notification is received, the error is thrown.
  • other: if a Notification with any name other than next, complete, or error is received, this Observer is called with the Notification itself.
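
The routing rules above can be illustrated with a small standalone sketch. It is not the library's implementation: the types below are simplified, hypothetical stand-ins (a Notification is assumed to be an object with a name and a value), and sketchNotificationObserver is an illustrative name chosen here. It only mirrors the four cases as the list describes them.

```typescript
// Hypothetical, simplified stand-ins for the library's types (assumptions).
interface INotification<GName extends string, GValue> {
  readonly name: GName;
  readonly value: GValue;
}

type IObserver<GValue> = (value: GValue) => void;
type IGenericNotification = INotification<string, unknown>;

// Sketch of the dispatch: route each Notification according to its name.
function sketchNotificationObserver<GValue>(
  next?: IObserver<GValue>,
  complete?: IObserver<void>,
  error?: IObserver<unknown>,
  other?: IObserver<IGenericNotification>,
): IObserver<IGenericNotification> {
  return (notification: IGenericNotification): void => {
    switch (notification.name) {
      case 'next':
        next?.(notification.value as GValue); // called with the value
        break;
      case 'complete':
        complete?.(); // called without any value
        break;
      case 'error':
        if (error === undefined) {
          throw notification.value; // no error Observer: the error is thrown
        }
        error(notification.value);
        break;
      default:
        other?.(notification); // any other name: receives the Notification
    }
  };
}

// Usage: record which Observer was called for each Notification.
const log: string[] = [];
const observer = sketchNotificationObserver<number>(
  (value) => log.push(`next:${value}`),
  () => log.push('complete'),
  (err) => log.push(`error:${String(err)}`),
  (notification) => log.push(`other:${notification.name}`),
);

observer({ name: 'next', value: 1 });
observer({ name: 'abort', value: undefined });
observer({ name: 'complete', value: undefined });
```

Here the 'abort' Notification is an arbitrary example of a name that falls through to the other Observer.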


HTTP request with stream of data

const request$ = pipe$$(fromFetch(``), [
  fulfilled$$$((response: Response): IObservable<IFromPromiseObservableNotifications<string>> => {
    if (response.ok) {
      return fromReadableStream<Uint8Array>(response.body);
    } else {
      return throwError(createNetworkErrorFromResponse(response));
    }
  }),
]);

// Subscribe, routing each kind of Notification to its own Observer.
request$(
  defaultNotificationObserver(
    /* next */ (chunk: Uint8Array) => {
      console.log('chunk', chunk);
    },
    /* complete */ () => {
      console.log('done !');
    },
    /* error */ (error: any) => {
      console.log('something went wrong', error);
    },
  ),
);