Перейти к содержимому
GitHubXDiscord

Синхронизация по WebSocket

Чат-приложение подписано на socket.io. Каждое входящее сообщение должно разойтись веером: инвалидировать запрос разговора, дописать сообщение в кеш списка и увеличить счётчик непрочитанных — но только для каналов, которые пользователю реально нужны, и только когда он сейчас на это сообщение не смотрит. Сам сокет остаётся в одном файле. Логика маршрутизации — в одном триггере. Побочные эффекты — у тех компонентов, которым они принадлежат.

Открыть в StackBlitz Открыть пример на GitHub

Сервер socket.io эмитит события message с { channelId, message }. Нужно:

  1. Дописать сообщение в кеш списка React Query для этого канала — всегда.
  2. Инвалидировать превью канала в сайдбаре — только если канал в списке подписок пользователя.
  3. Увеличить счётчик непрочитанных — только если пользователь сейчас не смотрит этот канал.

Три независимые реакции, отсечённые тремя снимками условий, и все провязаны через один триггер.

  • Директорияsrc/
    • Директорияtriggers/
      • message.trigger.ts правило маршрутизации
    • Директорияfeatures/
      • Директорияsocket/
        • SocketBridge.tsx продьюсер (useSocketIoEvent пробрасывает в useEvent)
      • Директорияsession/
        • SubscriptionProvider.tsx провайдер (подписки на каналы)
      • Директорияchat/
        • ActiveChannel.tsx провайдер (activeChannelId)
        • ChatCacheReactor.tsx реактор (три useAction)
    • App.tsx собирает всё вместе
src/triggers/message.trigger.ts
import { createTrigger } from '@triggery/core';

type Message = {
  id: string;
  channelId: string;
  authorId: string;
  text: string;
  at: number;
};

export const messageTrigger = createTrigger<{
  events: {
    'message-received': { channelId: string; message: Message };
  };
  conditions: {
    activeChannelId:    string | null;
    subscribedChannels: ReadonlySet<string>;
    currentUserId:      string;
  };
  actions: {
    appendToList:    { channelId: string; message: Message };
    invalidatePreview: string;             // channelId
    incrementUnread: string;               // channelId
  };
}>({
  id: 'message-received',
  events: ['message-received'],
  required: ['currentUserId'],
  handler({ event, conditions, actions, check }) {
    const { channelId, message } = event.payload;

    // Кеш списка обновляется всегда — любой потребитель list-запроса
    // должен увидеть новое сообщение, даже если канал отфильтрован где-то ещё.
    actions.appendToList?.({ channelId, message });

    // Превью в сайдбаре — только для каналов, на которые пользователь подписан.
    if (check.is('subscribedChannels', set => set.has(channelId))) {
      actions.invalidatePreview?.(channelId);
    }

    // Счётчик непрочитанных — только если канал не активен и сообщение не от самого пользователя.
    const isActive = conditions.activeChannelId === channelId;
    const isOwn = message.authorId === conditions.currentUserId;
    if (!isActive && !isOwn) {
      actions.incrementUnread?.(channelId);
    }
  },
});

Обработчик читается сверху вниз как продуктовая спецификация. Добавить четвёртый побочный эффект (push-уведомление, лог в аналитику и т. п.), не трогая сокет, провайдеры и существующие реакторы, — это просто новый useAction где-то ещё в дереве.

Один компонент владеет сокетом. Через @triggery/socket он пробрасывает входящие события message в триггер. Само соединение живёт в родителе — мост лишь регистрирует слушателя.

src/features/socket/SocketBridge.tsx
import { useSocketIoEvent } from '@triggery/socket';
import type { Socket } from 'socket.io-client';
import { messageTrigger } from '../../triggers/message.trigger';

type WirePayload = { channelId: string; message: {
  id: string; channelId: string; authorId: string; text: string; at: number;
} };

export function SocketBridge({ socket }: { socket: Socket }) {
  useSocketIoEvent(messageTrigger, 'message-received', socket, 'message', {
    mapPayload: (raw: WirePayload) => raw,
  });

  return null;
}

Если транспорт — голый WebSocket, а не socket.io, замени на useWebSocketEvent:

useWebSocketEvent(messageTrigger, 'message-received', ws, 'message', {
  mapPayload: (e) => JSON.parse((e as MessageEvent).data) as WirePayload,
});

Три компонента владеют тремя кусочками состояния. Никто из них не импортирует другого.

src/features/session/SubscriptionProvider.tsx
import { useCondition } from '@triggery/react';
import { useMemo } from 'react';
import { messageTrigger } from '../../triggers/message.trigger';

