createReactiveStoreWithInitialValueAndSlotTracking

function createReactiveStoreWithInitialValueAndSlotTracking<
    TRpcValue,
    TSubscriptionValue,
    TItem,
>(
    config,
): ReactiveStreamStore<
    Readonly<{
        context: Readonly<{
            slot: Slot;
        }>;
        value: TItem;
    }>
>;

Creates a ReactiveStreamStore that combines an initial RPC fetch with an ongoing subscription to keep its state up to date.

The store uses slot-based comparison to ensure that only the most recent value is kept, regardless of whether it came from the initial RPC response or a subscription notification. This prevents stale data from overwriting newer data when the RPC response and subscription notifications arrive out of order.

Things to note:

  • The returned store starts in status: 'idle'. Call ReactiveStreamStore.connect | `connect()` to fire the RPC request and open the subscription.
  • The store transitions through loading until the first response or notification arrives, then to loaded with a SolanaRpcResponse containing the value and the slot context at which it was observed.
  • On error from either source, the store transitions to status: 'error' preserving the last known value. Only the first error per connection window is captured.
  • A subsequent connect() aborts the current connection, transitions back to status: 'loading' (preserving the last known data and error for stale-while-revalidate), and re-fires the RPC request and subscription with a fresh inner abort signal.
  • ReactiveStreamStore.reset | `reset()` aborts the current connection and returns the store to idle, clearing data and error.
  • Attach a caller-provided cancellation source via ReactiveStreamStore.withSignal | `withSignal()` — store.withSignal(signal).connect() composes the signal with the per-connection controller. Aborting the caller's signal transitions the store to error with that abort reason.

Type Parameters

Type Parameter
TRpcValue
TSubscriptionValue
TItem

Parameters

ParameterTypeDescription
configCreateReactiveStoreWithInitialValueAndSlotTrackingConfig<TRpcValue, TSubscriptionValue, TItem>-

Returns

ReactiveStreamStore<Readonly<{ context: Readonly<{ slot: Slot; }>; value: TItem; }>>

Example

import {
    address,
    createReactiveStoreWithInitialValueAndSlotTracking,
    createSolanaRpc,
    createSolanaRpcSubscriptions,
} from '@solana/kit';
 
const rpc = createSolanaRpc('http://127.0.0.1:8899');
const rpcSubscriptions = createSolanaRpcSubscriptions('ws://127.0.0.1:8900');
const myAddress = address('FnHyam9w4NZoWR6mKN1CuGBritdsEWZQa4Z4oawLZGxa');
 
const balanceStore = createReactiveStoreWithInitialValueAndSlotTracking({
    rpcRequest: rpc.getBalance(myAddress, { commitment: 'confirmed' }),
    rpcValueMapper: lamports => lamports,
    rpcSubscriptionRequest: rpcSubscriptions.accountNotifications(myAddress),
    rpcSubscriptionValueMapper: ({ lamports }) => lamports,
});
 
const unsubscribe = balanceStore.subscribe(() => {
    const state = balanceStore.getUnifiedState();
    if (state.status === 'error') {
        console.error('Error:', state.error);
        balanceStore.connect();
    } else if (state.status === 'loaded') {
        console.log(`Balance at slot ${state.data.context.slot}:`, state.data.value);
    }
});
balanceStore.withSignal(AbortSignal.timeout(60_000)).connect();

See

ReactiveStreamStore

On this page