Skip to content
GitHubXDiscord

From redux-observable

redux-observable epics are RxJS pipelines over the action stream. The most common shape — “filter by type, do an async thing, emit a result action” — maps directly to a Triggery handler. Anything that combines multiple streams reactively (combineLatest, withLatestFrom) is where Triggery’s pull-only model and RxJS’s push-only model start to disagree.

redux-observable / RxJSTriggery
ofType('foo') filterevents: ['foo']
ofType('foo', 'bar')events: ['foo', 'bar']
debounceTime(ms) on the sourceactions.debounce(ms).out?.(p) for outputs; manual for input gating
throttleTime(ms)actions.throttle(ms).out?.(p)
switchMapconcurrency: 'take-latest' (default)
mergeMapconcurrency: 'take-every'
exhaustMapconcurrency: 'exhaust'
concatMapconcurrency: 'queue'
map(action => …) emit-actionactions.someAction?.(…)
withLatestFrom(state$, sel)conditions.someName registered via useReduxCondition
combineLatest([a$, b$, c$])No direct mapping — see below
takeUntil(stop$)signal.aborted checks in async handler
Before
const detailsEpic: Epic = action$ => action$.pipe(
  ofType('chat/openConversation'),
  switchMap(action =>
    from(api.fetchDetails(action.payload.id)).pipe(
      map(data => detailsLoaded(data)),
    ),
  ),
);
After
createTrigger<{
  events:  { 'chat/openConversation': { id: string } };
  actions: { storeDetails: Details };
}>({
  id: 'details-loader',
  events: ['chat/openConversation'],
  required: [],
  concurrency: 'take-latest',     // default
  async handler({ event, signal, actions }) {
    const data = await api.fetchDetails(event.payload.id, { signal });
    if (signal.aborted) return;
    actions.storeDetails?.(data);
  },
});

switchMap = take-latest. They have the same semantics: a new input cancels the previous in-flight task.

Before
const searchEpic: Epic = action$ => action$.pipe(
  ofType('search/queryChanged'),
  debounceTime(300),
  switchMap(action => /* … */),
);

Triggery does not gate inputs with debounce — the convention is to gate the output:

After
createTrigger<{
  events:  { 'search/queryChanged': string };
  actions: { runSearch: string };
}>({
  id: 'search-debounced',
  events: ['search/queryChanged'],
  required: [],
  handler({ event, actions }) {
    actions.debounce(300).runSearch?.(event.payload);
  },
});

The reactor on the other end (useAction(trigger, 'runSearch', q => /* fetch */)) receives only the last call within the window.

Before
const epic: Epic = (action$, state$) => action$.pipe(
  ofType('chat/messageReceived'),
  withLatestFrom(state$),
  filter(([_, state]) => state.settings.notifications),
  map(([action]) => showToast(action.payload)),
);
After
createTrigger<{
  events:     { 'chat/messageReceived': Message };
  conditions: { settings: Settings };
  actions:    { showToast: { title: string } };
}>({
  id: 'toast-on-msg',
  events: ['chat/messageReceived'],
  required: ['settings'],
  handler({ event, check, actions }) {
    if (!check.is('settings', s => s.notifications)) return;
    actions.showToast?.({ title: event.payload.author });
  },
});
  • combineLatest of multiple streams. Triggers respond to one event at a time and pull conditions lazily — there’s no “fire when any of these streams emits”. Either pick one as the trigger event and treat the rest as conditions, or keep that piece as an epic / RxJS pipeline and fireEvent from a subscribe.
  • Multicasting with shareReplay. RxJS’s hot/cold semantics have no analogue in Triggery; if you need them, keep RxJS.
  • Backpressure operators (bufferTime, windowToggle). Triggery’s scheduler batches per-tick microtasks, but it doesn’t implement RxJS’s backpressure toolkit.

Keep RxJS for the truly stream-heavy parts: animation pipelines, complex multi-source synchronisation, anywhere combineLatest / merge / partition are doing real work. The two libraries cohabit fine — a trigger’s handler can subscribe to a one-shot observable, or an epic can dispatch an action that Triggery happens to listen to.