ReactiveStreamStore

type ReactiveStreamStore<T> = object;

A reactive store that holds the latest value published to a data channel and allows external systems to subscribe to changes. Compatible with useSyncExternalStore, Svelte stores, Solid's from(), and other reactive primitives that expect a { subscribe, getUnifiedState } contract.

The store starts in status: 'idle'. Call `connect()` to open the underlying stream; the store transitions through loading → loaded (or error). Subsequent connect() calls also pass through loading while preserving the last known data and error (stale-while-revalidate).
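The lifecycle described above can be modelled as a small pure reducer. This is only an illustrative sketch, not the library's implementation: the names `Snapshot`, `StoreEvent`, and `transition` are hypothetical, the flat `{ status, data, error }` shape is an assumption, and clearing `error` when fresh data arrives is also an assumption that the text above does not confirm.

```typescript
// Hypothetical model of the lifecycle; not the library's actual types.
type Status = 'idle' | 'loading' | 'loaded' | 'error';
type Snapshot<T> = { status: Status; data: T | undefined; error: unknown };

type StoreEvent<T> =
    | { type: 'connect' }
    | { type: 'data'; value: T }
    | { type: 'error'; error: unknown }
    | { type: 'reset' };

function transition<T>(state: Snapshot<T>, event: StoreEvent<T>): Snapshot<T> {
    switch (event.type) {
        case 'connect':
            // Stale-while-revalidate: keep the last known data and error.
            return { ...state, status: 'loading' };
        case 'data':
            // Assumption: a fresh value clears any previous error.
            return { status: 'loaded', data: event.value, error: undefined };
        case 'error':
            // The last known value stays readable after a failure.
            return { status: 'error', data: state.data, error: event.error };
        case 'reset':
            // Both data and error are cleared.
            return { status: 'idle', data: undefined, error: undefined };
    }
}
```

Under these assumptions, a second `connect` event after a successful load yields a snapshot with status 'loading' that still carries the previous `data`.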

Example

// React — the unified state snapshot has stable identity per update, making it suitable as
// the second argument to `useSyncExternalStore`.
const state = useSyncExternalStore(store.subscribe, store.getUnifiedState);
useEffect(() => {
    store.connect();
    return () => store.reset();
}, [store]);
if (state.status === 'error') return <ErrorMessage error={state.error} onRetry={store.connect} />;
if (state.status === 'loading' || state.status === 'idle') return <Spinner />;
return <View data={state.data} />;

See

createReactiveStoreFromDataPublisherFactory

Type Parameters

Type Parameter
T

Methods

connect()

connect(): void;

Open the underlying stream. Aborts any currently active connection, invokes the configured factory, and transitions the store to loading (preserving the last known data and error for stale-while-revalidate) before settling into loaded (on data) or error (on failure).

Returns

void


getError()

getError(): unknown;

Returns the error published to the error channel, or undefined if no error has occurred.

Returns

unknown

Deprecated

Use `getUnifiedState()` instead. This getter returns only the error field and cannot narrow the relationship between the current value, error, and status.


getState()

getState(): T | undefined;

Returns the most recent value published to the data channel, or undefined if no notification has arrived yet. On error, continues to return the last known value.

Returns

T | undefined

Deprecated

Use `getUnifiedState()` instead. This getter returns only the value field and does not surface lifecycle status (e.g. loading).


getUnifiedState()

getUnifiedState(): ReactiveState<T>;

Returns the current lifecycle snapshot: { data, error, status }. The returned object has stable identity between state changes, making it safe to pass directly as the getSnapshot argument to React's useSyncExternalStore.
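To illustrate what "stable identity between state changes" means in practice, here is a minimal sketch of a snapshot holder that only swaps its snapshot object when a new state is published. The class `SnapshotCell` and its `publish` method are hypothetical and exist only for this illustration; the real store's internals may differ.

```typescript
// Hypothetical snapshot holder; illustrates stable identity only.
type CellSnapshot<T> = {
    status: 'idle' | 'loading' | 'loaded' | 'error';
    data: T | undefined;
    error: unknown;
};

class SnapshotCell<T> {
    private snapshot: CellSnapshot<T> = { status: 'idle', data: undefined, error: undefined };
    private listeners = new Set<() => void>();

    // Returns the same object reference until publish() replaces it.
    getUnifiedState = (): CellSnapshot<T> => this.snapshot;

    subscribe = (callback: () => void): (() => void) => {
        this.listeners.add(callback);
        return () => {
            this.listeners.delete(callback);
        };
    };

    // Swap in a new snapshot object, then notify subscribers.
    publish(next: CellSnapshot<T>): void {
        this.snapshot = next;
        this.listeners.forEach(listener => listener());
    }
}
```

Because repeated reads between publishes return the same reference, a consumer such as useSyncExternalStore can compare snapshots by identity without triggering redundant renders.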

Returns

ReactiveState<T>

See

ReactiveState


reset()

reset(): void;

Aborts any currently active connection and resets the store to { status: 'idle' }. Both data and error are cleared. Use this to tear down the connection without permanently killing the store — a follow-up `connect()` will open a fresh stream.

Returns

void


retry()

retry(): void;

Re-opens the stream after an error. No-op when the store is not in status: 'error'.

Returns

void

Deprecated

Use `connect()` instead. connect() always (re)connects, regardless of current status — wrap with a status guard at the call site if you need the error-only behaviour.
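The status guard mentioned above might look like the following sketch. The helper name `retryIfErrored` and the structural `ConnectableStore` interface are hypothetical; the helper relies only on `connect()` and `getUnifiedState()` as documented on this page.

```typescript
// Hypothetical structural type: just the two members the guard needs.
type StoreStatus = 'idle' | 'loading' | 'loaded' | 'error';

interface ConnectableStore {
    connect(): void;
    getUnifiedState(): { status: StoreStatus };
}

// Reconnects only when the store is currently in the error state,
// mirroring the error-only behaviour of the deprecated retry().
// Returns true when a reconnect was attempted.
function retryIfErrored(store: ConnectableStore): boolean {
    if (store.getUnifiedState().status !== 'error') {
        return false;
    }
    store.connect();
    return true;
}
```

Returning a boolean is a design choice for this sketch so that callers can tell whether a reconnect was actually triggered.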


subscribe()

subscribe(callback): () => void;

Registers a callback to be called whenever the state changes or an error is received. Returns an unsubscribe function. Safe to call multiple times.
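Since the callback receives no arguments, a consumer outside a framework typically reads the snapshot inside it. The sketch below shows one way to do that; the helper name `watch` is hypothetical, and emitting the current snapshot immediately on registration is a choice made for this example rather than documented store behaviour.

```typescript
// Hypothetical helper built only on subscribe() and getUnifiedState().
interface SubscribableStore<S> {
    subscribe(callback: () => void): () => void;
    getUnifiedState(): S;
}

// Calls onChange with the current snapshot, then again after every
// notification. Returns the store's unsubscribe function.
function watch<S>(store: SubscribableStore<S>, onChange: (snapshot: S) => void): () => void {
    onChange(store.getUnifiedState());
    return store.subscribe(() => onChange(store.getUnifiedState()));
}
```

A caller would keep the returned function and invoke it during teardown, for example `const stop = watch(store, render);` followed later by `stop();`.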

Parameters

| Parameter | Type |
| --- | --- |
| callback | () => void |

Returns

() => void


withSignal()

withSignal(signal): object;

Returns a thin wrapper exposing connect() that composes signal with the store's internal per-connection controller via AbortSignal.any — aborting either tears down the active connection. Aborting the caller-provided signal surfaces the abort reason on state as { status: 'error' }; the internal controller path (supersession by a newer connect() or reset()) is silent by design so the newer call owns state. Use this to attach a caller-provided cancellation source (per-connection timeout, shared kill switch, parent-context signal) without touching the bare connect() API.

  • Per-connection timeout: store.withSignal(AbortSignal.timeout(30_000)).connect() — fresh clock per call.
  • Permanent kill switch: hold one AbortController, bind the wrapper once (const killable = store.withSignal(killCtrl.signal)), and use killable.connect() everywhere; aborting the controller cancels the active connection and short-circuits future calls through the bound wrapper.

The wrapper exposes only connect(); getUnifiedState / subscribe / reset remain store-level concerns on the parent.
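The kill-switch pattern from the list above could be packaged as in the following sketch. The helper `bindKillSwitch` and the structural `SignalBindable` interface are hypothetical; only `withSignal(signal)` returning an object with `connect()` is taken from this page, and `AbortController` is the standard platform API.

```typescript
// Hypothetical structural type: only the documented withSignal() member.
interface SignalBindable {
    withSignal(signal: AbortSignal): { connect(): void };
}

// Binds the wrapper once to a single controller so that every connect()
// made through the returned handle shares the same kill switch.
function bindKillSwitch(store: SignalBindable): { connect(): void; kill(): void } {
    const controller = new AbortController();
    const killable = store.withSignal(controller.signal);
    return {
        connect: () => killable.connect(),
        // Aborting cancels the active connection; per the description above,
        // later calls through the bound wrapper are short-circuited.
        kill: () => controller.abort(),
    };
}
```

Binding once, rather than calling `withSignal` on every connect, is what makes the switch permanent: all calls go through the same wrapper and therefore the same signal.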

Parameters

| Parameter | Type |
| --- | --- |
| signal | AbortSignal |

Returns

object

| Name | Type |
| --- | --- |
| connect() | () => void |