export function SubscriptionProvider({
  channels,
  userId,
  children,
}: {
  channels: readonly string[];
  userId: string;
  children: React.ReactNode;
}) {
  const set = useMemo(() => new Set(channels), [channels]);

  useCondition(messageTrigger, 'subscribedChannels', () => set, [set]);
  useCondition(messageTrigger, 'currentUserId',      () => userId, [userId]);

  return <>{children}</>;
}
src/features/chat/ActiveChannel.tsx
import { useCondition } from '@triggery/react';
import { messageTrigger } from '../../triggers/message.trigger';

export function ActiveChannelProvider({
  channelId,
  children,
}: {
  channelId: string | null;
  children: React.ReactNode;
}) {
  useCondition(messageTrigger, 'activeChannelId', () => channelId, [channelId]);
  return <>{children}</>;
}

Условия читаются лениво — только когда срабатывает триггер. Перерендер <ActiveChannelProvider> при переключении канала ничего не инвалидирует; новый геттер просто вернёт новое значение при следующем вызове.

Один компонент владеет побочными эффектами. Он напрямую говорит с кешем и со стором счётчиков.

src/features/chat/ChatCacheReactor.tsx
import { useAction } from '@triggery/react';
import { useQueryClient } from '@tanstack/react-query';
import { messageTrigger } from '../../triggers/message.trigger';
import { useBadgeStore } from '../../stores/badge';

export function ChatCacheReactor() {
  const qc = useQueryClient();
  const increment = useBadgeStore(s => s.increment);

  useAction(messageTrigger, 'appendToList', ({ channelId, message }) => {
    qc.setQueryData<typeof message[]>(['messages', channelId], (prev = []) => [
      ...prev,
      message,
    ]);
  });

  useAction(messageTrigger, 'invalidatePreview', channelId => {
    qc.invalidateQueries({ queryKey: ['channel-preview', channelId] });
  });

  useAction(messageTrigger, 'incrementUnread', channelId => {
    increment(channelId);
  });

  return null;
}

<ChatCacheReactor> ничего не знает про подписки и активные каналы. Триггер уже решил, должно ли каждое действие выполниться. Реактор просто исполняет.

src/App.tsx
import { useEffect, useState } from 'react';
import { io, type Socket } from 'socket.io-client';
import { ActiveChannelProvider } from './features/chat/ActiveChannel';
import { ChatCacheReactor } from './features/chat/ChatCacheReactor';
import { SocketBridge } from './features/socket/SocketBridge';
import { SubscriptionProvider } from './features/session/SubscriptionProvider';

export function App() {
  const [socket, setSocket] = useState<Socket | null>(null);
  const [active, setActive] = useState<string | null>('c-lunch');

  useEffect(() => {
    const s = io('https://example.com', { autoConnect: true });
    setSocket(s);
    return () => { s.disconnect(); };
  }, []);

  return (
    <SubscriptionProvider userId="u-bob" channels={['c-lunch', 'c-deploys']}>
      <ActiveChannelProvider channelId={active}>
        {socket && <SocketBridge socket={socket} />}
        <ChatCacheReactor />
        {/* …chat UI… */}
      </ActiveChannelProvider>
    </SubscriptionProvider>
  );
}

Триггер — чистая функция от (event, conditions). Чтобы его протестировать, не нужны ни сокет, ни React, ни query-клиент.

src/triggers/message.trigger.test.ts
import { createTestRuntime, mockAction, mockCondition } from '@triggery/testing';
import { describe, expect, it, vi } from 'vitest';
import { messageTrigger } from './message.trigger';

const message = {
  id: 'm1',
  channelId: 'c-lunch',
  authorId: 'u-alice',
  text: 'are you free?',
  at: 0,
};

describe('message-received', () => {
  it('always appends to the list cache', async () => {
    const rt = createTestRuntime({ triggers: [messageTrigger] });
    const appendToList = vi.fn();
    mockCondition(rt, messageTrigger, 'currentUserId', 'u-bob');
    mockCondition(rt, messageTrigger, 'subscribedChannels', new Set<string>());
    mockCondition(rt, messageTrigger, 'activeChannelId', null);
    mockAction(rt, messageTrigger, 'appendToList', appendToList);

    await rt.fire('message-received', { channelId: 'c-lunch', message });

    expect(appendToList).toHaveBeenCalledWith({ channelId: 'c-lunch', message });
  });

  it('does not bump unread when the channel is active', async () => {
    const rt = createTestRuntime({ triggers: [messageTrigger] });
    const incrementUnread = vi.fn();
    mockCondition(rt, messageTrigger, 'currentUserId', 'u-bob');
    mockCondition(rt, messageTrigger, 'subscribedChannels', new Set(['c-lunch']));
    mockCondition(rt, messageTrigger, 'activeChannelId', 'c-lunch');
    mockAction(rt, messageTrigger, 'incrementUnread', incrementUnread);

    await rt.fire('message-received', { channelId: 'c-lunch', message });

    expect(incrementUnread).not.toHaveBeenCalled();
  });
});