obsidian_sse_pulls
noteChanges: Live-подписка на изменения заметок
Цель
Единая GraphQL подписка noteChanges для трёх потребителей:
- Obsidian plugin — auto-pull файлов с сервера при изменениях агентами
- Админка — live-обновление страниц (админ видит изменения онлайн)
- Публичный сайт — плашка "Контент обновился, хотите обновить страницу?"
Текущее состояние
Как сейчас работает sync
- Ручной sync: клик по иконке → classify → pull/push/conflict
- Polling: каждые 60 сек
checkForPendingChanges()обновляет badge - Focus check: при фокусе окна — тоже проверка
- Push: пользователь подтверждает отправку
- Pull: только при ручном sync (если
twoWaySync: true)
SSE инфраструктура (уже есть)
- Backend: gqlgen SSE transport (
transport.SSE{}вhandler.go) - Единственная подписка:
currentTime(format: String)— тестовая - Caddy: SSE запросы обходят timeout/compress handlers
- Клиент (фронтенд):
$trip2g_sse_hostвassets/ui/sse/sse.ts— авто-реконнект, парсинг событий $trip2g_graphql_raw_subscriptionвassets/ui/graphql/index.ts— реализован и работает
Система вебхуков (переиспользуем)
handlenotewebhooks.Resolve()— вызывается изcommitNotesиhideNoteswebhookutil.MatchesAny(path, patterns)— glob matching черезdoublestarNoteChange { PathID, Path, Event }— структура события (create/update/remove)- Фильтрация:
includePatterns+excludePatterns+ event type
Архитектура решения
┌─────────────────┐
┌─────────────┐ SSE noteChanges(input) │ │
│ Obsidian │ ◄──────────────────────── │ │
│ Plugin │ auto-pull files │ │
└─────────────┘ │ trip2g Server │
│ │
┌─────────────┐ SSE noteChanges(input) │ commitNotes ─┐ │
│ Админка │ ◄──────────────────────── │ hideNotes ──┤ │
│ (mol) │ live reload данных │ ▼ │
└─────────────┘ │ notebus│
│ │ │
┌─────────────┐ SSE noteChanges(input) │ ▼ │
│ Публичный │ ◄──────────────────────── │ SSE fan-out
│ сайт │ плашка "обновить" │ │
└─────────────┘ └─────────────────┘
Поток данных (общий)
1. Агент/пользователь → pushNotes + commitNotes
2. commitNotes → IncrementSyncVersion (в той же транзакции что и save)
3. commitNotes → HandleLatestNotesAfterSave → triggerWebhooks
4. → notebus.Publish (NEW)
5. notebus → фильтрация по patterns каждого подписчика → SSE event
6. Потребитель получает {paths, event, hashes, version}
Eventual consistency: между записью контента и Publish — окно ~миллисекунды (последовательные вызовы в одном goroutine). Версия уже в базе к моменту Publish.
Поток: Obsidian plugin
6. SSE → Plugin получает событие с paths, hashes, event, version
7. Classify по хэшам из события (без доп. запроса)
8. Safe pull → скачать и записать
9. Conflict → Notice + badge, ждать ручного sync
Одно событие на один вызов Publish — paths батчатся. Debounce не нужен.
Поток: Админка
6. SSE → mol-компонент получает событие
7. Инвалидировать кэш данных для изменённых paths
8. UI обновляется реактивно (mol wire)
Поток: Публичный сайт
6. SSE → JS на странице получает событие
7. Если текущая страница в списке paths → показать плашку
8. Пользователь нажимает "Обновить" → перезагрузка / fetch нового контента
Backend: GraphQL подписка
Схема
input NoteChangesInput {
"""Glob patterns для фильтрации (doublestar). Пустой = все."""
includePatterns: [String!]
"""Glob patterns для исключения."""
excludePatterns: [String!]
}
enum NoteChangeEventType {
CREATE
UPDATE
REMOVE
}
type NoteChangeEvent {
"""Пути изменённых файлов."""
paths: [String!]!
"""Хэши для каждого path (в том же порядке). Пустой массив для remove."""
hashes: [String!]!
"""Тип события."""
event: NoteChangeEventType!
"""Глобальная версия сайта после этого изменения."""
version: Int64!
}
type Subscription {
currentTime(format: String = "2006-01-02 15:04:05"): String!
"""
X-Api-Key header must be set.
Подписка на изменения заметок. Фильтрация по glob patterns (как в webhooks).
"""
noteChanges(input: NoteChangesInput): NoteChangeEvent!
}
Pub/Sub механизм
In-process pub/sub на каналах (~50 строк). Библиотеки не нужны:
kelindar/event(552 stars) — 4-10x быстрее каналов, но для миллионов событий/сек. У нас десятки.asaskevich/EventBus(2.8k stars) — callback-based, а gqlgen ожидает каналы.- Кастомное решение идеально ложится на gqlgen (resolver возвращает
<-chan T).
Структура пакетов:
internal/model/notebus.go # NoteBusEvent struct (данные)
internal/notebus/notebus.go # Bus, Subscriber, Subscribe/Unsubscribe/Publish (логика)
Тип события в internal/model:
// internal/model/notebus.go
// NoteBusEvent описывает изменение заметок для real-time подписок.
type NoteBusEvent struct {
Paths []string // Изменённые пути.
Hashes []string // Хэши (в том же порядке). Пустой для remove.
Event string // "CREATE", "UPDATE", "REMOVE".
Version int64 // Глобальная версия сайта после изменения.
}
Bus в internal/notebus:
// internal/notebus/notebus.go
type Env interface {
Logger() logger.Logger
}
type Subscriber struct {
Ch <-chan model.NoteBusEvent // read-only для потребителя.
ch chan model.NoteBusEvent // write для Bus.
includePatterns []string
excludePatterns []string
}
type Stats struct {
Subscribers int // Текущее количество подписчиков.
Published int64 // Всего опубликовано событий.
Dropped int64 // Потеряно из-за slow consumers.
}
type Bus struct {
mu sync.RWMutex
subs map[*Subscriber]struct{}
env Env
stats Stats
}
func New(env Env) *Bus { ... }
func (b *Bus) Subscribe(include, exclude []string) *Subscriber { ... }
func (b *Bus) Unsubscribe(s *Subscriber) { ... }
func (b *Bus) Publish(event model.NoteBusEvent) { ... }
func (b *Bus) GetStats() Stats { ... }
Publish() фильтрует по webhookutil.MatchesAny() для каждого подписчика. Buffered channel (size 64). select с default для slow consumers (drop + логирование через env.Logger()).
Тесты notebus:
- Subscribe → Publish → получили событие
- Subscribe с patterns → Publish с non-matching path → НЕ получили
- Unsubscribe → Publish → канал закрыт, событие не пришло
- 3 подписчика, Publish → все получили (fan-out)
- Буфер заполнен → событие dropped, stats.Dropped инкрементирован
Интеграция в app (cmd/server/main.go):
type app struct {
// ...existing...
noteBus *notebus.Bus
}
// Метод для case-пакетов — через Env interface.
func (a *app) PublishNoteEvent(event model.NoteBusEvent) {
a.noteBus.Publish(event)
}
// Метод для subscription resolver.
func (a *app) SubscribeNoteChanges(include, exclude []string) *notebus.Subscriber {
return a.noteBus.Subscribe(include, exclude)
}
func (a *app) UnsubscribeNoteChanges(sub *notebus.Subscriber) {
a.noteBus.Unsubscribe(sub)
}
// Для admin monitoring.
func (a *app) NoteBusStats() notebus.Stats {
return a.noteBus.GetStats()
}
Env interface в case-пакетах (commitnotes, hidenotes):
// internal/case/commitnotes/resolve.go
type Env interface {
// ...existing...
PublishNoteEvent(event model.NoteBusEvent)
}
Тестируемость: в тестах мокаем PublishNoteEvent и проверяем что он вызван с правильным событием.
Resolver
func (r *subscriptionResolver) NoteChanges(ctx context.Context, input *model.NoteChangesInput) (<-chan *model.NoteChangeEvent, error) {
// 1. Проверить API key
_, err := checkapikey.Resolve(ctx, r.env(ctx), "note_changes")
if err != nil {
return nil, err
}
// 2. Парсить patterns
var include, exclude []string
if input != nil {
include = input.IncludePatterns
exclude = input.ExcludePatterns
}
// 3. Подписаться через app
env := r.env(ctx)
sub := env.SubscribeNoteChanges(include, exclude)
// 4. Маппинг канала model.NoteBusEvent → model.NoteChangeEvent (GraphQL type)
ch := make(chan *model.NoteChangeEvent, 1)
go func() {
defer close(ch)
defer env.UnsubscribeNoteChanges(sub)
for {
select {
case <-ctx.Done():
return
case event, ok := <-sub.Ch:
if !ok {
return
}
ch <- &model.NoteChangeEvent{
Paths: event.Paths,
Hashes: event.Hashes,
Event: event.Event,
Version: event.Version,
}
}
}
}()
return ch, nil
}
Точки интеграции (PublishNoteEvent)
1. HandleLatestNotesAfterSave в cmd/server/main.go:
Уже резолвит pathIDs → paths + определяет event type. Добавить PublishNoteEvent рядом с webhook dispatch.
TODO при реализации: проверить как webhook dispatch получает paths и event types — переиспользовать тот же механизм. Вероятно есть batch-запрос NotePathsByIDs или аналог (не использовать NotePathByID в цикле — это N запросов).
func (a *app) HandleLatestNotesAfterSave(ctx context.Context, changedPathIDs []int64) error {
// ...existing: updatesubgraphs, handletgpublishviews, embeddings...
// Версия уже инкрементирована до вызова этой функции (в commitNotes).
version := ... // передаётся как параметр или читается из контекста
// Собираем изменения (переиспользуем данные из webhook dispatch).
// Используем batch query, не NotePathByID в цикле.
notePaths, npErr := a.NotePathsByIDs(ctx, changedPathIDs)
if npErr != nil { ... }
var busPaths []string
var busHashes []string
for _, np := range notePaths {
busPaths = append(busPaths, np.Value)
busHashes = append(busHashes, np.LatestContentHash)
}
if len(busPaths) > 0 {
// Event type берём из webhook dispatch (create/update различаются).
a.PublishNoteEvent(model.NoteBusEvent{
Paths: busPaths,
Hashes: busHashes,
Event: eventType, // "CREATE" или "UPDATE" — из той же логики что для webhooks
Version: version,
})
}
// ...existing webhook dispatch...
}
2. hideNotes в internal/case/hidenotes/resolve.go:
Добавить PublishNoteEvent в Env interface и вызвать рядом с triggerWebhooks.
TODO при реализации: проверить что Resolve() в hidenotes имеет доступ к paths как строкам, а не только pathIDs. Проверить flow — hideNotes использует HandleLatestNotesAfterSave или отдельный dispatch?
type Env interface {
// ...existing...
PublishNoteEvent(event model.NoteBusEvent)
}
// В Resolve(), после triggerWebhooks:
env.PublishNoteEvent(model.NoteBusEvent{
Paths: paths, // строковые пути
Hashes: []string{}, // remove — пустой массив
Event: "REMOVE",
Version: version, // из IncrementSyncVersion()
})
3. pushNotes без commit (skipCommit=true):
- Не публикуем — файлы ещё не закоммичены
- Событие уйдёт при commitNotes
Авторизация
SSE подписка использует тот же X-Api-Key что и queries. Проверка через checkapikey.Resolve(). API key создаётся в админке и привязан к сайту.
Нюанс: SSE — long-lived соединение. Если API key отозван:
- Текущая подписка продолжит работать до дисконнекта
- Это ОК — аналогично поведению webhook deliveries
- При реконнекте — получит ошибку авторизации
SSE Heartbeat / Keepalive
SSE соединения могут молча умирать за NAT/прокси. Решение:
Сервер: transport.SSE{KeepAlivePingInterval: 30 * time.Second} — если gqlgen поддерживает. Иначе — периодическая отправка SSE comment (:keepalive\n\n) из middleware.
Клиент (Obsidian): таймер 60 секунд. Если за 60 секунд не пришло ни одного SSE event (включая keepalive) — считаем соединение мёртвым → reconnect.
Клиент (браузер, mol): браузерный EventSource API автоматически обнаруживает разрыв. Дополнительная логика не нужна.
Observability
Bus.GetStats() возвращает Stats{Subscribers, Published, Dropped}. Доступно через:
type NoteBusStats {
subscribers: Int!
published: Int64!
dropped: Int64!
}
type Admin {
# ...existing...
noteBusStats: NoteBusStats!
}
Страница в админке для мониторинга. Позже — Prometheus metrics.
Delta Sync: убрать FetchServerHashes на 2000 файлов
Проблема (скриншот Network tab)
graphql 82.5 kB 769ms FetchServerHashes — ВСЕ 2000 хэшей
graphql 82.5 kB 1.03s FetchServerHashes повторно (badge check)
graphql 82.5 kB 1.07s FetchServerHashes ещё раз
graphql 223 kB 974ms PushNotes / FetchNoteContents
graphql 223 kB 1.63s PushNotes / FetchNoteContents
─────────────────────────────────────────────────────────
Итого: 868 kB ~6s При том что изменилось 0-5 файлов
82.5 kB x 3 = 247 kB только на хэши, при каждом sync. При 2000 заметках это ~40 байт x 2000 = 80 кБ за запрос. Растёт линейно с количеством файлов.
Решение: глобальная версия + delta query
Добавить глобальный счётчик версий. Клиент хранит lastSyncedVersion и запрашивает только изменения.
Новая колонка в note_paths:
alter table note_paths add column last_changed_version integer not null default 0;
Отдельная таблица для глобального счётчика:
create table sync_version (
id integer primary key check (id = 1), -- singleton row
version integer not null default 0
);
insert into sync_version (id, version) values (1, 0);
Инкремент при commitNotes / hideNotes:
-- 1. Атомарно инкрементировать глобальную версию
update sync_version set version = version + 1 where id = 1 returning version;
-- 2. Записать новую версию в изменённые paths
update note_paths set last_changed_version = ? where id in (?...);
Одна транзакция: инкремент + обновление paths. sqlc query для обоих. Выполняется до HandleLatestNotesAfterSave, чтобы версия была в базе к моменту Publish.
Новый GraphQL фильтр:
input NotePathsFilter {
like: String
paths: [String!] # NEW: фильтр по конкретным путям
changedSinceVersion: Int64 # NEW: только изменённые после версии
}
input NotePathsDeltaFilter {
sinceVersion: Int64!
}
type NotePathsDelta {
"""Текущая глобальная версия сайта."""
currentVersion: Int64!
"""Изменённые paths (пустой если нет изменений)."""
notePaths: [NotePath!]!
"""Paths удалённых/скрытых заметок с момента версии."""
removedPaths: [String!]!
}
type Query {
# ...existing...
"""Delta sync: только изменения с указанной версии. X-Api-Key required."""
notePathsDelta(filter: NotePathsDeltaFilter!): NotePathsDelta!
}
Как меняется flow sync
Сейчас (2000 заметок, 5-10 сек):
1. FetchServerHashes → 2000 хэшей, 82 kB, ~1s
2. Compute 2000 local hashes (cached by mtime, ~5ms)
3. Classify 2000 files
4. Push/Pull changed files
После delta sync (2000 заметок, <1 сек):
1. notePathsDelta(sinceVersion: 142) → 3 changed paths, ~200 bytes, ~50ms
2. Compute hashes только для 3 файлов
3. Classify 3 файла
4. Push/Pull changed files
Первый sync / reset (sinceVersion = 0):
Как сейчас — полный FetchServerHashes. Один раз.
Как это работает с SSE
SSE и delta sync дополняют друг друга:
Нормальная работа:
SSE подписка → получаем события реального времени
Каждое событие содержит version → обновляем lastSyncedVersion
Реконнект после offline (порядок критичен!):
1. Подключить SSE (начинаем получать новые события в буфер)
2. notePathsDelta(sinceVersion: lastSyncedVersion) → пропущенные изменения
3. Merge пропущенные + буферизованные события → autoPull
4. Обновить lastSyncedVersion
Важно: сначала SSE, потом delta — иначе события между delta и SSE потеряны.
Cold start (Obsidian запустился):
1. notePathsDelta(sinceVersion: lastSyncedVersion) → delta с прошлой сессии
2. Подключаем SSE
Fallback (lastSyncedVersion === undefined или невалиден):
Полный FetchServerHashes → как сейчас. Один раз.
Хранение removedPaths
Для delta sync нужно знать какие файлы были удалены/скрыты после sinceVersion.
Используем hidden_by + last_changed_version (без новой таблицы):
- Скрытые paths уже помечены
hidden_by IS NOT NULL - При hideNotes → обновлять
last_changed_version - Delta query:
WHERE last_changed_version > sinceVersion AND hidden_by IS NOT NULL→ removedPaths
Хранение lastSyncedVersion в клиенте
interface SyncState {
files: Record<string, string>; // path → lastSyncedHash
mtimes?: Record<string, number>;
localHashes?: Record<string, string>;
lastSyncedAt?: number;
lastSyncedVersion?: number; // NEW: глобальная версия сервера
}
При lastSyncedVersion === undefined → первый sync, полный FetchServerHashes. После первого sync → delta.
Frontend: Obsidian Plugin
Новые настройки
interface SyncDir {
// ...existing...
/**
* Glob patterns для live pull подписки.
* Если указаны — SSE соединение активно, auto-pull включён.
* Если пустой массив / не указано — live pull выключен.
* Примеры: ["**"], ["blog/**", "docs/**"]
*/
livePullIncludePatterns?: string[];
/**
* Glob patterns для исключения из live pull.
* Примеры: ["drafts/**", "private/**"]
*/
livePullExcludePatterns?: string[];
}
Live pull включён когда livePullIncludePatterns непустой. twoWaySync должен быть включён (иначе нечего тянуть).
В UI настроек:
[x] Two-way sync
Live pull include patterns: [** ]
Live pull exclude patterns: [drafts/** ]
Поле текстовое, patterns через запятую. Пустое поле = live pull выключен. Пользователь может ограничить какие папки получать автоматически, а какие — только при ручном sync.
Каждый syncDir — отдельный сервер. Одно SSE соединение на syncDir. Мультиплексирование не нужно — syncDir-ы подключены к разным инстансам trip2g.
SSE соединение
Obsidian плагин не может использовать $trip2g_sse_host (это mol-класс). Нужна своя реализация на чистом JS, аналогичная по логике:
class LivePullConnection {
private controller: AbortController | null = null;
private reconnectTimer: number | null = null;
private reconnectDelay = 3000;
private lastEventAt = 0;
private healthCheckTimer: number | null = null;
private healthCheckInterval = 60000; // 60 сек — если нет данных, reconnect
constructor(
private apiUrl: string,
private apiKey: string,
private includePatterns: string[],
private excludePatterns: string[],
private onChanges: (event: NoteChangeEvent) => void,
private onStatusChange: (connected: boolean) => void,
) {}
connect(): void { ... }
disconnect(): void { ... }
private async stream(signal: AbortSignal): Promise<void> { ... }
private startHealthCheck(): void {
// Каждые 60 сек проверять lastEventAt.
// Если > 60 сек без данных (включая keepalive) → reconnect.
}
}
GraphQL subscription query:
subscription NoteChanges($input: NoteChangesInput) {
noteChanges(input: $input) {
paths
hashes
event
version
}
}
SSE transport: POST /graphql с Accept: text/event-stream (аналогично $trip2g_sse_host).
Auto-Pull логика
async autoPull(event: NoteChangeEvent): Promise<void> {
if (this.isSyncing) {
// Sync уже идёт — отложить, проверим после
this.pendingAfterSync = event;
return;
}
const { paths, hashes, event: eventType, version } = event;
if (eventType === "REMOVE") {
// Проверить, есть ли эти файлы локально
// Если есть — показать ServerDeletedModal
this.updateLastSyncedVersion(version);
return;
}
// CREATE или UPDATE: classify по хэшам из события (без доп. запроса)
const safePulls: string[] = [];
const conflicts: string[] = [];
for (let i = 0; i < paths.length; i++) {
const path = paths[i];
const remoteHash = hashes[i];
const lastSynced = syncState.files[path];
const localHash = await computeLocalHash(path);
if (eventType === "CREATE" && !localHash) {
// Новый файл, локально нет — всегда safe pull
safePulls.push(path);
} else {
const action = classifyFile(localHash, remoteHash, lastSynced);
if (action === "pull") {
safePulls.push(path);
} else if (action === "conflict") {
conflicts.push(path);
}
// push, unchanged — игнорируем в auto-pull
}
}
// Выполнить safe pulls
if (safePulls.length > 0) {
await pullFiles(safePulls);
// Download assets for pulled notes
await downloadAssetsForNotes(env, safePulls);
new Notice(`↓ ${safePulls.length} files updated from server`);
}
// Конфликты — уведомить
if (conflicts.length > 0) {
new Notice(`⚠ ${conflicts.length} conflicts detected. Click sync to resolve.`);
updateBadge();
}
this.updateLastSyncedVersion(version);
}
Ассеты при auto-pull
Текущий flow при обычном pull:
1. Pull note content (FetchNoteContents)
2. Write .md file
3. FetchNoteAssets(paths) → список ассетов [{absolutePath, url, hash}]
4. Для каждого ассета: exists locally? → нет → downloadAsset(url)
Переиспользуем downloadAssetsForNotes(env, pulledPaths) из execute.ts как есть. Он уже:
- Вызывает
fetchNoteAssets(paths)только для pulled files - Дедуплицирует по absolutePath
- Проверяет
fileExists()перед скачиванием - Создаёт директории при необходимости
Оптимизация на будущее: добавить assetReplaces в FetchNoteContents, чтобы получать контент + ассеты одним запросом (уже возможно в текущей схеме — NotePath имеет и content, и assetReplaces).
Жизненный цикл SSE соединения
onload():
for each syncDir where livePull && twoWaySync:
create LivePullConnection
connect()
onunload():
disconnect all connections
Settings changed:
if livePull toggled on → connect()
if livePull toggled off → disconnect()
if apiUrl/apiKey changed → reconnect()
Connection lost:
auto-reconnect с delay 3s → 6s → 12s → 30s (exponential backoff, max 30s)
No data for 60s (health check):
reconnect (серверный keepalive должен приходить каждые 30s)
Window focus:
if connection dead → reconnect immediately
Include/Exclude patterns в плагине
Patterns передаются напрямую из настроек в SSE подписку:
function buildSubscriptionInput(syncDir: SyncDir): NoteChangesInput | null {
const include = syncDir.livePullIncludePatterns;
if (!include || include.length === 0) return null; // live pull выключен
return {
includePatterns: include,
excludePatterns: syncDir.livePullExcludePatterns ?? [],
};
}
Важно: syncDir.path — это клиентская логика (папка в Obsidian vault). На сервере paths уже относительные. Patterns в подписке матчат серверные paths как есть. Плагин не трогает файлы вне syncDir.path — это ортогональная защита на уровне клиента.
Пользователь контролирует:
["**"]— получать всё["blog/**", "docs/**"]— только определённые папки- exclude
["drafts/**"]— не получать черновики - Пустое поле — live pull выключен, только ручной sync
Защита от собственных изменений
Когда Obsidian plugin делает push → commitNotes → событие приходит обратно по SSE.
Решение: не фильтровать, положиться на classify + хэши в событии.
При получении собственного события:
- SSE приходит с
{paths: ["note.md"], hashes: ["abc123"]} - Плагин сравнивает:
syncState.files["note.md"] === "abc123"→ да, только что запушили classifyFile(localHash, remoteHash="abc123", lastSynced="abc123")→unchanged- Никаких запросов, никаких действий
Frontend: Админка (mol)
Админка использует существующую инфраструктуру $trip2g_graphql_subscription().
Интеграция
// Подписка на все изменения (админ видит всё)
const sub = $trip2g_graphql_subscription(`
subscription NoteChanges($input: NoteChangesInput) {
noteChanges(input: $input) { paths event hashes version }
}
`, { input: { includePatterns: ["**"] } })
Поведение
При получении события — инвалидировать кэш reset_query_marker:
@ $mol_mem
note_changes() {
const data = this.subscription().data()
if (!data) return null
// Инвалидировать кэш — все query перезапросят данные
// reset_query_marker уже есть в graphql/index.ts
return data.noteChanges
}
Админ видит:
- Список изменений в реальном времени (какие страницы изменились)
- Автообновление если открыта страница редактирования изменённой заметки
- Notice: "Страница blog/post.md обновлена агентом"
Frontend: Публичный сайт
Плашка "Контент обновился"
Для авторизованных пользователей (или всех — по настройке):
// Подписка на текущую страницу
@ $mol_mem
page_update_subscription() {
const path = this.current_page_path()
if (!path) return null
return $trip2g_graphql_raw_subscription(`
subscription NoteChanges($input: NoteChangesInput) {
noteChanges(input: $input) { paths event hashes version }
}
`, { input: { includePatterns: [path] } })
}
@ $mol_mem
has_update() {
const data = this.page_update_subscription()?.data()
if (!data) return false
// Проверить что текущая страница в списке изменённых
return data.noteChanges.paths.includes(this.current_page_path())
}
UI плашки
┌──────────────────────────────────────────────┐
│ ℹ Эта страница была обновлена. [Обновить] │
└──────────────────────────────────────────────┘
- Показывается вверху страницы (sticky banner)
- "Обновить" →
location.reload()или fetch нового контента без перезагрузки - Плашка исчезает при закрытии или обновлении
- Не показывается для
event: REMOVE(страница удалена — другая логика)
Авторизация для публичного сайта
Текущий resolver проверяет API key. Для публичного сайта нужно:
- Либо отдельная подписка
publicNoteChangesбез авторизации (только публичные заметки) - Либо разрешить
noteChangesдля авторизованных пользователей (по сессии, не API key) - Либо отложить на будущее — сейчас только для админки и Obsidian (оба с API key)
Рекомендация: начать без публичного сайта. Добавить позже когда ясна модель авторизации.
План реализации
Фаза 1: Backend — notebus + подписка + delta sync
internal/model/notebus.go—NoteBusEventstructinternal/notebus/notebus.go—Busс Subscribe/Unsubscribe/Publish, Stats, логирование drops- Тесты notebus — subscribe, pattern filtering, fan-out, unsubscribe, buffer overflow
- Встроить в
app—noteBusfield,PublishNoteEvent(),SubscribeNoteChanges(),UnsubscribeNoteChanges(),NoteBusStats() - Миграция —
sync_versionтаблица (singleton),note_paths.last_changed_versionколонка - sqlc queries —
IncrementSyncVersion,NotePathsChangedSince,NotePathsRemovedSince - Инкремент версии — в
commitNotesиhideNotes(до HandleLatestNotesAfterSave, одна транзакция) - Схема GraphQL —
NoteChangeEventTypeenum,noteChangessubscription,NoteChangesInput,NoteChangeEvent(с hashes + version),notePathsDeltaquery,NoteBusStatsв admin make gqlgen— сгенерировать код- Resolver —
NoteChanges()с API key, подписка черезenv.SubscribeNoteChanges() - Resolver —
notePathsDelta()с API key - Publish из
HandleLatestNotesAfterSave— рядом с webhook dispatch, реальный event type (CREATE/UPDATE) - Publish из
hideNotes— добавитьPublishNoteEventв Env interface - SSE keepalive —
transport.SSE{KeepAlivePingInterval: 30 * time.Second}или аналог - Тест:
curlSSE подписка + pushNotes через CLI → события приходят
Фаза 2: Obsidian SSE live pull + delta sync
LivePullConnection— SSE клиент с reconnect + health check (60s timeout)- Настройки —
livePullIncludePatterns,livePullExcludePatterns autoPull()— classify по хэшам из события, safe pull, conflict → badge, CREATE → always pull- Delta sync —
lastSyncedVersionв SyncState,notePathsDeltaвместоFetchServerHashes - Reconnect — SSE first, then
notePathsDelta(sinceVersion)для пропущенных событий - Жизненный цикл — connect/disconnect в onload/onunload/settings
Фаза 3: Админка (mol) — live updates
- Подписка —
$trip2g_graphql_subscription()с["**"] - Инвалидация кэша — при событии обновить данные
- Notice — показать что изменилось
- Monitoring — страница NoteBusStats в админке
Фаза 4: Публичный сайт (отложить)
- Определить модель авторизации
- Подписка на конкретную страницу
- Плашка "Контент обновился"
Edge Cases
| Кейс | Поведение |
|---|---|
| SSE отключился, пропущены события | Reconnect: SSE first → notePathsDelta(sinceVersion) → merge. Fallback на полный sync если version невалиден |
| Пользователь редактирует файл, пришёл update | classifyFile → conflict → badge + Notice |
| Batch commit (10 файлов) | Один Publish с paths: [10 items] — одно событие |
twoWaySync выключен |
SSE не подключается, livePull недоступен |
| API key отозван | SSE получит ошибку при реконнекте, покажет Notice |
| Plugin unload во время auto-pull | AbortController отменит все fetch-и |
| Несколько syncDir | Отдельные серверы — отдельные SSE подписки |
publishField фильтр |
Auto-pull проверяет publishField перед записью файла |
| Файл удалён локально, пришёл update | classifyFile → pull (новый файл) → скачать |
| Obsidian закрыт, накопились изменения | При открытии — notePathsDelta(lastSyncedVersion) ловит всё |
| Нет данных 60 сек (dead connection) | Health check → reconnect |
| Slow consumer (буфер заполнен) | Событие dropped, залогировано, stats.Dropped++ |
| Получили своё же событие после push | classify по хэшам → unchanged → no-op |
Безопасность
- Нет авто-push: пользователь всегда контролирует когда отправлять
- Конфликты не перезаписываются: только safe pulls (localHash === lastSyncedHash)
- publishField защита: авто-pull не скачивает файлы без publish field
- API key scope: подписка использует тот же ключ что и sync
- Defense in depth: даже если SSE отправит невалидное событие, classify проверит хэши
TODO / Будущие улучшения
Оптимистичная блокировка в pushNotes
Проблема: Два человека одновременно редактируют файл → оба делают push → последний перезаписывает изменения первого.
Решение: Добавить expectedHash в pushNotes mutation:
input PushNotesNoteContentInput {
path: String!
content: String!
expectedHash: String # NEW: хэш который клиент ожидает увидеть на сервере
}
Поведение:
- Клиент при push передаёт
expectedHash= тот хэш, который был при последнем pull/sync - Сервер проверяет:
currentHash === expectedHash - Если не совпадает → возвращает ошибку
CONFLICTс актуальным хэшем - Клиент показывает конфликт, предлагает сначала pull
Преимущества:
- Защита от lost updates (write-write conflicts)
- Атомарная проверка + запись в одной транзакции
- Совместим с текущим classify flow
Реализация:
- Миграция: нет, используем существующий
latest_content_hash - Backend: проверка в
pushNotesперед записью - Frontend: Obsidian plugin передаёт
syncState.files[path]какexpectedHash - Админка: можно передавать
null(skip проверки) для админских правок
Приоритет: Средний. Сценарий редкий (два человека редактируют один файл одновременно), но последствия серьёзные (потеря данных).