Phase 0 go-live polish: hands-off incremental sync + refresh action

- backend/ingest/sync_scheduler.py: periodic incremental-sync loop (every
  CRM_INGEST_SYNC_INTERVAL_MIN min); resilient, --once for testing.
- start9/0.4: "Refresh search index" action (incremental sync.py); entrypoint
  launches the scheduler as a background process when Spark/Qdrant are set;
  CRM_INGEST_SYNC_INTERVAL_MIN env; pre-release note on fastembed/mcp pins.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Keysat
2026-06-05 09:36:06 -05:00
parent f357c23c75
commit 6be2e40f54
6 changed files with 298 additions and 19 deletions
+4 -1
View File
@@ -1,4 +1,7 @@
import { sdk } from '../sdk'
import { buildSearchIndex } from './buildSearchIndex'
import { refreshSearchIndex } from './refreshSearchIndex'
export const actions = sdk.Actions.of().addAction(buildSearchIndex)
export const actions = sdk.Actions.of()
.addAction(buildSearchIndex)
.addAction(refreshSearchIndex)
@@ -0,0 +1,109 @@
import { i18n } from '../i18n'
import { sdk } from '../sdk'
import { DATA_MOUNT_PATH, IMAGE_ID } from '../utils'
/**
* Manual "Refresh search index" action (Phase-0 ingest, incremental sync).
*
* Runs an incremental, idempotent sync that brings the Qdrant search index up
* to date with CRM changes since the last sync, on the box where /data/crm.db
* lives:
*
* sync.py --db /data/crm.db (chunk → dense+BM25 → Qdrant, changed records only)
*
* Unlike "Build search index" this does NOT pass --recreate: it does not drop
* the collection, only upserts the delta. It is fast and safe to re-run any
* time; it is the manual counterpart to the background sync scheduler that runs
* automatically when ingest is configured (see INGEST_PACKAGING.md).
*
* Implementation notes:
* - The scripts import their siblings by bare name (`import config`, etc.),
* so they must run with cwd = /app/backend/ingest.
* - sync.py talks to Spark Control (dense embeds) and Qdrant (upserts), so the
* Spark/Qdrant env must be present. This action runs in its OWN subcontainer
* and does NOT go through docker_entrypoint.sh, so it cannot inherit the
* entrypoint's exports — the env is passed explicitly below.
* - allowedStatuses: 'any' — the action runs in its own subcontainer with the
* same /data volume mounted, so it works whether or not the CRM is running.
* SQLite WAL mode means a concurrently-running CRM is fine for these
* reads/derived writes.
*/
const DB_PATH = `${DATA_MOUNT_PATH}/crm.db`
const INGEST_DIR = '/app/backend/ingest'
// OPERATOR: Spark Control + Qdrant endpoints for the ingest run. These are the
// LAN defaults for the Ten31 deployment — edit them for your network. Keep them
// in sync with the export block in docker_entrypoint.sh (single source of truth
// for the values; this action needs its own copy because it does not run the
// entrypoint). Spark Control is TLS with a self-signed cert by default, hence
// SPARK_CONTROL_VERIFY_TLS = 'false'.
const ingestEnv: { [k: string]: string } = {
CRM_DB_PATH: DB_PATH,
SPARK_CONTROL_URL: 'https://192.168.1.72:62419',
SPARK_CONTROL_VERIFY_TLS: 'false',
QDRANT_URL: 'http://192.168.1.87:6333',
}
export const refreshSearchIndex = sdk.Action.withoutInput(
// id
'refresh-search-index',
// metadata
async ({ effects }) => ({
name: i18n('Refresh search index'),
description: i18n(
'Incrementally update the search index with CRM changes since the last ' +
'sync; fast, idempotent. Runs sync.py (chunk → embed → upsert) for only ' +
'the records that changed, without dropping the Qdrant `crm_chunks` ' +
'collection. Requires Spark Control and Qdrant to be reachable (set ' +
'SPARK_CONTROL_URL / QDRANT_URL). Use "Build search index" instead for a ' +
'full rebuild from scratch.',
),
warning: null,
allowedStatuses: 'any',
group: null,
visibility: 'enabled',
}),
// execution
async ({ effects }) => {
const env = ingestEnv
const subcontainer = await sdk.SubContainer.of(
effects,
{ imageId: IMAGE_ID },
sdk.Mounts.of().mountVolume({
volumeId: 'main',
subpath: null,
mountpoint: DATA_MOUNT_PATH,
readonly: false,
}),
'ten31-database-refresh-search-index',
)
try {
// Incremental sync — chunk → dense (Spark Control) + BM25 → Qdrant upsert
// for changed records only (no --recreate).
await subcontainer.execFail(
['python3', 'sync.py', '--db', DB_PATH],
{ cwd: INGEST_DIR, env },
// 30 minutes — an incremental delta is usually secondsminutes, but leave
// generous headroom for a large backlog of changes.
30 * 60 * 1000,
)
} finally {
await subcontainer.destroy()
}
return {
version: '1',
title: i18n('Search index refreshed'),
message: i18n(
'The Qdrant `crm_chunks` collection was incrementally updated with CRM ' +
'changes since the last sync. You can re-run this action any time.',
),
result: null,
}
},
)