Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions goldens/public-api/core/rxjs-interop/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,29 @@ export interface ToObservableOptions {
export function toSignal<T>(source: Observable<T>): Signal<T | undefined>;

// @public
export function toSignal<T, U extends T | null | undefined>(source: Observable<T>, options: {
export function toSignal<T>(source: Observable<T>, options?: ToSignalOptions<undefined> & {
requireSync?: false;
}): Signal<T | undefined>;

// @public
export function toSignal<T, U extends T | null | undefined>(source: Observable<T>, options: ToSignalOptions<U> & {
initialValue: U;
requireSync?: false;
}): Signal<T | U>;

// @public (undocumented)
export function toSignal<T>(source: Observable<T>, options: {
// @public
export function toSignal<T>(source: Observable<T>, options: ToSignalOptions<undefined> & {
requireSync: true;
}): Signal<T>;

// @public
export interface ToSignalOptions<T> {
initialValue?: T;
injector?: Injector;
manualCleanup?: boolean;
requireSync?: boolean;
}

// (No @packageDocumentation comment for this package)

```
2 changes: 1 addition & 1 deletion packages/core/rxjs-interop/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@

export {takeUntilDestroyed} from './take_until_destroyed';
export {toObservable, ToObservableOptions} from './to_observable';
export {toSignal} from './to_signal';
export {toSignal, ToSignalOptions} from './to_signal';
4 changes: 2 additions & 2 deletions packages/core/rxjs-interop/src/to_observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {Observable, ReplaySubject} from 'rxjs';
*/
export interface ToObservableOptions {
/**
* The `Injector` to use when creating the effect.
* The `Injector` to use when creating the underlying `effect` which watches the signal.
*
* If this isn't specified, the current injection context will be used.
*/
Expand All @@ -28,7 +28,7 @@ export interface ToObservableOptions {
*
* The signal's value will be propagated into the `Observable`'s subscribers using an `effect`.
*
* `toObservable` must be called in an injection context.
* `toObservable` must be called in an injection context unless an injector is provided via options.
*
* @developerPreview
*/
Expand Down
147 changes: 122 additions & 25 deletions packages/core/rxjs-interop/src/to_signal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,53 @@
* found in the LICENSE file at https://angular.io/license
*/

import {assertInInjectionContext, computed, DestroyRef, inject, signal, Signal, WritableSignal} from '@angular/core';
import {assertInInjectionContext, computed, DestroyRef, inject, Injector, signal, Signal, WritableSignal} from '@angular/core';
import {Observable} from 'rxjs';

import {RuntimeError, RuntimeErrorCode} from '../../src/errors';
import {untracked} from '../../src/signals';

/**
* Options for `toSignal`.
*
* The type parameter `T` is the type of the optional `initialValue`.
*
* @publicApi
*/
export interface ToSignalOptions<T> {
/**
* Initial value for the signal produced by `toSignal`.
*
* This will be the value of the signal until the observable emits its first value.
*/
initialValue?: T;

/**
* Whether to require that the observable emits synchronously when `toSignal` subscribes.
*
* If this is `true`, `toSignal` will assert that the observable produces a value immediately upon
* subscription. Setting this option removes the need to either deal with `undefined` in the
* signal type or provide an `initialValue`, at the cost of a runtime error if this requirement is
* not met.
*/
requireSync?: boolean;

/**
* `Injector` which will provide the `DestroyRef` used to clean up the Observable subscription.
*
* If this is not provided, a `DestroyRef` will be retrieved from the current injection context,
* unless manual cleanup is requested.
*/
injector?: Injector;

/**
* Whether the subscription should be automatically cleaned up (via `DestroyRef`) when
* `toSignal`'s creation context is destroyed.
*
* If manual cleanup is enabled, then `DestroyRef` is not used, and the subscription will persist
* until the `Observable` itself completes.
*/
manualCleanup?: boolean;
}

/**
* Get the current value of an `Observable` as a reactive `Signal`.
*
Expand All @@ -20,16 +61,17 @@ import {untracked} from '../../src/signals';
* have the most recent value emitted by the subscription, and will throw an error if the
* `Observable` errors.
*
* The subscription will last for the lifetime of the current injection context. That is, if
* `toSignal` is called from a component context, the subscription will be cleaned up when the
* component is destroyed. When called outside of a component, the current `EnvironmentInjector`'s
* lifetime will be used (which is typically the lifetime of the application itself).
* Before the `Observable` emits its first value, the `Signal` will return `undefined`. To avoid
* this, either an `initialValue` can be passed or the `requireSync` option enabled.
*
* If the `Observable` does not produce a value before the `Signal` is read, the `Signal` will throw
* an error. To avoid this, use a synchronous `Observable` (potentially created with the `startWith`
* operator) or pass an initial value to `toSignal` as the second argument.
* By default, the subscription will be automatically cleaned up when the current injection context
* is destroyed. For example, when `toSignal` is called during the construction of a component,
* the subscription will be cleaned up when the component is destroyed. If an injection context is
* not available, an explicit `Injector` can be passed instead.
*
* `toSignal` must be called in an injection context.
* If the subscription should persist until the `Observable` itself completes, the `manualCleanup`
* option can be specified instead, which disables the automatic subscription teardown. No injection
* context is needed in this configuration either.
*/
export function toSignal<T>(source: Observable<T>): Signal<T|undefined>;

Expand All @@ -41,29 +83,84 @@ export function toSignal<T>(source: Observable<T>): Signal<T|undefined>;
* have the most recent value emitted by the subscription, and will throw an error if the
* `Observable` errors.
*
* The subscription will last for the lifetime of the current injection context. That is, if
* `toSignal` is called from a component context, the subscription will be cleaned up when the
* component is destroyed. When called outside of a component, the current `EnvironmentInjector`'s
* lifetime will be used (which is typically the lifetime of the application itself).
* Before the `Observable` emits its first value, the `Signal` will return the configured
* `initialValue`, or `undefined` if no `initialValue` is provided. If the `Observable` is
* guaranteed to emit synchronously, then the `requireSync` option can be passed instead.
*
* By default, the subscription will be automatically cleaned up when the current injection context
* is destroyed. For example, when `toSignal` is called during the construction of a component,
* the subscription will be cleaned up when the component is destroyed. If an injection context is
* not available, an explicit `Injector` can be passed instead.
*
* If the subscription should persist until the `Observable` itself completes, the `manualCleanup`
* option can be specified instead, which disables the automatic subscription teardown. No injection
* context is needed in this configuration either.
*
* @developerPreview
*/
export function toSignal<T>(
source: Observable<T>,
options?: ToSignalOptions<undefined>&{requireSync?: false}): Signal<T|undefined>;


/**
* Get the current value of an `Observable` as a reactive `Signal`.
*
* `toSignal` returns a `Signal` which provides synchronous reactive access to values produced
* by the given `Observable`, by subscribing to that `Observable`. The returned `Signal` will always
* have the most recent value emitted by the subscription, and will throw an error if the
* `Observable` errors.
*
* Before the `Observable` emits its first value, the `Signal` will return the configured
* `initialValue`. If the `Observable` is known to produce a value before the `Signal` will be read,
* `initialValue` does not need to be passed.
* `initialValue`. If the `Observable` is guaranteed to emit synchronously, then the `requireSync`
* option can be passed instead.
*
* By default, the subscription will be automatically cleaned up when the current injection context
* is destroyed. For example, when `toSignal` is called during the construction of a component,
* the subscription will be cleaned up when the component is destroyed. If an injection context is
* not available, an explicit `Injector` can be passed instead.
*
* `toSignal` must be called in an injection context.
* If the subscription should persist until the `Observable` itself completes, the `manualCleanup`
* option can be specified instead, which disables the automatic subscription teardown. No injection
* context is needed in this configuration either.
*
* @developerPreview
*/
export function toSignal<T, U extends T|null|undefined>(
// toSignal(Observable<Animal>, {initialValue: null}) -> Signal<Animal|null>
source: Observable<T>, options: {initialValue: U, requireSync?: false}): Signal<T|U>;
source: Observable<T>,
options: ToSignalOptions<U>&{initialValue: U, requireSync?: false}): Signal<T|U>;

/**
* Get the current value of an `Observable` as a reactive `Signal`.
*
* `toSignal` returns a `Signal` which provides synchronous reactive access to values produced
* by the given `Observable`, by subscribing to that `Observable`. The returned `Signal` will always
* have the most recent value emitted by the subscription, and will throw an error if the
* `Observable` errors.
*
* With `requireSync` set to `true`, `toSignal` will assert that the `Observable` produces a value
* immediately upon subscription. No `initialValue` is needed in this case, and the returned signal
* does not include an `undefined` type.
*
* By default, the subscription will be automatically cleaned up when the current injection context
* is destroyed. For example, when `toSignal` is called during the construction of a component,
* the subscription will be cleaned up when the component is destroyed. If an injection context is
* not available, an explicit `Injector` can be passed instead.
*
* If the subscription should persist until the `Observable` itself completes, the `manualCleanup`
* option can be specified instead, which disables the automatic subscription teardown. No injection
* context is needed in this configuration either.
*
* @developerPreview
*/
export function toSignal<T>(
// toSignal(Observable<Animal>, {requireSync: true}) -> Signal<Animal>
source: Observable<T>, options: {requireSync: true}): Signal<T>;
// toSignal(Observable<Animal>) -> Signal<Animal|undefined>
source: Observable<T>, options: ToSignalOptions<undefined>&{requireSync: true}): Signal<T>;
export function toSignal<T, U = undefined>(
source: Observable<T>, options?: {initialValue?: U, requireSync?: boolean}): Signal<T|U> {
assertInInjectionContext(toSignal);
source: Observable<T>, options?: ToSignalOptions<U>): Signal<T|U> {
const requiresCleanup = !options?.manualCleanup;
requiresCleanup && !options?.injector && assertInInjectionContext(toSignal);
const cleanupRef =
requiresCleanup ? options?.injector?.get(DestroyRef) ?? inject(DestroyRef) : null;

// Note: T is the Observable value type, and U is the initial value type. They don't have to be
// the same - the returned signal gives values of type `T`.
Expand All @@ -89,8 +186,8 @@ export function toSignal<T, U = undefined>(
'`toSignal()` called with `requireSync` but `Observable` did not emit synchronously.');
}

// Unsubscribe when the current context is destroyed.
inject(DestroyRef).onDestroy(sub.unsubscribe.bind(sub));
// Unsubscribe when the current context is destroyed, if requested.
cleanupRef?.onDestroy(sub.unsubscribe.bind(sub));

// The actual returned signal is a `computed` of the `State` signal, which maps the various states
// to either values or errors.
Expand Down
48 changes: 47 additions & 1 deletion packages/core/rxjs-interop/test/to_signal_spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* found in the LICENSE file at https://angular.io/license
*/

import {EnvironmentInjector, Injector, runInInjectionContext} from '@angular/core';
import {DestroyRef, EnvironmentInjector, Injector, runInInjectionContext} from '@angular/core';
import {toSignal} from '@angular/core/rxjs-interop';
import {BehaviorSubject, ReplaySubject, Subject} from 'rxjs';

Expand Down Expand Up @@ -63,6 +63,52 @@ describe('toSignal()', () => {
expect(counter()).toBe(1);
}));



it('should unsubscribe when an explicitly provided injector is destroyed', test(() => {
     // An injector handed over through options owns the subscription's lifetime.
     const source$ = new BehaviorSubject(0);
     const ownerInjector = Injector.create({providers: []}) as EnvironmentInjector;
     const current = toSignal(source$, {injector: ownerInjector});

     // While the injector is alive, emissions flow through to the signal.
     expect(current()).toBe(0);
     source$.next(1);
     expect(current()).toBe(1);

     // Tearing down the injector is expected to end the subscription.
     ownerInjector.destroy();

     // The last observed value is retained after teardown.
     expect(current()).toBe(1);

     // Later emissions must no longer reach the signal.
     source$.next(2);
     expect(current()).toBe(1);
   }));

it('should not require an injection context when manualCleanup is passed', () => {
    // Deliberately not wrapped in an injection context: with manual cleanup requested,
    // creating the signal must succeed anyway.
    const source$ = new BehaviorSubject(0);
    const createSignal = () => toSignal(source$, {manualCleanup: true});

    expect(createSignal).not.toThrow();

    // Complete the source so the manually managed subscription ends.
    source$.complete();
  });

it('should not unsubscribe when manualCleanup is passed', () => {
    const source$ = new BehaviorSubject(0);
    const contextInjector = Injector.create({providers: []}) as EnvironmentInjector;

    // Create the signal inside an injection context, but opt out of automatic teardown.
    const current = runInInjectionContext(contextInjector, () => {
      return toSignal(source$, {manualCleanup: true});
    });

    contextInjector.destroy();

    // The subscription must outlive the destroyed injector, so new values still arrive.
    source$.next(1);
    expect(current()).toBe(1);

    source$.complete();

    // After completion the signal keeps the final value it observed.
    expect(current()).toBe(1);
  });

describe('with no initial value', () => {
it('should return `undefined` if read before a value is emitted', test(() => {
const counter$ = new Subject<number>();
Expand Down