Files
ten31-transcripts/Ten31Transcripts/Backend/SparkControlClient.swift
T
Grant Gilliam 53d7fcdac0 Client: dual-channel label-merge (mic_file + system_file)
The backend shipped dual-channel mode; wire the client to it. We already capture
mic (you) and system (others) separately, so send them as two files instead of the
mono mix — fixing the misattribution at the source.

- SparkControlClient: labelMergeDual(mic_file, system_file, self_name, self_vad);
  multipart generalized to N files; shared POST/retry/decode extracted.
- SessionPackager.rebasedSelfVadData: chunk-local [{start,end}] for self_vad;
  sliceAudio reused for both tracks.
- TranscriptPipeline.process: dual-channel chunking (slice mic+system, rebase
  timeline + self_vad per chunk) when system audio is healthy; mono mixed-file
  fallback (self folded into the timeline) otherwise.
- VisualCapture.finish: write the full visual_timeline.json (remote + self merged)
  but return REMOTE (vision) segments only — self travels via the mic channel.
- TranscriptAssembler: rank mic_channel highest (the user's own track wins).
- VoiceprintStore: store the clean mic_channel self voiceprint.
- SessionController: pass mic/system URLs + remote timeline + channel self-spans +
  self_name + systemHealthy; self_vad.json now reflects the channel-verified spans.

Validated END-TO-END against the live backend on the real misattributing session:
'Go Bitcoin' (remote) is now attributed to Unknown_0, NOT the user; the user's own
lines come back source=mic_channel; per-channel ASR recovered fuller remote text.
36/36 XCTest (4 new: self_vad rebase, mic_channel ranking + voiceprint storage).
2026-06-06 13:15:29 -05:00

220 lines
9.9 KiB
Swift

import Foundation
/// Decoded `POST /api/audio/label-merge` response (verified against the live
/// backend). Handles both `transcribe=true` (start_ms/end_ms + text) and
/// `transcribe=false` (start_s/end_s + confidence) segment shapes.
struct LabelMergeResponse: Decodable {
    /// Value of the top-level `duration` key.
    let duration: Double
    /// Entries of the top-level `speakers` array.
    let speakers: [Speaker]
    /// Entries of the top-level `segments` array, in response order.
    let segments: [Segment]
    /// Top-level `fingerprints` object: string key → float vector.
    let fingerprints: [String: [Float]]
    /// Top-level `models` object; `nil` when the key is missing or null.
    let models: [String: String]?

    /// Tells whether `name` is one of the backend's "unmatched" placeholder
    /// labels (`Unknown_…` or exactly `Speaker_unknown`), i.e. a label that was
    /// never persisted as a named voiceprint.
    static func isUnknownName(_ name: String) -> Bool {
        if name == "Speaker_unknown" { return true }
        return name.hasPrefix("Unknown_")
    }

    /// One element of `segments`. Either the millisecond pair
    /// (`start_ms`/`end_ms`) or the second pair (`start_s`/`end_s`) may be
    /// present; every field except `speaker` is optional.
    struct Segment: Decodable {
        let startMs: Int?
        let endMs: Int?
        let startS: Double?
        let endS: Double?
        let speaker: String
        let text: String?
        let confidence: Double?

        enum CodingKeys: String, CodingKey {
            case startMs = "start_ms"
            case endMs = "end_ms"
            case startS = "start_s"
            case endS = "end_s"
            case speaker
            case text
            case confidence
        }

        /// Segment start in seconds: `start_s` when present, otherwise
        /// `start_ms / 1000`, otherwise `0`.
        var startSeconds: Double {
            if let seconds = startS { return seconds }
            if let millis = startMs { return Double(millis) / 1000 }
            return 0
        }

        /// Segment end in seconds: `end_s` when present, otherwise
        /// `end_ms / 1000`, otherwise `0`.
        var endSeconds: Double {
            if let seconds = endS { return seconds }
            if let millis = endMs { return Double(millis) / 1000 }
            return 0
        }
    }

    /// One element of `speakers`.
    struct Speaker: Decodable {
        let cluster: String
        let name: String
        /// One of: visual | voiceprint | unmatched
        let source: String
        let overlapConfidence: Double?
        let matchSimilarity: Double?
        let fingerprint: [Float]?

        enum CodingKeys: String, CodingKey {
            case cluster
            case name
            case source
            case fingerprint
            case overlapConfidence = "overlap_confidence"
            case matchSimilarity = "match_similarity"
        }
    }
}
/// Errors produced while talking to the `label-merge` endpoint. Each case
/// supplies a human-readable message through `errorDescription`.
enum SparkControlError: Error, LocalizedError {
    /// The request URL could not be built from the configured host.
    case invalidHost
    /// HTTP 413.
    case tooLarge
    /// Any other non-2xx status, paired with the response's `{"detail":...}` text.
    case server(Int, String)
    /// The response could not be interpreted; the payload explains why.
    case decode(String)
    /// Every permitted retry was answered with 503.
    case retriesExhausted

    /// Message shown to the user for this error.
    var errorDescription: String? {
        let message: String
        switch self {
        case .retriesExhausted:
            message = "Backend stayed busy (503) after retries."
        case let .decode(reason):
            message = "Couldn't decode backend response: \(reason)"
        case let .server(status, body):
            message = "Backend error \(status): \(body)"
        case .tooLarge:
            message = "Audio chunk exceeds the backend's 200 MB limit."
        case .invalidHost:
            message = "Invalid backend host URL."
        }
        return message
    }
}
/// Talks to SparkControl's `label-merge`. **Callers must invoke sequentially**
/// (one audio request in flight): concurrent audio requests trip a GPU race
/// (503). The Phase-5 pipeline drives one chunk at a time, satisfying this.
final class SparkControlClient {
    /// Backend base URL with surrounding whitespace and one trailing `/` removed.
    private let baseURL: String
    /// Session used for every request; invalidated in `deinit`.
    private let urlSession: URLSession

    /// Seconds to wait after a 503 that carries no usable `Retry-After` value.
    private static let defaultRetryDelay: Double = 5
    /// Upper bound (seconds) on a server-requested wait, so a bogus header can
    /// neither stall the pipeline indefinitely nor overflow the nanosecond count.
    private static let maxRetryDelay: Double = 300

    /// Creates a client for the backend at `baseURL`.
    /// - Parameters:
    ///   - baseURL: Scheme + host (+ port) of the backend, e.g. `https://host:8000`.
    ///   - skipTLS: When `true`, an `InsecureTrustDelegate` is installed as the
    ///     session delegate (defined elsewhere; presumably it relaxes server-trust
    ///     evaluation — confirm against its implementation).
    init(baseURL: String, skipTLS: Bool) {
        let trimmed = baseURL.trimmingCharacters(in: .whitespacesAndNewlines)
        self.baseURL = trimmed.hasSuffix("/") ? String(trimmed.dropLast()) : trimmed
        let config = URLSessionConfiguration.ephemeral
        config.timeoutIntervalForRequest = 600 // diarization can take up to ~600s
        config.timeoutIntervalForResource = 900
        config.waitsForConnectivity = false
        let delegate: URLSessionDelegate? = skipTLS ? InsecureTrustDelegate() : nil
        self.urlSession = URLSession(configuration: config, delegate: delegate, delegateQueue: nil)
    }

    // Lets in-flight tasks finish, then releases the session (and its delegate).
    deinit { urlSession.finishTasksAndInvalidate() }

    /// Mono `label-merge`: one mixed-mono file + timeline. Retries on `503`.
    /// - Parameters:
    ///   - audioURL: Local file whose bytes are uploaded as the `file` part.
    ///   - timeline: UTF-8 text sent as the `timeline` field.
    ///   - knownVoiceprints: Sent as JSON in `known_voiceprints` when non-empty.
    ///   - transcribe: Sent as `"true"` / `"false"` in `transcribe`.
    ///   - minOverlap: Sent as `min_overlap` when non-nil.
    ///   - voiceprintThreshold: Sent as `voiceprint_threshold` when non-nil.
    ///   - maxRetries: How many 503 responses are retried before giving up.
    /// - Throws: File-read errors, transport errors, or `SparkControlError`.
    func labelMerge(audioURL: URL,
                    timeline: Data,
                    knownVoiceprints: [String: [Float]]?,
                    transcribe: Bool,
                    minOverlap: Double? = nil,
                    voiceprintThreshold: Double? = nil,
                    maxRetries: Int = 3) async throws -> LabelMergeResponse {
        let fields = Self.commonFields(timeline: timeline, knownVoiceprints: knownVoiceprints,
                                       transcribe: transcribe, minOverlap: minOverlap,
                                       voiceprintThreshold: voiceprintThreshold)
        let files = [(field: "file", filename: audioURL.lastPathComponent, data: try Data(contentsOf: audioURL))]
        return try await perform(fields: fields, files: files, maxRetries: maxRetries)
    }

    /// Dual-channel `label-merge`: separate mic (local user) + system (remote)
    /// tracks. The mic channel is attributed as `self_name`; `timeline` names only
    /// the remote/system speakers; `selfVad` (optional) are the chunk-local windows
    /// where the mic is genuinely the user (active and louder than system).
    /// - Parameters:
    ///   - micURL: Local file uploaded as the `mic_file` part.
    ///   - systemURL: Local file uploaded as the `system_file` part.
    ///   - selfName: Sent as the `self_name` field.
    ///   - selfVad: UTF-8 text sent as `self_vad`; omitted when `nil` or not UTF-8.
    ///   - timeline, knownVoiceprints, transcribe, minOverlap, voiceprintThreshold,
    ///     maxRetries: Same meaning as in `labelMerge`.
    /// - Throws: File-read errors, transport errors, or `SparkControlError`.
    func labelMergeDual(micURL: URL,
                        systemURL: URL,
                        selfName: String,
                        selfVad: Data?,
                        timeline: Data,
                        knownVoiceprints: [String: [Float]]?,
                        transcribe: Bool,
                        minOverlap: Double? = nil,
                        voiceprintThreshold: Double? = nil,
                        maxRetries: Int = 3) async throws -> LabelMergeResponse {
        var fields = Self.commonFields(timeline: timeline, knownVoiceprints: knownVoiceprints,
                                       transcribe: transcribe, minOverlap: minOverlap,
                                       voiceprintThreshold: voiceprintThreshold)
        fields["self_name"] = selfName
        if let selfVad, let str = String(data: selfVad, encoding: .utf8) { fields["self_vad"] = str }
        let files = [
            (field: "mic_file", filename: micURL.lastPathComponent, data: try Data(contentsOf: micURL)),
            (field: "system_file", filename: systemURL.lastPathComponent, data: try Data(contentsOf: systemURL)),
        ]
        return try await perform(fields: fields, files: files, maxRetries: maxRetries)
    }

    // MARK: - Transport

    /// Builds the text form fields shared by the mono and dual-channel requests.
    /// Optional inputs that are `nil`, empty, or not encodable are left out
    /// rather than failing the request.
    private static func commonFields(timeline: Data, knownVoiceprints: [String: [Float]]?,
                                     transcribe: Bool, minOverlap: Double?,
                                     voiceprintThreshold: Double?) -> [String: String] {
        var fields: [String: String] = ["transcribe": transcribe ? "true" : "false"]
        if let timelineString = String(data: timeline, encoding: .utf8) { fields["timeline"] = timelineString }
        if let known = knownVoiceprints, !known.isEmpty {
            let payload = known.mapValues { $0.map { Double($0) } }
            // `data(withJSONObject:)` raises an Objective-C exception (which `try?`
            // cannot catch) for NaN/infinite numbers, so validate first.
            if JSONSerialization.isValidJSONObject(payload),
               let data = try? JSONSerialization.data(withJSONObject: payload),
               let str = String(data: data, encoding: .utf8) {
                fields["known_voiceprints"] = str
            }
        }
        if let minOverlap { fields["min_overlap"] = String(minOverlap) }
        if let voiceprintThreshold { fields["voiceprint_threshold"] = String(voiceprintThreshold) }
        return fields
    }

    /// Converts a `Retry-After` header value into a sleep length in nanoseconds.
    ///
    /// A missing, unparseable (e.g. HTTP-date), or non-finite value falls back to
    /// `defaultRetryDelay`; anything else is clamped to `1...maxRetryDelay`
    /// seconds. Clamping matters because `Double("inf")` and `Double("1e30")`
    /// both parse, and converting such a value to `UInt64` would trap.
    private static func retryDelayNanoseconds(retryAfter header: String?) -> UInt64 {
        let requested = header.flatMap { Double($0) } ?? defaultRetryDelay
        let seconds = requested.isFinite ? min(max(1, requested), maxRetryDelay) : defaultRetryDelay
        return UInt64(seconds * 1_000_000_000)
    }

    /// Shared POST + retry-on-503 + decode. Body is built once (constant across retries).
    /// - Throws: `SparkControlError.invalidHost` when the base URL lacks a scheme or
    ///   host, `.tooLarge` on 413, `.retriesExhausted` after more than `maxRetries`
    ///   503 responses, `.server` on any other non-2xx status, `.decode` when the
    ///   body is not a `LabelMergeResponse`; transport and cancellation errors
    ///   propagate unchanged.
    private func perform(fields: [String: String],
                         files: [(field: String, filename: String, data: Data)],
                         maxRetries: Int) async throws -> LabelMergeResponse {
        // `URL(string:)` accepts relative strings, so an empty or schemeless base
        // URL would otherwise get past this guard and fail later inside URLSession.
        guard let url = URL(string: baseURL + "/api/audio/label-merge"),
              url.scheme != nil, url.host != nil else {
            throw SparkControlError.invalidHost
        }
        let (body, contentType) = Self.multipart(fields: fields, files: files)
        var attempt = 0
        while true {
            var request = URLRequest(url: url)
            request.httpMethod = "POST"
            request.setValue(contentType, forHTTPHeaderField: "Content-Type")
            request.httpBody = body
            let (data, response) = try await urlSession.data(for: request)
            guard let http = response as? HTTPURLResponse else {
                throw SparkControlError.decode("no HTTP response")
            }
            switch http.statusCode {
            case 200..<300:
                do {
                    return try JSONDecoder().decode(LabelMergeResponse.self, from: data)
                } catch {
                    throw SparkControlError.decode(error.localizedDescription)
                }
            case 503:
                attempt += 1
                if attempt > maxRetries { throw SparkControlError.retriesExhausted }
                let delay = Self.retryDelayNanoseconds(retryAfter: http.value(forHTTPHeaderField: "Retry-After"))
                try await Task.sleep(nanoseconds: delay)
            case 413:
                throw SparkControlError.tooLarge
            default:
                throw SparkControlError.server(http.statusCode, Self.detail(from: data))
            }
        }
    }

    // MARK: - Helpers

    /// Extracts the `detail` string from a JSON error body, falling back to the
    /// raw body as UTF-8 text, then to `"unknown error"`.
    private static func detail(from data: Data) -> String {
        if let obj = try? JSONSerialization.jsonObject(with: data) as? [String: Any],
           let detail = obj["detail"] as? String { return detail }
        return String(data: data, encoding: .utf8) ?? "unknown error"
    }

    /// Percent-escapes the characters that would break out of a quoted
    /// `Content-Disposition` parameter (`"`, CR, LF). Works on Unicode scalars so
    /// a CR-LF pair is handled as two separate characters.
    private static func escapedDispositionValue(_ value: String) -> String {
        var escaped = ""
        for scalar in value.unicodeScalars {
            switch scalar {
            case "\"": escaped += "%22"
            case "\r": escaped += "%0D"
            case "\n": escaped += "%0A"
            default: escaped.unicodeScalars.append(scalar)
            }
        }
        return escaped
    }

    /// Encodes `fields` and `files` as a `multipart/form-data` body.
    /// - Returns: The body bytes and the matching `Content-Type` header value
    ///   (which carries the generated boundary).
    private static func multipart(fields: [String: String],
                                  files: [(field: String, filename: String, data: Data)]) -> (Data, String) {
        let boundary = "Boundary-\(UUID().uuidString)"
        var body = Data()
        // UTF-8 encoding of a Swift String cannot fail, so no optional is involved.
        func append(_ s: String) { body.append(contentsOf: s.utf8) }
        for (name, value) in fields {
            append("--\(boundary)\r\n")
            append("Content-Disposition: form-data; name=\"\(escapedDispositionValue(name))\"\r\n\r\n")
            append("\(value)\r\n")
        }
        for file in files {
            let field = escapedDispositionValue(file.field)
            let filename = escapedDispositionValue(file.filename)
            append("--\(boundary)\r\n")
            append("Content-Disposition: form-data; name=\"\(field)\"; filename=\"\(filename)\"\r\n")
            append("Content-Type: audio/wav\r\n\r\n")
            body.append(file.data)
            append("\r\n")
        }
        append("--\(boundary)--\r\n")
        return (body, "multipart/form-data; boundary=\(boundary)")
    }
}