880b56e426
Visual capture now runs alongside audio: on call start the session picks the
app's adapter, captures the call window on the SAME monotonic clock as the audio
(AudioRecorder.sharedT0Host), and on stop writes visual_timeline.json and hands
the backend the visual segments with mic-VAD self-spans merged. Any visual
failure (no adapter, no window, Screen Recording denied) leaves the session
recording audio-only — the proven path is never blocked or broken.
- CallDetector now emits DetectedCall{app, bundleID, windowID}: the exact
CGWindowID of the matched Meet browser window (native apps → nil → largest).
- VisualCapture wraps VisualObserver + AdapterRegistry, writes visual_timeline.json.
- AudioRecorder.sharedT0Host() exposes the shared t0 for frame alignment.
Hardened per a 3-lens adversarial review (concurrency / failure-isolation /
data-flow), all 6 confirmed findings fixed:
- P0 (critical): startVisual could adopt a stale capture into a DIFFERENT session
(cross-session SCStream leak + visual_timeline.json written to the wrong
folder). Now gated on session identity — generation + recorder ===, still
.recording — with fail-closed adoption; otherwise the stream is cancelled.
- P1: observer captured the browser's largest window, not the detected Meet
window. Now targets the exact CGWindowID (pickWindowIndex, unit-tested),
largest-area only as fallback.
- P2: a startVisual orphaned by a concurrent stop could leak a stream on quit.
inFlightVisual is registered before the await and drained in prepareForTermination.
- P3: trailing visual gap/segment ends could exceed duration_sec. Clamped in
VisualCapture (clampSegments/clampGaps, unit-tested).
- P4: capture pixel size used NSScreen.main scale; now uses the scale of the
display actually hosting the window (OCR clarity on secondary displays).
- VisualObserver.stop() bounds stopCapture() with a 3s timeout (mirrors audio) so
a wedged stream can't hang finalization.
25/25 XCTest pass. Live validation on real calls still pending.
352 lines
14 KiB
Swift
352 lines
14 KiB
Swift
import AVFoundation
|
||
import ScreenCaptureKit
|
||
import CoreMedia
|
||
import QuartzCore
|
||
|
||
/// Summary of a finished recording, as produced by `AudioRecorder.stop()`.
struct RecordingResult {
    /// File URL the microphone track was recorded to.
    let micURL: URL
    /// File URL the system-audio track was recorded to.
    let systemURL: URL
    /// File URL the mic + system mix is written to.
    let mixedURL: URL
    /// Length of the recording in seconds (the longer of the two tracks).
    let duration: Double
    /// Spans reported by the microphone VAD.
    let selfSpans: [VADSpan]
    /// Wall-clock start time, in seconds since the Unix epoch.
    let t0Unix: Double
    /// Non-nil if system-audio capture stopped early (e.g. SCStream error).
    let systemNote: String?
}
|
||
|
||
/// Dual-track local audio capture for Phase 1.
|
||
///
|
||
/// - System audio via `SCStream` (`capturesAudio`); its audio handler runs on
|
||
/// `ioQueue`. A discard-only video output runs on `screenQueue` purely to keep
|
||
/// SCStream's frame pipeline drained (an unconsumed video queue can stall the
|
||
/// whole stream) — frames are dropped instantly, never stored.
|
||
/// - Mic via `AVAudioEngine` input tap: the tap deep-copies the raw buffer and
|
||
/// hands it to `ioQueue`, where it is resampled and written.
|
||
/// - **`ioQueue` is the single isolation domain** for the writers, VAD, both
|
||
/// resamplers, and lifecycle flags.
|
||
/// - One shared monotonic `t0` (`CACurrentMediaTime`). Each buffer is placed at
|
||
/// its true `(startHost − t0)` frame (gaps padded, overlaps trimmed), so mic
|
||
/// and system stay aligned and the mix is a straight sum.
|
||
/// - Live peak levels are exposed via `currentLevels()` for the UI meter.
|
||
/// - `stop()` tears the mic down first and bounds `stopCapture()` with a timeout,
|
||
/// so a wedged stream can never block finalization. No video is written.
|
||
final class AudioRecorder: NSObject, SCStreamDelegate, SCStreamOutput {
|
||
private let micURL: URL
|
||
private let systemURL: URL
|
||
private let mixedURL: URL
|
||
|
||
private let ioQueue = DispatchQueue(label: "xyz.ten31.audio.io")
|
||
private let screenQueue = DispatchQueue(label: "xyz.ten31.audio.screen")
|
||
|
||
// ioQueue-only state:
|
||
private var t0Host: Double = 0
|
||
private var t0Unix: Double = 0
|
||
private var micWriter: MonoTrackWriter?
|
||
private var systemWriter: MonoTrackWriter?
|
||
private var vad: MicVAD?
|
||
private var tornDown = true
|
||
private let micResampler = Resampler()
|
||
private let systemResampler = Resampler()
|
||
|
||
// Cross-thread, guarded by levelLock:
|
||
private let levelLock = NSLock()
|
||
private var micPeak: Float = 0
|
||
private var sysPeak: Float = 0
|
||
private var streamStopped = false
|
||
private var systemErrorMessage: String?
|
||
|
||
private var engine: AVAudioEngine?
|
||
private var stream: SCStream?
|
||
|
||
/// Creates a recorder that writes to the three given destinations.
///
/// Nothing is opened or captured until `start()` is called.
///
/// - Parameters:
///   - micURL: Destination of the microphone track.
///   - systemURL: Destination of the system-audio track.
///   - mixedURL: Destination of the mic + system mix produced by `stop()`.
init(micURL: URL, systemURL: URL, mixedURL: URL) {
    self.micURL = micURL
    self.systemURL = systemURL
    self.mixedURL = mixedURL
}
|
||
|
||
// MARK: - Lifecycle
|
||
|
||
/// Opens both track writers, records the shared clock origin, then brings up
/// microphone and system-audio capture.
///
/// - Throws: Any error from creating the writers, from `startMic()` or from
///   `startSystem()`. When a capture step fails, `abortStart()` runs before
///   the error is rethrown.
func start() async throws {
    // Sample both clocks back-to-back, before any setup work happens.
    let hostOrigin = CACurrentMediaTime()
    let unixOrigin = Date().timeIntervalSince1970

    // All writer/VAD/lifecycle state lives on ioQueue.
    try ioQueue.sync {
        let micTrack = try MonoTrackWriter(url: micURL)
        let systemTrack = try MonoTrackWriter(url: systemURL)
        t0Host = hostOrigin
        t0Unix = unixOrigin
        micWriter = micTrack
        systemWriter = systemTrack
        vad = MicVAD()
        tornDown = false
    }

    do {
        try startMic()
        // Throws if Screen Recording is denied.
        try await startSystem()
    } catch {
        await abortStart()
        throw error
    }
}
|
||
|
||
/// Returns the monotonic clock origin (`CACurrentMediaTime` base) that
/// `start()` recorded, so visual capture can timestamp its frames against the
/// same clock the audio uses.
///
/// The value is read on `ioQueue`. It is only meaningful once `start()` has
/// returned.
func sharedT0Host() -> Double {
    return ioQueue.sync { self.t0Host }
}
|
||
|
||
/// Stops both captures, flushes the resamplers, closes the writers, mixes the
/// two tracks and reports what was recorded.
///
/// - Returns: The track URLs, the duration in seconds (longer track, at
///   16 kHz), the mic VAD spans, the wall-clock start time and, if the system
///   stream reported an error, its description.
func stop() async -> RecordingResult {
    // Stop the mic FIRST — this needs no waiting and halts mic capture at once.
    engine?.inputNode.removeTap(onBus: 0)
    engine?.stop()
    engine = nil

    // Stop system capture WITHOUT hanging: skip stopCapture() when the stream
    // has already reported that it stopped, and bound the call otherwise.
    if let stream, !flag({ self.streamStopped }) {
        await Self.stopCaptureWithTimeout(stream, seconds: 3)
    }
    stream = nil

    var micFrames: Int64 = 0
    var sysFrames: Int64 = 0
    var spans: [VADSpan] = []
    var t0u: Double = 0

    // Final flush and teardown happen on ioQueue, the only place the writers,
    // VAD and resamplers may be touched.
    ioQueue.sync {
        if let tail = micResampler.drain() {
            if (micWriter?.write(tail) ?? 0) > 0 { vad?.feed(tail) }
        }
        if let tail = systemResampler.drain() { systemWriter?.write(tail) }
        vad?.finish()
        micFrames = micWriter?.framesWritten ?? 0
        sysFrames = systemWriter?.framesWritten ?? 0
        spans = vad?.spans ?? []
        t0u = t0Unix
        // From here on any late buffer is dropped by the ingest guards.
        tornDown = true
        micWriter = nil
        systemWriter = nil
        vad = nil
    }

    // Mixing stays best-effort (the separate tracks are still returned), but a
    // failure is logged instead of vanishing: mixedURL may then not exist.
    do {
        try AudioMixer.mix(mic: micURL, system: systemURL, into: mixedURL)
    } catch {
        NSLog("AudioRecorder: mixing tracks failed: %@", String(describing: error))
    }

    let duration = Double(max(micFrames, sysFrames)) / 16_000.0
    let note: String? = flag { self.systemErrorMessage }
    return RecordingResult(
        micURL: micURL, systemURL: systemURL, mixedURL: mixedURL,
        duration: duration, selfSpans: spans, t0Unix: t0u, systemNote: note)
}
|
||
|
||
/// Unwinds a partially completed `start()`: tears down the mic engine and the
/// system stream (if either exists) and releases the writers and VAD.
private func abortStart() async {
    if let running = engine {
        running.inputNode.removeTap(onBus: 0)
        running.stop()
    }
    engine = nil

    if let active = stream {
        await Self.stopCaptureWithTimeout(active, seconds: 3)
    }
    stream = nil

    // Writer/VAD state is ioQueue-only, so release it there.
    ioQueue.sync {
        self.tornDown = true
        self.micWriter = nil
        self.systemWriter = nil
        self.vad = nil
    }
}
|
||
|
||
/// Reports the most recent peak level of each source for the UI meter.
///
/// Every call also scales the stored peaks by 0.55, so a source that stops
/// delivering audio fades out over successive reads.
///
/// - Returns: The mic and system peaks as they were before this call's decay.
func currentLevels() -> (mic: Float, system: Float) {
    levelLock.lock()
    defer { levelLock.unlock() }

    let snapshot = (mic: micPeak, system: sysPeak)
    micPeak *= 0.55
    sysPeak *= 0.55
    return snapshot
}
|
||
|
||
// MARK: - Ingest (ioQueue only)
|
||
|
||
/// Write audio CONTINUOUSLY; re-anchor to the timestamp only when drift is a
|
||
/// real gap (> ~100 ms), not per-buffer timestamp jitter. Correcting every
|
||
/// buffer injects/strips a few samples each time → audible rhythmic glitching.
|
||
/// The shared t0 still bounds mic/system skew to the tolerance, well within
|
||
/// what the backend merge needs.
|
||
private static let driftTolerance: Int64 = 1600 // 100 ms @ 16 kHz
|
||
|
||
/// Appends one resampled mic buffer to the mic track and feeds the VAD.
/// Must run on `ioQueue`.
///
/// The buffer's expected position is `(startHost − t0Host)` in 16 kHz frames.
/// Only when that differs from the frames already written by more than
/// `driftTolerance` is the track realigned: silence is padded for a gap, or
/// the front of the buffer is trimmed for an overlap.
///
/// - Parameters:
///   - buffer: Mic audio, already resampled.
///   - startHost: Host-clock time (seconds) at which the buffer starts.
private func ingestMic(_ buffer: AVAudioPCMBuffer, startHost: Double) {
    guard !tornDown, let writer = micWriter, let vad else { return }

    let expectedFrame = max(0, Int64(((startHost - t0Host) * 16_000).rounded()))
    let drift = expectedFrame - writer.framesWritten

    let payload: AVAudioPCMBuffer
    if drift > Self.driftTolerance {
        // Real gap: pad the track, and tell the VAD about the silence too.
        let padded = writer.padSilence(drift)
        if padded > 0 { vad.feedSilence(padded) }
        payload = buffer
    } else if drift < -Self.driftTolerance {
        // Track is ahead of the clock: drop the overlapping front of the buffer.
        let overlap = Int(-drift)
        guard overlap < Int(buffer.frameLength),
              let trimmed = Self.trimFront(buffer, by: overlap) else { return }
        payload = trimmed
    } else {
        payload = buffer
    }

    updateLevel(payload, isMic: true)
    if writer.write(payload) > 0 { vad.feed(payload) }
}
|
||
|
||
/// Appends one resampled system-audio buffer to the system track.
/// Must run on `ioQueue`.
///
/// Uses the same realignment rule as the mic path: the track is padded or the
/// buffer trimmed only when the position implied by `startHost` is more than
/// `driftTolerance` frames away from what has been written.
///
/// - Parameters:
///   - buffer: System audio, already resampled.
///   - startHost: Host-clock time (seconds) at which the buffer starts.
private func ingestSystem(_ buffer: AVAudioPCMBuffer, startHost: Double) {
    guard !tornDown, let writer = systemWriter else { return }

    let expectedFrame = max(0, Int64(((startHost - t0Host) * 16_000).rounded()))
    let drift = expectedFrame - writer.framesWritten

    let payload: AVAudioPCMBuffer
    if drift > Self.driftTolerance {
        // Real gap: fill it with silence so the track stays on the clock.
        writer.padSilence(drift)
        payload = buffer
    } else if drift < -Self.driftTolerance {
        // Track is ahead of the clock: drop the overlapping front of the buffer.
        let overlap = Int(-drift)
        guard overlap < Int(buffer.frameLength),
              let trimmed = Self.trimFront(buffer, by: overlap) else { return }
        payload = trimmed
    } else {
        payload = buffer
    }

    updateLevel(payload, isMic: false)
    writer.write(payload)
}
|
||
|
||
// MARK: - Mic (AVAudioEngine)
|
||
|
||
/// Starts microphone capture through an `AVAudioEngine` input tap.
///
/// The tap deep-copies each buffer, stamps it with its host time and hands it
/// to `ioQueue`, where it is resampled and ingested.
///
/// - Throws: An `NSError` in the "Ten31" domain when the input node reports no
///   usable format, or whatever `AVAudioEngine.start()` throws.
private func startMic() throws {
    let engine = AVAudioEngine()
    let input = engine.inputNode
    let format = input.inputFormat(forBus: 0)

    // With no usable input device the bus reports a 0 Hz / 0-channel format.
    // Installing a tap with such a format raises an Objective-C exception,
    // which Swift cannot catch — fail with a regular error instead so
    // start() can clean up and report it.
    guard format.sampleRate > 0, format.channelCount > 0 else {
        throw NSError(domain: "Ten31", code: 2,
                      userInfo: [NSLocalizedDescriptionKey: "No microphone input available for capture."])
    }

    input.installTap(onBus: 0, bufferSize: 4096, format: format) { [weak self] buffer, when in
        guard let self else { return }
        let entry = CACurrentMediaTime()
        // Prefer the buffer's own host timestamp; fall back to arrival time when
        // it is missing or implausibly far (≥ 5 s) from now.
        let stamped = when.isHostTimeValid ? AudioRecorder.hostSeconds(when.hostTime) : entry
        let startHost = abs(stamped - entry) < 5 ? stamped : entry
        // The engine reuses the tap buffer, so copy before leaving the callback.
        guard let raw = AudioRecorder.copy(buffer) else { return }
        self.ioQueue.async {
            guard !self.tornDown, let resampled = self.micResampler.resample(raw) else { return }
            self.ingestMic(resampled, startHost: startHost)
        }
    }

    engine.prepare()
    do {
        try engine.start()
    } catch {
        // The engine never ran; drop the tap before discarding it.
        input.removeTap(onBus: 0)
        throw error
    }
    self.engine = engine
}
|
||
|
||
// MARK: - System (ScreenCaptureKit)
|
||
|
||
/// Starts system-audio capture with ScreenCaptureKit on the first available
/// display.
///
/// Audio is delivered on `ioQueue`. A tiny (2×2, ~2 fps) video output is
/// attached on `screenQueue` only so the stream's frame queue keeps draining;
/// those frames are discarded by the output handler.
///
/// - Throws: An `NSError` in the "Ten31" domain when no display is available,
///   or any error from ScreenCaptureKit (content query, adding outputs,
///   starting capture).
private func startSystem() async throws {
    let shareable = try await SCShareableContent.excludingDesktopWindows(false, onScreenWindowsOnly: false)
    guard let display = shareable.displays.first else {
        throw NSError(domain: "Ten31", code: 1,
                      userInfo: [NSLocalizedDescriptionKey: "No display available for system-audio capture."])
    }

    let configuration = SCStreamConfiguration()
    // Audio: 48 kHz stereo, without this app's own output.
    configuration.capturesAudio = true
    configuration.excludesCurrentProcessAudio = true
    configuration.sampleRate = 48_000
    configuration.channelCount = 2
    // Video: as small and slow as possible — it is never stored.
    configuration.width = 2
    configuration.height = 2
    configuration.minimumFrameInterval = CMTime(value: 1, timescale: 2)
    configuration.queueDepth = 6

    let capture = SCStream(
        filter: SCContentFilter(display: display, excludingWindows: []),
        configuration: configuration,
        delegate: self)
    try capture.addStreamOutput(self, type: .audio, sampleHandlerQueue: ioQueue)
    // Discard-only video consumer; see the method comment.
    try capture.addStreamOutput(self, type: .screen, sampleHandlerQueue: screenQueue)
    try await capture.startCapture()

    // Only a stream that actually started is kept for stop() to tear down.
    stream = capture
}
|
||
|
||
/// `SCStreamOutput` callback. Video frames are discarded; audio buffers are
/// converted, resampled and written to the system track.
///
/// Audio arrives on `ioQueue` (the queue registered for `.audio`), so the
/// ioQueue-only state may be read here once the type check has passed.
func stream(_ stream: SCStream, didOutputSampleBuffer sampleBuffer: CMSampleBuffer,
            of type: SCStreamOutputType) {
    // .screen frames are dropped right here and never stored.
    guard type == .audio else { return }
    // Same order as the mic path: check teardown BEFORE resampling, so buffers
    // that arrive after stop() never touch the already-drained resampler.
    guard !tornDown else { return }

    // Arrival time, taken before the conversion work so it is not skewed by it.
    let entry = CACurrentMediaTime()
    guard CMSampleBufferDataIsReady(sampleBuffer),
          let pcm = Self.pcmBuffer(from: sampleBuffer),
          let resampled = systemResampler.resample(pcm) else { return }

    // Prefer the buffer's presentation timestamp; fall back to arrival time
    // when it is invalid or implausibly far (≥ 5 s) from now.
    let pts = CMSampleBufferGetPresentationTimeStamp(sampleBuffer)
    let stamped = pts.isValid ? pts.seconds : entry
    let startHost = abs(stamped - entry) < 5 ? stamped : entry
    ingestSystem(resampled, startHost: startHost)
}
|
||
|
||
/// `SCStreamDelegate` callback: records that the system stream has stopped and
/// keeps the error text so `stop()` can report it.
func stream(_ stream: SCStream, didStopWithError error: Error) {
    let message = error.localizedDescription
    levelLock.lock()
    defer { levelLock.unlock() }
    streamStopped = true
    systemErrorMessage = message
}
|
||
|
||
// MARK: - Helpers
|
||
|
||
/// Raises the stored peak for one source to the loudest sample in `buffer`.
///
/// Only the first channel is inspected. The stored peak is never lowered
/// here; decay happens in `currentLevels()`.
///
/// - Parameters:
///   - buffer: Float PCM audio; ignored when it has no float channel data.
///   - isMic: `true` updates the mic peak, `false` the system peak.
private func updateLevel(_ buffer: AVAudioPCMBuffer, isMic: Bool) {
    guard let samples = buffer.floatChannelData?[0] else { return }

    var loudest: Float = 0
    for index in 0..<Int(buffer.frameLength) {
        let magnitude = abs(samples[index])
        if magnitude > loudest { loudest = magnitude }
    }

    levelLock.lock()
    defer { levelLock.unlock() }
    if isMic {
        if loudest > micPeak { micPeak = loudest }
    } else {
        if loudest > sysPeak { sysPeak = loudest }
    }
}
|
||
|
||
/// Evaluates `body` while holding `levelLock` and returns its result.
/// Used to read values that are guarded by that lock.
private func flag<T>(_ body: () -> T) -> T {
    levelLock.lock()
    let value = body()
    levelLock.unlock()
    return value
}
|
||
|
||
/// Asks `stream` to stop and returns when it has, or after `seconds`,
/// whichever comes first.
///
/// A task group cannot provide this bound: the group waits for ALL of its
/// child tasks before returning, and `stopCapture()` does not react to
/// cancellation, so a wedged stream would still block the caller forever.
/// Instead both the stop request and the timer run as unstructured tasks and
/// race to resume a single continuation. If the timer wins, the stop request
/// is simply left to finish (or not) in the background.
///
/// - Parameters:
///   - stream: The stream to stop. Errors from stopping are ignored.
///   - seconds: Upper bound on the wait; negative values are treated as 0.
private static func stopCaptureWithTimeout(_ stream: SCStream, seconds: Double) async {
    /// Resumes the wrapped continuation exactly once, whoever gets there first.
    final class Gate: @unchecked Sendable {
        private let lock = NSLock()
        private var continuation: CheckedContinuation<Void, Never>?

        init(_ continuation: CheckedContinuation<Void, Never>) {
            self.continuation = continuation
        }

        func open() {
            lock.lock()
            let pending = continuation
            continuation = nil
            lock.unlock()
            pending?.resume()
        }
    }

    let timeoutNanoseconds = UInt64(max(0, seconds) * 1_000_000_000)

    await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
        let gate = Gate(continuation)
        let timer = Task {
            // If the sleep is cancelled the stop task has already opened the
            // gate, so the extra open() below is a no-op.
            try? await Task.sleep(nanoseconds: timeoutNanoseconds)
            gate.open()
        }
        Task {
            try? await stream.stopCapture()
            timer.cancel()
            gate.open()
        }
    }
}
|
||
|
||
/// Makes an independent copy of `buffer` (the engine reuses the tap buffer).
///
/// Works on the raw `AudioBufferList`, so it does not depend on the sample
/// layout.
///
/// - Returns: The copy, or `nil` when the buffer is empty, allocation fails,
///   the two buffer lists differ in length, or any data pointer is missing.
private static func copy(_ buffer: AVAudioPCMBuffer) -> AVAudioPCMBuffer? {
    let frameCount = buffer.frameLength
    guard frameCount > 0,
          let clone = AVAudioPCMBuffer(pcmFormat: buffer.format, frameCapacity: frameCount)
    else { return nil }
    clone.frameLength = frameCount

    let source = UnsafeMutableAudioBufferListPointer(UnsafeMutablePointer(mutating: buffer.audioBufferList))
    let target = UnsafeMutableAudioBufferListPointer(clone.mutableAudioBufferList)
    guard source.count == target.count else { return nil }

    for (from, to) in zip(source, target) {
        guard let bytesIn = from.mData, let bytesOut = to.mData else { return nil }
        // Never copy more than either side holds.
        let byteCount = min(Int(from.mDataByteSize), Int(to.mDataByteSize))
        memcpy(bytesOut, bytesIn, byteCount)
    }
    return clone
}
|
||
|
||
/// Returns `buffer` without its first `frames` frames.
///
/// Only the first float channel is copied into the result.
/// NOTE(review): presumably the buffers reaching this point are mono — confirm
/// against the resampler's output format.
///
/// - Parameters:
///   - buffer: Float PCM audio to trim.
///   - frames: Number of leading frames to drop.
/// - Returns: `buffer` itself when `frames <= 0`; a new, shorter buffer
///   otherwise; `nil` when `frames` covers the whole buffer, the buffer has no
///   float channel data, or allocation fails.
private static func trimFront(_ buffer: AVAudioPCMBuffer, by frames: Int) -> AVAudioPCMBuffer? {
    guard frames > 0 else { return buffer }

    let available = Int(buffer.frameLength)
    guard frames < available, let source = buffer.floatChannelData?[0] else { return nil }

    let kept = AVAudioFrameCount(available - frames)
    guard let trimmed = AVAudioPCMBuffer(pcmFormat: buffer.format, frameCapacity: kept),
          let destination = trimmed.floatChannelData?[0] else { return nil }

    trimmed.frameLength = kept
    memcpy(destination, source.advanced(by: frames), Int(kept) * MemoryLayout<Float>.size)
    return trimmed
}
|
||
|
||
/// Converts a Mach host-time tick count to seconds using the timebase
/// reported by `mach_timebase_info`.
///
/// - Parameter hostTime: Tick count, e.g. `AVAudioTime.hostTime`.
/// - Returns: The same instant in seconds.
private static func hostSeconds(_ hostTime: UInt64) -> Double {
    var timebase = mach_timebase_info_data_t()
    mach_timebase_info(&timebase)
    // ticks × numer ÷ denom gives nanoseconds.
    let nanoseconds = Double(hostTime) * Double(timebase.numer) / Double(timebase.denom)
    return nanoseconds / 1_000_000_000.0
}
|
||
|
||
/// Copies the audio carried by a `CMSampleBuffer` into a new
/// `AVAudioPCMBuffer` of the same format.
///
/// - Returns: The filled buffer, or `nil` when the sample buffer has no audio
///   format description, the format cannot be represented, it holds no
///   samples, allocation fails, or the copy reports an error.
private static func pcmBuffer(from sampleBuffer: CMSampleBuffer) -> AVAudioPCMBuffer? {
    guard let description = CMSampleBufferGetFormatDescription(sampleBuffer),
          let basicDescription = CMAudioFormatDescriptionGetStreamBasicDescription(description)
    else { return nil }

    // AVAudioFormat wants a pointer to the description, so take a local copy.
    var asbd = basicDescription.pointee
    guard let format = AVAudioFormat(streamDescription: &asbd) else { return nil }

    let sampleCount = AVAudioFrameCount(CMSampleBufferGetNumSamples(sampleBuffer))
    guard sampleCount > 0,
          let pcm = AVAudioPCMBuffer(pcmFormat: format, frameCapacity: sampleCount) else { return nil }
    pcm.frameLength = sampleCount

    let status = CMSampleBufferCopyPCMDataIntoAudioBufferList(
        sampleBuffer, at: 0, frameCount: Int32(sampleCount), into: pcm.mutableAudioBufferList)
    guard status == noErr else { return nil }
    return pcm
}
|
||
}
|