import Foundation

/// Drives a finished session through the backend: chunk → sequential
/// `label-merge` (accumulating voiceprints) → assemble `speakers.json` → persist
/// fingerprints. Requests are sequential by construction (one chunk at a time).
final class TranscriptPipeline {
    private let client: SparkControlClient
    private let voiceprints: VoiceprintStore

    init(baseURL: String, skipTLS: Bool, voiceprints: VoiceprintStore) {
        self.client = SparkControlClient(baseURL: baseURL, skipTLS: skipTLS)
        self.voiceprints = voiceprints
    }

    /// Process a finished session and write `speakers.json` into `sessionFolder`.
    ///
    /// **Dual-channel** is used when `systemHealthy` is true, both the mic and system
    /// files exist, and the system track has a positive duration. In that mode the mic
    /// (the local user) and system (remote) tracks go as separate files, the
    /// `timeline` names only the remote speakers, and `selfSpans` become `self_vad`.
    /// Otherwise the pipeline falls back to the **mono** mixed file, with self folded
    /// into the timeline.
    ///
    /// - Parameters:
    ///   - sessionFolder: Folder that receives `speakers.json` (and, when any cluster
    ///     fingerprints exist, `cluster_fingerprints.json`). A scratch `chunks`
    ///     subfolder is created here and removed before returning or throwing.
    ///   - sessionId: Passed through to `TranscriptAssembler.assemble`.
    ///   - app: Passed through to `TranscriptAssembler.assemble`.
    ///   - micURL: Local-user track (dual mode).
    ///   - systemURL: Remote-participants track (dual mode).
    ///   - mixedURL: Mixed track (mono fallback).
    ///   - timeline: Visual active-speaker segments. Stuck `"vision"` spans are
    ///     dropped first (see `dropStuckSpans`).
    ///   - selfSpans: Mic-VAD spans for the local user.
    ///   - selfName: Display name attributed to `selfSpans`.
    ///   - systemHealthy: Whether the system capture is trusted enough for dual mode.
    ///   - chunkSeconds: Body length of each chunk. The overlap and the single-chunk
    ///     threshold (`chunkSeconds * 1.2`) scale with it.
    ///   - progress: Called as `(done, total)` before each chunk and with
    ///     `(total, total)` after the last. An empty session reports `(0, 0)`.
    /// - Returns: The assembled `SpeakersFile`, which is also written to disk.
    /// - Throws: `CancellationError` if the task is cancelled between chunks.
    ///   Also rethrows errors from creating the scratch directory, slicing audio,
    ///   rebasing timeline / self-VAD data, the backend request, or writing
    ///   `speakers.json`.
    func process(sessionFolder: URL, sessionId: String, app: String,
                 micURL: URL, systemURL: URL, mixedURL: URL,
                 timeline: [VisualTimeline.Segment], selfSpans: [VADSpan], selfName: String,
                 systemHealthy: Bool, chunkSeconds: Double = 150,
                 progress: ((Int, Int) async -> Void)? = nil) async throws -> SpeakersFile {
        let fm = FileManager.default
        let dual = systemHealthy
            && fm.fileExists(atPath: micURL.path)
            && fm.fileExists(atPath: systemURL.path)
            && SessionPackager.duration(of: systemURL) > 0
        let duration = dual
            ? max(SessionPackager.duration(of: micURL), SessionPackager.duration(of: systemURL))
            : SessionPackager.duration(of: mixedURL)

        // Chunk to the requested body length; overlap and the single-chunk threshold
        // scale with it (a 60s body shouldn't be cut by a fixed 15s margin or stay
        // unchunked below the 2.5-min default threshold).
        let overlap = ChunkMode.overlapSeconds(forBody: chunkSeconds)
        let plan = SessionPackager.planChunks(durationSec: duration,
                                              chunkSeconds: chunkSeconds,
                                              overlapSeconds: overlap,
                                              thresholdSec: chunkSeconds * 1.2)

        // Zero-duration / empty session → a valid empty speakers.json, no backend call.
        if plan.isEmpty || duration <= 0 {
            let empty = TranscriptAssembler.assemble(sessionId: sessionId, app: app, chunks: [])
            try empty.speakersFile.write(to: sessionFolder.appendingPathComponent("speakers.json"))
            await progress?(0, 0)
            return empty.speakersFile
        }

        // Scratch dir for per-chunk WAV slices. A creation failure is surfaced rather
        // than swallowed. Unless `sliceAudio` itself throws on a missing directory,
        // the existence guards below would otherwise skip every chunk, and the call
        // would silently "succeed" with an empty speakers.json. An already-existing
        // directory does not throw here (`withIntermediateDirectories: true`).
        let chunksDir = sessionFolder.appendingPathComponent("chunks", isDirectory: true)
        try fm.createDirectory(at: chunksDir, withIntermediateDirectories: true)
        defer { try? fm.removeItem(at: chunksDir) }  // cleanup on success OR throw

        // Defensive: drop any visual span covering most of the call in one unbroken
        // segment. That is the signature of a stuck/false active-speaker cue (e.g. a
        // solid camera-off tile read as "speaking" the whole call). Such a span would
        // dominate the backend's name attribution and collapse every voice onto one
        // name. This also salvages sessions captured before the adapter fix landed.
        let vis = Self.dropStuckSpans(timeline, duration: duration)

        // Start from stored voiceprints; accumulate this call's prints across chunks
        // for within-call unification (the store only persists high-confidence ones).
        var known = voiceprints.knownVoiceprints()
        var results: [TranscriptAssembler.ChunkResult] = []

        // Mono fallback needs self folded into the timeline; dual sends it separately.
        let monoTimeline = dual
            ? vis
            : vis + Self.timeline(fromSelfSpans: selfSpans, selfName: selfName)

        for chunk in plan {
            try Task.checkCancellation()
            await progress?(chunk.index, plan.count)
            let pad = String(format: "%03d", chunk.index)
            let response: LabelMergeResponse

            if dual {
                let micChunk = chunksDir.appendingPathComponent("chunk_\(pad)_mic.wav")
                let sysChunk = chunksDir.appendingPathComponent("chunk_\(pad)_sys.wav")
                try SessionPackager.sliceAudio(from: micURL, startSec: chunk.start,
                                               endSec: chunk.end, to: micChunk)
                try SessionPackager.sliceAudio(from: systemURL, startSec: chunk.start,
                                               endSec: chunk.end, to: sysChunk)
                // Empty slice on either channel → skip (leftovers go with chunksDir).
                guard fm.fileExists(atPath: micChunk.path),
                      fm.fileExists(atPath: sysChunk.path) else { continue }
                let timelineData = try SessionPackager.rebasedTimelineData(
                    vis, start: chunk.start, end: chunk.end)
                let selfVadData = try SessionPackager.rebasedSelfVadData(
                    selfSpans, start: chunk.start, end: chunk.end)
                response = try await client.labelMergeDual(
                    micURL: micChunk, systemURL: sysChunk,
                    selfName: selfName, selfVad: selfVadData, timeline: timelineData,
                    knownVoiceprints: known.isEmpty ? nil : known, transcribe: true)
                try? fm.removeItem(at: micChunk)
                try? fm.removeItem(at: sysChunk)
            } else {
                let chunkURL = chunksDir.appendingPathComponent("chunk_\(pad).wav")
                try SessionPackager.sliceAudio(from: mixedURL, startSec: chunk.start,
                                               endSec: chunk.end, to: chunkURL)
                guard fm.fileExists(atPath: chunkURL.path) else { continue }  // empty slice → skip
                let timelineData = try SessionPackager.rebasedTimelineData(
                    monoTimeline, start: chunk.start, end: chunk.end)
                response = try await client.labelMerge(
                    audioURL: chunkURL, timeline: timelineData,
                    knownVoiceprints: known.isEmpty ? nil : known, transcribe: true)
                try? fm.removeItem(at: chunkURL)
            }

            // Feed named (non-Unknown) prints forward so later chunks reuse the same names.
            for (name, fp) in response.fingerprints where !LabelMergeResponse.isUnknownName(name) {
                known[name] = fp
            }
            voiceprints.update(with: response)
            results.append(.init(chunkStart: chunk.start, response: response,
                                 bodyStart: chunk.bodyStart, bodyEnd: chunk.bodyEnd))
        }
        await progress?(plan.count, plan.count)

        let assembled = TranscriptAssembler.assemble(sessionId: sessionId, app: app, chunks: results)
        try assembled.speakersFile.write(to: sessionFolder.appendingPathComponent("speakers.json"))

        // Persist every cluster's voiceprint (incl. Unknown) so the speaker editor can
        // teach the store a voice when the user renames an Unknown to a real name.
        // Best-effort: a serialization or write failure here does not fail the session.
        if !assembled.allFingerprints.isEmpty,
           let data = try? JSONSerialization.data(
               withJSONObject: assembled.allFingerprints.mapValues { $0.map(Double.init) },
               options: [.sortedKeys]) {
            try? data.write(to: sessionFolder.appendingPathComponent("cluster_fingerprints.json"))
        }
        return assembled.speakersFile
    }

