Перейти к содержимому
GitHubXDiscord

Стратегии параллелизма

У async-обработчиков есть длительность. Интересный вопрос — что произойдёт, когда тот же триггер произойдёт ещё раз, пока предыдущий прогон ещё в полёте. В Triggery пять реальных стратегий (плюс маркер sync для документации), задаваемых в конфиге триггера:

createTrigger<Schema>({
  id: 'search-query',
  events: ['query-changed'],
  concurrency: 'take-latest',         // ← здесь
  async handler(ctx) { /* … */ },
});

По умолчанию 'take-latest' — правильный выбор для поиска, автокомплита, навигации и большинства user-facing реактивных потоков. Остальные четыре существуют, потому что каждая по-своему правильна для узких повторяющихся кейсов.

СтратегияПоведение, когда приходит новое событие посреди прогонаЛучше всего подходит для
'take-latest' (по умолчанию)Прервать предыдущий прогон (signal.aborted = true, reason 'superseded-by-latest'). Новый прогон стартует сразу.Поиск / автокомплит, загрузки навигации, всё, где важен только последний ответ.
'take-every'Оба прогона идут независимо. Никакого abort, никакого skip.Аналитика, логинг, fire-and-forget побочные эффекты, которые не должны мешать друг другу.
'take-first'Пока прогон в полёте, новые события сбрасываются (записываются как skipped, reason concurrency-take-first).Идемпотентные дорогие чтения, где результат в полёте удовлетворит вызывающих.
'exhaust'На проводе идентично 'take-first': новые события сбрасываются, пока текущий не завершится.То же самое; выбирай то имя, которое лучше читается в конфиге.
'queue'Новые прогоны ждут своей очереди. Каждый стартует только после завершения предыдущего.Мутации / записи, где важен порядок: PATCH/POST/PUT к одному ресурсу.
'sync'Документационный маркер для sync-only обработчиков. На рантайме идентичен 'take-every'.Чисто синхронные обработчики; сигналит намерение читателям.

Реальные выборы, выведенные из формы сценария:

Поиск / автокомплит — take-latest
createTrigger<{
  events:     { 'query-changed': { q: string } };
  conditions: { apiBase: string };
  actions:    { showResults: readonly Hit[] };
}>({
  id: 'search-query',
  events: ['query-changed'],
  concurrency: 'take-latest',
  required: ['apiBase'],
  async handler({ event, conditions, signal, actions }) {
    const res = await fetch(`${conditions.apiBase}/search?q=${event.payload.q}`, { signal });
    signal.throwIfAborted();
    actions.showResults?.(await res.json());
  },
});
Аналитика просмотров страниц — take-every
createTrigger<{
  events:  { 'page-view': { path: string } };
  actions: { sendBeacon: { path: string; ts: number } };
}>({
  id: 'page-view-analytics',
  events: ['page-view'],
  concurrency: 'take-every',
  async handler({ event, signal, actions }) {
    await fetch(`/beacon?path=${event.payload.path}`, { signal, keepalive: true });
    actions.sendBeacon?.({ path: event.payload.path, ts: Date.now() });
  },
});
Идемпотентное дорогое чтение — take-first
createTrigger<{
  events:     { 'config-refresh-requested': void };
  conditions: { apiBase: string };
  actions:    { setConfig: AppConfig };
}>({
  id: 'config-refresh',
  events: ['config-refresh-requested'],
  concurrency: 'take-first',          // burst кликов рефреша → один round-trip
  required: ['apiBase'],
  async handler({ conditions, signal, actions }) {
    const res = await fetch(`${conditions.apiBase}/config`, { signal });
    signal.throwIfAborted();
    actions.setConfig?.(await res.json());
  },
});
Сериализованная мутация — queue
createTrigger<{
  events:     { 'note-edited': { noteId: string; body: string } };
  conditions: { apiBase: string };
  actions:    { markSaved: { noteId: string; savedAt: number } };
}>({
  id: 'note-autosave',
  events: ['note-edited'],
  concurrency: 'queue',               // каждый PATCH попадает после предыдущего
  required: ['apiBase'],
  async handler({ event, conditions, signal, actions }) {
    await fetch(`${conditions.apiBase}/notes/${event.payload.noteId}`, {
      method: 'PATCH',
      body: JSON.stringify({ body: event.payload.body }),
      signal,
    });
    signal.throwIfAborted();
    actions.markSaved?.({ noteId: event.payload.noteId, savedAt: Date.now() });
  },
});
Фоновое обновление — exhaust
createTrigger<{
  events:     { 'tick': void };
  conditions: { apiBase: string };
  actions:    { setFeed: readonly FeedItem[] };
}>({
  id: 'feed-poll',
  events: ['tick'],
  concurrency: 'exhaust',             // поллеры, что перебрали, не нагромождаются
  required: ['apiBase'],
  async handler({ conditions, signal, actions }) {
    const res = await fetch(`${conditions.apiBase}/feed`, { signal });
    signal.throwIfAborted();
    actions.setFeed?.(await res.json());
  },
});

Сквозная мысль через все пять: тело обработчика идентично. Стратегия — это одна строка конфига, никогда не код.

Per-trigger, в конфиге:

createTrigger<Schema>({
  id: 'my-trigger',
  events: ['my-event'],
  concurrency: 'queue',
  async handler(ctx) { /* … */ },
});

Глобального дефолта на весь рантайм нет — выбор локален для сценария, by design. Если хочется “все писатели в этой фиче используют queue”, это намёк, что файл триггера стоит держать в одном месте, а не заводить глобальный дефолт.

concurrency и actions.debounce / throttle / defer работают на разных уровнях. Не путай их:

МеханизмГранулярностьЭффект
concurrencyВесь прогон обработчикаКогда два события пересекаются, что рантайму делать с прогоном в полёте?
actions.debounce(800).foo()Один вызов действияЗапланировать этот единичный вызов на вызов через 800 мс после последнего вызова, заменив любой ожидающий с тем же ключом.
actions.throttle(2000).foo()Один вызов действияLeading-edge throttle: вызов сразу, игнор последующих вызовов в течение 2 с.
actions.defer(50).foo()Один вызов действияВызов ровно через 50 мс; новые вызовы его не заменяют.

Можно свободно мешать. Типичный паттерн: обработчик take-latest + actions.debounce(80).showResults, чтобы схлопнуть два соседних рендера.

См. Debounce и throttle для полного справочника прокси.

Ниже — каждая строка одна стратегия. События приходят в t=0мс, t=100мс, t=200мс. Каждый вызов обработчика занимает 250 мс.

Event:         A             B             C
t (мс):        0             100           200           500

take-latest    A───────╳     B───────╳     C─────────────►done
                       (aborted)    (aborted)

take-every     A─────────────────────►done
               B───────────────────────►done
               C───────────────────────►done

take-first     A─────────────────────►done
               B (skipped)
               C (skipped — всё ещё A в полёте в t=200)

exhaust        ≡ take-first

queue          A─────────────────────►done
                                         B─────────────►done
                                                                 C─►done

sync*          ≡ take-every (только маркер; для sync-обработчиков)

Заметки к чтению:

  • take-latest — предыдущий прогон получает signal.aborted = true в момент выпуска следующего события. Любой последующий signal.throwIfAborted() в abort’нутом обработчике короткозамкнётся; любой ожидающий fetch(..., { signal }) зареджектится AbortError.
  • queue — старт B гейтится резолвом A. Общее wall-time растёт линейно. Если нужна ограниченная concurrency выше 1, пиши кастомное решение снаружи Triggery (worker pool, p-limit и т.п.) и зови его из take-every обработчика.
  • take-every — три пересекающихся прогона, каждый владеет своим signal. Abort не происходит, пока не диспознут рантайм.

Каждый прерванный / пропущенный прогон записывается в кольцевой буфер инспектора со стабильной строкой reason. Полезно, когда сценарий “не происходит” и ты хочешь знать почему.

ReasonСмысл
'superseded-by-latest'take-latest прервал предыдущий прогон, потому что произошло более новое событие.
'concurrency-take-first'Новое событие дропнули, потому что что-то в полёте.
'concurrency-exhaust'То же самое, записано под именем exhaust.
'disposed'Рантайм, скоуп или триггер были диспознуты посреди прогона.
'hmr'Vite/webpack HMR переоценил модуль триггера; предыдущий экземпляр был диспознут.

Читай буфер откуда угодно:

import { getDefaultRuntime } from '@triggery/core';

for (const snap of getDefaultRuntime().getInspectorBuffer()) {
  if (snap.status === 'aborted') {
    console.log(snap.triggerId, snap.runId, snap.reason);
  }
}

Или в React — отрендерь список через useInspectHistory(trigger) из @triggery/react. DevTools-мост сериализует те же снепшоты через postMessage.

Распространённая ловушка: незакрытые писатели предыдущего прогона

Заголовок раздела «Распространённая ловушка: незакрытые писатели предыдущего прогона»

take-latest прерывает обработчик, а не уже задиспатченные побочные эффекты. Если предыдущий прогон уже позвал actions.show?.(...) с частичными данными до точки abort’а, это состояние уже в твоём сторе.

async handler({ event, conditions, signal, actions }) {
  const profile = await fetch(`/users/${event.payload.id}`, { signal }).then((r) => r.json());
  signal.throwIfAborted();
  actions.setProfile?.(profile);                       // ← попало в стор

  const orgs = await fetch(`/users/${event.payload.id}/orgs`, { signal }).then((r) => r.json());
  signal.throwIfAborted();                             // ← если aborted *здесь*…
  actions.setOrgs?.(orgs);
}

Если пользователь кликнул на A, потом на B, до прибытия orgs-ответа, стор окажется с профилем A и всем остальным B. Фикс зависит от того, что ты хочешь:

  • Всё-или-ничего: собирай в локальные переменные, диспатчи все действия только после последнего await. (См. Асинхронные обработчики → Последовательные await’ы.)
  • Версионирование по ключу: включай event.payload.id в каждый payload действия и пусть реакторы выкидывают устаревшие записи, если id больше не совпадает с текущим контекстом.
  • Сменить стратегию: queue делает записи последовательными. Пользователь видит, что A завершилось, прежде чем стартует B.

Правильный ответ — зависит от сценария. Неправильный — делать вид, что проблемы нет, потому что take-latest “отменил” предыдущий прогон.

Распространённая ловушка: использовать queue для дедупликации

Заголовок раздела «Распространённая ловушка: использовать queue для дедупликации»

queue не дедуплицирует. Три быстрых клика на кнопку “save” под queue производят три последовательных PATCH’а. Если нужно “максимум один ожидающий save”:

createTrigger<Schema>({
  id: 'save-note',
  events: ['save-clicked'],
  concurrency: 'take-latest',          // ← коалесит запросы в полёте
  async handler({ event, signal, actions }) {
    const body = collectFormBody();
    await fetch(`/notes/${event.payload.id}`, { method: 'PATCH', body, signal });
    signal.throwIfAborted();
    actions.markSaved?.({ id: event.payload.id, ts: Date.now() });
  },
});

Или дебаунси действие, которое запускает событие:

const fireSave = useEvent(saveTrigger, 'save-clicked');
useEffect(() => {
  const id = setTimeout(() => fireSave({ id: noteId }), 500);
  return () => clearTimeout(id);
}, [body, noteId, fireSave]);

queue — для порядка, не для throttle.