toAsyncIterable
Types
function toAsyncIterable<GValue>(
subscribe: IObservable<IObservableToAsyncGeneratorNotifications<GValue>>,
): AsyncGenerator<GValue>
Definition
Converts an Observable of Notifications into an async iterable.
The Observable must emit the following Notifications:
- next: the values to yield
- complete: ends the async iterable (done === true)
- error: throws into the async iterable
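The mapping from Notifications to iterator behaviour can be sketched as follows. This is a minimal illustration, not the library's implementation: the `Notification`, `Observable`, and `Unsubscribe` shapes, the `toAsyncIterableSketch` function, and the `demo$` source are all hypothetical stand-ins, and the real types may differ.

```typescript
// Hypothetical, simplified shapes (assumptions; the library's own types may differ).
type Notification<T> =
  | { name: 'next'; value: T }
  | { name: 'complete' }
  | { name: 'error'; value: unknown };

type Unsubscribe = () => void;
type Observable<T> = (emit: (value: T) => void) => Unsubscribe;

// Sketch: buffers incoming notifications and replays them through an async generator.
async function* toAsyncIterableSketch<T>(
  subscribe: Observable<Notification<T>>,
): AsyncGenerator<T> {
  const queue: Notification<T>[] = [];
  let wake: (() => void) | undefined;

  const unsubscribe = subscribe((notification) => {
    queue.push(notification);
    wake?.(); // wakes the generator if it is waiting for a notification
  });

  try {
    while (true) {
      while (queue.length === 0) {
        await new Promise<void>((resolve) => {
          wake = resolve;
        });
      }
      const notification = queue.shift()!;
      if (notification.name === 'next') {
        yield notification.value; // next: the value is yielded
      } else if (notification.name === 'complete') {
        return; // complete: the iteration ends (done === true)
      } else {
        throw notification.value; // error: thrown into the consumer
      }
    }
  } finally {
    unsubscribe(); // runs on completion, error, .return and .throw
  }
}

// Hypothetical source emitting two values, then completing.
const demo$: Observable<Notification<number>> = (emit) => {
  emit({ name: 'next', value: 1 });
  emit({ name: 'next', value: 2 });
  emit({ name: 'complete' });
  return () => {};
};

async function collect(): Promise<number[]> {
  const out: number[] = [];
  for await (const value of toAsyncIterableSketch(demo$)) {
    out.push(value);
  }
  return out;
}
```

Here `collect()` gathers the values carried by the two `next` notifications and stops at `complete`. The `finally` block is what ties the end of the iteration to unsubscribing.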
caution
If you prefer to iterate by calling .next instead of using for await, and want to end the iteration before the AsyncIterable is done, you'll have to call .return or .throw to unsubscribe from the original Observable.
See examples below.
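The reason for this difference is standard JavaScript behaviour: leaving a for await loop early calls .return on the iterator, while manual calls to .next never do. The sketch below shows this with a plain async generator. `counter`, `viaForAwait`, `viaNext`, and `log` are hypothetical names, and the `finally` block only stands in for unsubscribing from an Observable.

```typescript
const log: string[] = [];

// Stand-in for an iterable backed by an Observable (an assumption for illustration).
async function* counter(): AsyncGenerator<number> {
  try {
    let i = 0;
    while (true) yield i++;
  } finally {
    log.push('cleanup'); // plays the role of the unsubscribe step
  }
}

// Breaking out of for await implicitly calls .return(), so cleanup runs.
async function viaForAwait(): Promise<void> {
  for await (const value of counter()) {
    if (value >= 1) break;
  }
}

// With manual .next() calls, cleanup only runs if .return() is called explicitly.
async function viaNext(callReturn: boolean): Promise<void> {
  const iterator = counter();
  await iterator.next();
  await iterator.next();
  if (callReturn) {
    await iterator.return(undefined);
  }
}
```

Calling `viaNext(false)` leaves the generator suspended at its last `yield`, so its `finally` block never runs. That is the situation the caution above describes.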
Examples
Yield values from 0 to 3 every 500ms
// simple function returning a Promise resolved after 'timeout' ms
function sleep(timeout: number): Promise<void> {
  return new Promise<void>(resolve => setTimeout(resolve, timeout));
}
// creates an Observable from an AsyncIterable which sends values from 0 to 3 every 500ms
const values$ = fromAsyncIterable((async function * () {
  for (let i = 0; i < 4; i++) {
    await sleep(500);
    console.log('emit', i);
    yield i;
  }
})());
// transforms this Observable into an AsyncIterable
const iterable = toAsyncIterable(values$);
// iterates using <for await>
for await (const value of iterable) {
  console.log('receive', value);
}
Output:
emit 0
receive 0
emit 1
receive 1
emit 2
receive 2
emit 3
receive 3
Yield values from 0 to Infinity every 500ms, and break on the second iteration
// creates an Observable from an AsyncIterable which sends values from 0 to Infinity every 500ms
const values$ = fromAsyncIterable((async function * () {
  let i = 0;
  while (true) {
    await sleep(500);
    console.log('emit', i);
    yield i;
    i++;
  }
})());
// transforms this Observable into an AsyncIterable
const iterable = toAsyncIterable(values$);
// iterates using <for await>, and breaks on the second iteration
let i = 0;
for await (const value of iterable) {
  console.log('receive', value);
  if (++i >= 2) {
    break;
  }
}
Output:
emit 0
receive 0
emit 1
receive 1
// unsubscribed automatically
emit 2
caution
If you prefer to use .next instead of for await, you'll have to call .return or .throw:
let i = 0;
let result: IteratorResult<number>;
while (!(result = await iterable.next()).done) {
  console.log('receive', result.value);
  if (++i >= 2) {
    // WARN: it's important to .return or .throw the iterable to free resources from the original Observable
    await iterable.return(void 0);
    break;
  }
}
Output:
emit 0
receive 0
emit 1
receive 1
// unsubscribed by the call to .return
emit 2
If you don't, the original Observable will continue to send its values:
emit 0
receive 0
emit 1
receive 1
// not unsubscribed
emit 2
emit 3
emit 4
...
This creates a memory leak.
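One way to guard against this when iterating manually is to wrap the .next calls in try/finally, so that .return is called even if the consuming code throws. The sketch below is one possible pattern, not part of the library: `takeManually`, `endless`, and `released` are hypothetical names, and `endless` stands in for an iterable produced by toAsyncIterable.

```typescript
// Hypothetical helper: reads at most `count` values with .next(), then always calls .return().
async function takeManually<T>(
  iterator: AsyncIterator<T>,
  count: number,
): Promise<T[]> {
  const values: T[] = [];
  try {
    while (values.length < count) {
      const result = await iterator.next();
      if (result.done) break;
      values.push(result.value);
    }
  } finally {
    // .return is optional on the AsyncIterator interface, hence the optional call.
    await iterator.return?.();
  }
  return values;
}

// Stand-in for an endless iterable; its finally block plays the role of unsubscribing.
let released = false;

async function* endless(): AsyncGenerator<number> {
  try {
    let i = 0;
    while (true) yield i++;
  } finally {
    released = true;
  }
}
```

Calling `takeManually(endless(), 2)` reads two values and then releases the generator. With an iterable created by toAsyncIterable, the same `finally` step is what would trigger the unsubscription from the original Observable.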