    /// Convert mic-VAD self spans into `label-merge` timeline segments.
    ///
    /// Each span keeps its `start`, `end` and `confidence`, is attributed to
    /// `selfName`, and is tagged `source: "mic_vad"`. Only self spans are handled
    /// here. `process` appends the result to the (de-stuck) visual timeline for the
    /// mono fallback; dual mode sends self spans separately as `self_vad` instead.
    static func timeline(fromSelfSpans spans: [VADSpan], selfName: String) -> [VisualTimeline.Segment] {
        spans.map {
            .init(start: $0.start, end: $0.end, name: selfName,
                  confidence: $0.confidence, source: "mic_vad")
        }
    }

    /// Drop `"vision"`-source spans whose single unbroken duration is at least
    /// `maxFraction` of the whole call. No one legitimately speaks that long
    /// without a break, so such a span is treated as a stuck/false cue.
    ///
    /// Spans from any other source (e.g. `mic_vad` self spans) are always kept.
    /// When `duration <= 0` the timeline is returned unchanged.
    static func dropStuckSpans(_ timeline: [VisualTimeline.Segment], duration: Double,
                               maxFraction: Double = 0.6) -> [VisualTimeline.Segment] {
        guard duration > 0 else { return timeline }
        let limit = maxFraction * duration
        return timeline.filter { $0.source != "vision" || ($0.end - $0.start) < limit }
    }
}