ReactiveStreamStore
A reactive store that holds the latest value published to a data channel and allows external
systems to subscribe to changes. Compatible with useSyncExternalStore, Svelte stores, Solid's
from(), and other reactive primitives that expect a { subscribe, getUnifiedState } contract.
The store starts in status: 'idle'. Call `connect()`
to open the underlying stream; the store transitions through loading → loaded (or error).
Subsequent connect() calls also pass through loading while preserving the last known
data and error (stale-while-revalidate).
Example
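The following is a minimal, framework-free sketch rather than the library's own example. The `Snapshot` and `StoreLike` types are assumptions inferred from the method descriptions on this page (the exported types are not shown here), and `observe` is a hypothetical helper, not part of the API.

```typescript
// Assumed snapshot shape, based on the { data, error, status } description.
type Snapshot<T> = {
  data: T | undefined;
  error: unknown;
  status: 'idle' | 'loading' | 'loaded' | 'error';
};

// Structural subset of the store that this sketch relies on.
interface StoreLike<T> {
  connect(): void;
  reset(): void;
  getUnifiedState(): Snapshot<T>;
  subscribe(callback: () => void): () => void;
}

// Hypothetical helper: render the current snapshot, re-render on every
// notification, and open the stream. Returns a cleanup function.
function observe<T>(store: StoreLike<T>, render: (snapshot: Snapshot<T>) => void): () => void {
  render(store.getUnifiedState()); // initial render, normally status 'idle'
  const unsubscribe = store.subscribe(() => render(store.getUnifiedState()));
  store.connect(); // idle -> loading -> loaded (or error)
  return () => {
    unsubscribe();
    store.reset(); // abort the active connection and return to idle
  };
}
```

Unsubscribing before calling `reset()` means the final transition back to idle is not rendered; swap the two calls if the consumer should see it.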
See
createReactiveStoreFromDataPublisherFactory
Type Parameters
| Type Parameter |
|---|
| T |
Methods
connect()
Open the underlying stream. Aborts any currently active connection, invokes the configured
factory, and transitions the store to loading (preserving the last known data and
error for stale-while-revalidate) before settling into loaded (on data) or error
(on failure).
Returns
void
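Because a reconnect passes through loading while keeping the last known data, a consumer can tell a first load apart from a background refresh. The sketch below illustrates that distinction; the `Snapshot` shape is an assumption based on the `{ data, error, status }` description, and `toView` is a hypothetical helper.

```typescript
// Assumed snapshot shape; the library's exported type may differ.
type Snapshot<T> = {
  data: T | undefined;
  error: unknown;
  status: 'idle' | 'loading' | 'loaded' | 'error';
};

type View<T> = {
  value: T | undefined; // last known value, possibly stale
  spinner: boolean; // nothing to show yet
  refreshing: boolean; // stale value shown while a new connection loads
};

// Hypothetical helper: map a snapshot to what a UI might display.
function toView<T>(snapshot: Snapshot<T>): View<T> {
  const hasData = snapshot.data !== undefined;
  const loading = snapshot.status === 'loading';
  return {
    value: snapshot.data,
    spinner: loading && !hasData,
    refreshing: loading && hasData,
  };
}
```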
getError()
Returns the error published to the error channel, or undefined if no error has occurred.
Returns
unknown
Deprecated
Use `getUnifiedState()` instead. This getter returns only the error field and cannot narrow the relationship between the current value, error, and status.
getState()
Returns the most recent value published to the data channel, or undefined if no
notification has arrived yet. On error, continues to return the last known value.
Returns
T | undefined
Deprecated
Use `getUnifiedState()` instead. This
getter returns only the value field and does not surface lifecycle status (e.g. loading).
getUnifiedState()
Returns the current lifecycle snapshot: { data, error, status }. The returned object has
stable identity between state changes, making it safe to pass directly as the
getSnapshot argument to React's useSyncExternalStore.
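To illustrate why stable identity matters, here is a small sketch of the kind of comparison `useSyncExternalStore` performs: it compares consecutive snapshots with `Object.is`. `SnapshotSource` and `createChangeDetector` are hypothetical names used only for this example.

```typescript
// Structural subset of the store: anything that can produce a snapshot.
interface SnapshotSource<S> {
  getUnifiedState(): S;
}

// Hypothetical helper: reports whether the snapshot identity changed since the
// previous check, using an Object.is comparison.
function createChangeDetector<S>(source: SnapshotSource<S>): () => boolean {
  let last = source.getUnifiedState();
  return () => {
    const next = source.getUnifiedState();
    const changed = !Object.is(last, next);
    last = next;
    return changed;
  };
}
```

A snapshot getter that built a fresh object on every call would report a change each time, which in React leads to repeated re-renders; returning the same object until the state actually changes avoids that.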
Returns
See
reset()
Aborts any currently active connection and resets the store to { status: 'idle' }. Both
data and error are cleared. Use this to tear down the connection without permanently
killing the store — a follow-up `connect()` will open
a fresh stream.
Returns
void
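The difference between reconnecting and resetting first can be sketched as two small helpers. `Reconnectable`, `softRefresh`, and `hardRefresh` are hypothetical names for this illustration; only `connect()` and `reset()` come from the store.

```typescript
// Structural subset of the store used by this sketch.
interface Reconnectable {
  connect(): void;
  reset(): void;
}

// Soft refresh: the last known data and error stay visible while loading.
function softRefresh(store: Reconnectable): void {
  store.connect();
}

// Hard refresh: clear data and error first, then open a fresh stream.
function hardRefresh(store: Reconnectable): void {
  store.reset();
  store.connect();
}
```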
retry()
Re-opens the stream after an error. No-op when the store is not in status: 'error'.
Returns
void
Deprecated
Use `connect()` instead. connect()
always (re)connects, regardless of current status — wrap with a status guard at the call
site if you need the error-only behaviour.
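A status guard of the kind mentioned above might look like the following sketch. `RetryableStore` and `connectIfErrored` are hypothetical names, and the status union is assumed from the lifecycle described on this page.

```typescript
// Structural subset of the store used by this sketch.
interface RetryableStore {
  connect(): void;
  getUnifiedState(): { status: 'idle' | 'loading' | 'loaded' | 'error' };
}

// Hypothetical replacement for retry(): reconnect only from the error state.
// Returns true when a reconnect was started.
function connectIfErrored(store: RetryableStore): boolean {
  if (store.getUnifiedState().status !== 'error') {
    return false;
  }
  store.connect();
  return true;
}
```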
subscribe()
Registers a callback to be called whenever the state changes or an error is received. Returns an unsubscribe function. Safe to call multiple times.
Parameters
| Parameter | Type |
|---|---|
| callback | () => void |
Returns
() => void
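Since the callback receives no arguments, subscribers read the current state from the store when notified. The sketch below uses that pattern to run a handler once when a given status is reached and then unsubscribe. `ObservableStore` and `onceStatus` are hypothetical names, and the status union is an assumption.

```typescript
type Status = 'idle' | 'loading' | 'loaded' | 'error';

// Structural subset of the store used by this sketch.
interface ObservableStore {
  getUnifiedState(): { status: Status };
  subscribe(callback: () => void): () => void;
}

// Hypothetical helper: call `onMatch` the first time the store reaches
// `target`, then unsubscribe. Returns a function that cancels the wait.
function onceStatus(store: ObservableStore, target: Status, onMatch: () => void): () => void {
  if (store.getUnifiedState().status === target) {
    onMatch();
    return () => {};
  }
  let done = false;
  let unsubscribe: () => void = () => {};
  unsubscribe = store.subscribe(() => {
    if (done || store.getUnifiedState().status !== target) return;
    done = true;
    unsubscribe();
    onMatch();
  });
  // Covers a store that happens to notify synchronously during subscribe().
  if (done) unsubscribe();
  return () => {
    if (!done) {
      done = true;
      unsubscribe();
    }
  };
}
```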
withSignal()
Returns a thin wrapper exposing connect() that composes signal with the store's internal
per-connection controller via AbortSignal.any — aborting either tears down the active
connection. Aborting the caller-provided signal surfaces the abort reason on state as
{ status: 'error' }; the internal controller path (supersession by a newer connect() or
reset()) is silent by design so the newer call owns state. Use this to attach a
caller-provided cancellation source (per-connection timeout, shared kill switch,
parent-context signal) without touching the bare connect() API.
- Per-connection timeout: `store.withSignal(AbortSignal.timeout(30_000)).connect()` (fresh clock per call).
- Permanent kill switch: hold one `AbortController`, bind the wrapper once (`const killable = store.withSignal(killCtrl.signal)`), and use `killable.connect()` everywhere; aborting the controller cancels the active connection and short-circuits future calls through the bound wrapper.
The wrapper exposes only connect() — getUnifiedState / subscribe / reset remain
store-level concerns on the parent.
Parameters
| Parameter | Type |
|---|---|
| signal | AbortSignal |
Returns
object
| Name | Type |
|---|---|
| connect() | () => void |
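The timeout and kill-switch patterns described for `withSignal()` can be wrapped in small helpers, as in this sketch. `SignalBindable`, `connectWithTimeout`, and `createKillable` are hypothetical names; only `withSignal()` and the wrapper's `connect()` come from the store, and `AbortSignal.timeout` requires a runtime that provides it.

```typescript
// Structural subset of the store used by this sketch.
interface SignalBindable {
  withSignal(signal: AbortSignal): { connect(): void };
}

// Per-connection timeout: a new timeout signal per call restarts the clock.
function connectWithTimeout(store: SignalBindable, ms: number): void {
  store.withSignal(AbortSignal.timeout(ms)).connect();
}

// Permanent kill switch: bind the wrapper once, reuse it for every connect,
// and abort the shared controller to shut it down.
function createKillable(store: SignalBindable): {
  connect(): void;
  kill(reason?: unknown): void;
} {
  const controller = new AbortController();
  const bound = store.withSignal(controller.signal);
  return {
    connect: () => bound.connect(),
    kill: (reason) => controller.abort(reason),
  };
}
```

Binding once in `createKillable` is what makes the switch permanent: every later `connect()` goes through the same, already-aborted signal.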