Files
Keysat 6be2e40f54 Phase 0 go-live polish: hands-off incremental sync + refresh action
- backend/ingest/sync_scheduler.py: periodic incremental-sync loop (every
  CRM_INGEST_SYNC_INTERVAL_MIN min); resilient, --once for testing.
- start9/0.4: "Refresh search index" action (incremental sync.py); entrypoint
  launches the scheduler as a background process when Spark/Qdrant are set;
  CRM_INGEST_SYNC_INTERVAL_MIN env; pre-release note on fastembed/mcp pins.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-05 09:36:06 -05:00

110 lines
4.1 KiB
TypeScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import { i18n } from '../i18n'
import { sdk } from '../sdk'
import { DATA_MOUNT_PATH, IMAGE_ID } from '../utils'
/**
* Manual "Refresh search index" action (Phase-0 ingest, incremental sync).
*
* Runs an incremental, idempotent sync that brings the Qdrant search index up
* to date with CRM changes since the last sync, on the box where /data/crm.db
* lives:
*
* sync.py --db /data/crm.db (chunk → dense+BM25 → Qdrant, changed records only)
*
* Unlike "Build search index", this does NOT pass --recreate: it does not drop
* the collection; it only upserts the delta. It is fast and safe to re-run at
* any time, and is the manual counterpart to the background sync scheduler that
* runs automatically when ingest is configured (see INGEST_PACKAGING.md).
*
* Implementation notes:
* - The scripts import their siblings by bare name (`import config`, etc.),
* so they must run with cwd = /app/backend/ingest.
* - sync.py talks to Spark Control (dense embeds) and Qdrant (upserts), so the
* Spark/Qdrant env must be present. This action runs in its OWN subcontainer
* and does NOT go through docker_entrypoint.sh, so it cannot inherit the
* entrypoint's exports — the env is passed explicitly below.
* - allowedStatuses: 'any' — the action runs in its own subcontainer with the
* same /data volume mounted, so it works whether or not the CRM is running.
* Running alongside a live CRM is considered safe because the database uses
* SQLite WAL mode, which tolerates the sync's reads and derived writes.
*/
/** Absolute path of the CRM SQLite database on the mounted `main` volume. */
const DB_PATH = `${DATA_MOUNT_PATH}/crm.db`

/**
 * Working directory for the ingest scripts. They import their siblings by bare
 * module name, so they must be launched from this directory.
 */
const INGEST_DIR = '/app/backend/ingest'

/**
 * OPERATOR: environment for the ingest run (Spark Control + Qdrant endpoints).
 *
 * The URLs below are the LAN defaults for the Ten31 deployment — edit them for
 * your network. Keep them in sync with the export block in
 * docker_entrypoint.sh, which is the single source of truth for these values.
 * This action needs its own copy because its subcontainer never runs the
 * entrypoint. Spark Control serves TLS with a self-signed certificate by
 * default, hence SPARK_CONTROL_VERIFY_TLS is 'false'.
 */
const ingestEnv: Record<string, string> = {
  CRM_DB_PATH: DB_PATH,
  SPARK_CONTROL_URL: 'https://192.168.1.72:62419',
  SPARK_CONTROL_VERIFY_TLS: 'false',
  QDRANT_URL: 'http://192.168.1.87:6333',
}
/**
 * "Refresh search index" action.
 *
 * Runs an incremental `sync.py --db <DB_PATH>` (no `--recreate`) in a
 * throwaway subcontainer built from IMAGE_ID. The `main` volume is mounted
 * read-write at DATA_MOUNT_PATH, and the subcontainer is destroyed in a
 * `finally`, so it is cleaned up even when the sync fails. If `execFail`
 * rejects, that error propagates to the caller after cleanup. Otherwise a
 * version-'1' result with a success title/message is returned.
 */
export const refreshSearchIndex = sdk.Action.withoutInput(
  // id
  'refresh-search-index',

  // metadata — static; nothing here depends on runtime state.
  // NOTE(review): the description tells users to "set SPARK_CONTROL_URL /
  // QDRANT_URL", but those values are hard-coded in `ingestEnv`. The string is
  // left unchanged because it doubles as the i18n lookup key.
  async () => ({
    name: i18n('Refresh search index'),
    description: i18n(
      'Incrementally update the search index with CRM changes since the last ' +
        'sync; fast, idempotent. Runs sync.py (chunk → embed → upsert) for only ' +
        'the records that changed, without dropping the Qdrant `crm_chunks` ' +
        'collection. Requires Spark Control and Qdrant to be reachable (set ' +
        'SPARK_CONTROL_URL / QDRANT_URL). Use "Build search index" instead for a ' +
        'full rebuild from scratch.',
    ),
    warning: null,
    // Runs in its own subcontainer against the shared volume, so the main
    // service does not need to be running.
    allowedStatuses: 'any',
    group: null,
    visibility: 'enabled',
  }),

  // execution
  async ({ effects }) => {
    // 30 minutes — an incremental delta usually takes seconds to minutes, but
    // leave generous headroom for a large backlog of changes.
    const syncTimeoutMs = 30 * 60 * 1000

    const subcontainer = await sdk.SubContainer.of(
      effects,
      { imageId: IMAGE_ID },
      sdk.Mounts.of().mountVolume({
        volumeId: 'main',
        subpath: null,
        mountpoint: DATA_MOUNT_PATH,
        readonly: false,
      }),
      'ten31-database-refresh-search-index',
    )
    try {
      // Incremental sync — chunk → dense (Spark Control) + BM25 → Qdrant upsert
      // for changed records only (no --recreate).
      await subcontainer.execFail(
        ['python3', 'sync.py', '--db', DB_PATH],
        // cwd: the scripts import their siblings by bare name.
        // env: this subcontainer does not run docker_entrypoint.sh, so the
        // Spark/Qdrant configuration is passed explicitly.
        { cwd: INGEST_DIR, env: ingestEnv },
        syncTimeoutMs,
      )
    } finally {
      await subcontainer.destroy()
    }

    return {
      version: '1',
      title: i18n('Search index refreshed'),
      message: i18n(
        'The Qdrant `crm_chunks` collection was incrementally updated with CRM ' +
          'changes since the last sync. You can re-run this action any time.',
      ),
      result: null,
    }
  },
)