Skip to main content

toAsyncIterable

Types

function toAsyncIterable<GValue>(
subscribe: IObservable<IObservableToAsyncGeneratorNotifications<GValue>>,
): AsyncGenerator<GValue>

Definition

Converts an Observable of Notifications into an async iterable.

The Observable must emit the following Notifications:

  • next: the values to yield
  • complete: ends the async iterable (done === true)
  • error: throws into the async iterable
caution

If you prefer to iterate calling .next instead of using for await, and want to end the iteration before the AsyncIterable is done, you'll have to call .return or .throw to unsubscribe from the original Observable. See examples below.

Examples

Yield values from 0 to 3 every 500ms

// simple function returning a Promise resolved after 'timeout' ms
function sleep(timeout: number): Promise<void> {
return new Promise<void>(resolve => setTimeout(resolve, timeout));
}

// creates an observables from an AsyncIterable which sends values from 0 to 3 every 500ms
const values$ = fromAsyncIterable((async function * () {
for (let i = 0; i < 4; i++) {
await sleep(500);
console.log('emit', i);
yield i;
}
})());

// transforms this Observable into an AsyncIterable
const iterable = toAsyncIterable(values$);
// iterates using <for await>
for await (const value of iterable) {
console.log('receive', value);
}

Output:

emit 0
receive 0
emit 1
receive 1
emit 2
receive 2
emit 3
receive 3

Yield values from 0 to Infinity every 500ms, and break on the second iteration

// creates an observables from an AsyncIterable which sends values from 0 to Infinity every 500ms
const values$ = fromAsyncIterable((async function * () {
let i = 0;
while (true) {
await sleep(500);
console.log('emit', i);
yield i;
i++;
}
})());
let i = 0;
for await (const value of iterable) {
console.log('receive', value);
if (++i >= 2) {
break;
}
}

Output:

emit 0
receive 0
emit 1
receive 1
// unsubscribe automatically
emit 2

caution

If you prefer to use .next instead of for await, you'll have to call .return or .throw:

let i = 0;
let result: IteratorResult<number>;
while (!(result = await iterable.next()).done) {
console.log('receive', result.value);
if (++i >= 2) {
// WARN: it's important to .return ot .throw the iterable to free resources from the original Observable
await iterable.return(void 0);
break;
}
}

Output:

emit 0
receive 0
emit 1
receive 1
// unsubscribe automatically
emit 2

If you don't, the original Observable will continue to send its values:

emit 0
receive 0
emit 1
receive 1
// not unsubscribed
emit 2
emit 3
emit 4
...

And it will create a memory leak.