createReactiveStoreFromDataPublisherFactory

function createReactiveStoreFromDataPublisherFactory<TData>(
    config,
): ReactiveStreamStore<TData>;

Returns a ReactiveStreamStore that wires itself to a fresh DataPublisher on every `connect()`.

The store accepts a createDataPublisher factory rather than a ready-made publisher — that lets the store tear down a broken stream and open a new one without losing subscribers or the last known value. The factory receives the per-connection signal so the underlying transport can stop on per-connection abort, not just the stream-store's listeners.

Things to note:

  • The returned store starts in status: 'idle'. Call connect() to open the first stream.
  • createDataPublisher is invoked on every connect(). The store transitions through loading, preserving the last known data and error (stale-while-revalidate).
  • If createDataPublisher rejects, the store transitions to status: 'error' with the rejection as the error. Call connect() to try again.
  • reset() aborts the current connection and returns the store to idle, clearing data and error. A follow-up connect() opens a fresh stream.
  • Attach a caller-provided cancellation source via `withSignal()`store.withSignal(signal).connect() composes the signal with the per-connection controller. Aborting the caller's signal transitions the store to error with that abort reason.

Type Parameters

Type Parameter
TData

Parameters

ParameterTypeDescription
configFactoryConfig-

Returns

ReactiveStreamStore<TData>

Example

const store = createReactiveStoreFromDataPublisherFactory({
    createDataPublisher: signal => getDataPublisherFromEventEmitter(new WebSocket(url, { signal })),
    dataChannelName: 'message',
    errorChannelName: 'error',
});
const unsubscribe = store.subscribe(() => {
    const snapshot = store.getUnifiedState();
    if (snapshot.status === 'error') console.error('Connection failed:', snapshot.error);
    else if (snapshot.status === 'loaded') console.log('Latest:', snapshot.data);
});
// Fresh 30-second clock per connection attempt:
store.withSignal(AbortSignal.timeout(30_000)).connect();

On this page