Overview
AuthentiVoice provides comprehensive audio processing capabilities to prepare call recordings for analysis and fraud detection.
Audio Upload
Supported formats:
MP3: most common format, widely supported
WAV: uncompressed, high quality
Upload Process
// Frontend upload example
const uploadAudio = async (file: File) => {
  const formData = new FormData();
  formData.append('file', file);

  const response = await fetch('/api/v1/audio/upload', {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${token}`,
      'x-orgid': organizationId
    },
    body: formData
  });

  return response.json();
};
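On the server side, the endpoint receives the multipart upload and stores the original file under the organization's raw prefix. The handler below is a minimal sketch under assumed names (the router, the s3_service.upload_file call, and the key layout from the Storage Management section); it is not the production implementation.

# Hypothetical FastAPI handler for POST /api/v1/audio/upload
import uuid
from fastapi import APIRouter, File, Header, HTTPException, UploadFile

router = APIRouter()
SUPPORTED_FORMATS = {"mp3", "wav"}

@router.post("/api/v1/audio/upload")
async def upload_audio(
    file: UploadFile = File(...),
    x_orgid: str = Header(...)  # FastAPI maps this to the x-orgid header
) -> dict:
    # Only MP3 and WAV are accepted (see supported formats above)
    extension = (file.filename or "").rsplit(".", 1)[-1].lower()
    if extension not in SUPPORTED_FORMATS:
        raise HTTPException(status_code=415, detail=f"Unsupported format: {extension}")

    # Store the untouched original under the raw/ prefix
    file_id = str(uuid.uuid4())
    key = f"{x_orgid}/raw/{file_id}/original.{extension}"
    await s3_service.upload_file(await file.read(), key)

    return {"file_id": file_id, "key": key}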
Audio Trimming
Silence Detection
The system automatically detects and removes silence from the beginning and end of audio files:
# Backend trimming logic
from pydub.silence import detect_leading_silence

def trim_silence(audio_segment, silence_threshold=-50.0, chunk_size=10):
    """
    Remove silence from audio edges.

    Args:
        audio_segment: pydub AudioSegment
        silence_threshold: dBFS threshold for silence
        chunk_size: Size of audio chunks to analyze (ms)
    """
    # Detect leading silence
    start_trim = detect_leading_silence(
        audio_segment, silence_threshold=silence_threshold, chunk_size=chunk_size
    )
    # Detect trailing silence (run the same detector on the reversed audio)
    end_trim = detect_leading_silence(
        audio_segment.reverse(), silence_threshold=silence_threshold, chunk_size=chunk_size
    )
    # Return trimmed audio
    duration = len(audio_segment)
    return audio_segment[start_trim:duration - end_trim]
Trimming Parameters
Silence threshold: the volume threshold in dBFS below which audio is considered silence
Minimum silence length: the minimum duration of silence, in milliseconds, to be trimmed
Edge padding: milliseconds of audio to leave at the edges after trimming
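The trim_silence helper above only uses the threshold; the sketch below shows how all three parameters could work together using pydub's detect_nonsilent. The function name and the padded-trim approach are assumptions, not the shipped implementation.

# Illustrative trim that honors a minimum silence length and edge padding
from pydub.silence import detect_nonsilent

def trim_with_padding(audio_segment, silence_threshold=-50.0,
                      min_silence_len=500, padding_ms=100):
    # Find all non-silent [start, end] ranges in milliseconds
    nonsilent = detect_nonsilent(
        audio_segment,
        min_silence_len=min_silence_len,
        silence_thresh=silence_threshold,
    )
    if not nonsilent:
        return audio_segment  # nothing but silence; leave untouched

    # Keep a small margin of audio on each side of the detected speech
    start = max(nonsilent[0][0] - padding_ms, 0)
    end = min(nonsilent[-1][1] + padding_ms, len(audio_segment))
    return audio_segment[start:end]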
Audio Enhancement
Noise Reduction
# Apply noise reduction
import numpy as np
import noisereduce as nr

def reduce_noise(audio_data, noise_profile=None):
    """
    Reduce background noise in audio.
    """
    # Convert the pydub segment to a numpy array of samples
    samples = np.array(audio_data.get_array_of_samples())

    # Apply spectral gating; an optional noise profile can guide the filter
    reduced = nr.reduce_noise(
        y=samples,
        sr=audio_data.frame_rate,
        y_noise=noise_profile,
        stationary=True,
        prop_decrease=0.8
    )
    return reduced
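reduce_noise returns raw samples rather than a pydub segment. One way to put the denoised samples back into an AudioSegment for the rest of the pipeline (a sketch, assuming 16-bit mono audio) is:

# Rebuild a pydub AudioSegment from the denoised numpy samples
import numpy as np
from pydub import AudioSegment

def to_segment(reduced_samples, frame_rate):
    samples_int16 = np.asarray(reduced_samples, dtype=np.int16)
    return AudioSegment(
        data=samples_int16.tobytes(),
        sample_width=2,       # 16-bit samples
        frame_rate=frame_rate,
        channels=1            # the pipeline converts speech audio to mono
    )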
Volume Normalization
# Normalize audio volume
def normalize_volume(audio_segment, target_dBFS=-20.0):
    """
    Normalize audio to target volume level.
    """
    change_in_dBFS = target_dBFS - audio_segment.dBFS
    return audio_segment.apply_gain(change_in_dBFS)
Automatic Conversion
The system automatically converts uploaded files to optimal formats for processing:
# Convert audio format
import io
from pydub import AudioSegment

def convert_audio_format(
    input_data: bytes,
    input_format: str,
    output_format: str = "wav",
    sample_rate: int = 16000
) -> bytes:
    """
    Convert audio between formats.
    """
    # Load audio
    audio = AudioSegment.from_file(
        io.BytesIO(input_data),
        format=input_format
    )
    # Resample if needed
    if audio.frame_rate != sample_rate:
        audio = audio.set_frame_rate(sample_rate)
    # Convert to mono for speech processing
    if audio.channels > 1:
        audio = audio.set_channels(1)
    # Export in new format
    output = io.BytesIO()
    audio.export(output, format=output_format)
    return output.getvalue()
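For example, an uploaded MP3 can be turned into the 16 kHz mono WAV the speech models expect (raw_bytes below is a placeholder for the downloaded file contents):

# Convert an uploaded MP3 to 16 kHz mono WAV for downstream processing
wav_bytes = convert_audio_format(
    input_data=raw_bytes,
    input_format="mp3",
    output_format="wav",
    sample_rate=16000
)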
Audio Properties
The system extracts comprehensive metadata from uploaded files:
import os
from pydub import AudioSegment

def extract_audio_metadata(file_path: str) -> dict:
    """
    Extract metadata from audio file.
    """
    audio = AudioSegment.from_file(file_path)
    return {
        "duration_seconds": len(audio) / 1000.0,
        "sample_rate": audio.frame_rate,
        "channels": audio.channels,
        "bit_depth": audio.sample_width * 8,
        "format": detect_format(file_path),  # helper that inspects the extension/header
        "file_size_bytes": os.path.getsize(file_path),
        "average_volume_dBFS": audio.dBFS,
        "max_volume_dBFS": audio.max_dBFS,
        "rms": audio.rms
    }
Storage Management
File Organization
s3://bucket/
├── {org_id}/
│   ├── raw/
│   │   └── {file_id}/
│   │       └── original.mp3
│   ├── processed/
│   │   └── {file_id}/
│   │       ├── trimmed.wav
│   │       └── normalized.wav
│   └── transcripts/
│       └── {file_id}/
│           └── transcript.json
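A small helper can keep object keys consistent with this layout; the function name and stage values below are illustrative, not part of the shipped code.

# Hypothetical helper that builds S3 keys matching the layout above
def build_object_key(org_id: str, file_id: str, stage: str, filename: str) -> str:
    """stage is one of 'raw', 'processed', or 'transcripts'."""
    return f"{org_id}/{stage}/{file_id}/{filename}"

# e.g. build_object_key("acme", "abc123", "processed", "trimmed.wav")
# -> "acme/processed/abc123/trimmed.wav"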
Presigned URLs
Generate secure, time-limited URLs for audio playback:
def generate_audio_urls(file_id: str, org_id: str) -> dict:
    """
    Generate presigned URLs for audio access.
    """
    return {
        "raw": s3_service.generate_presigned_url(
            f"{org_id}/raw/{file_id}/original.mp3",
            expiration=3600
        ),
        "processed": s3_service.generate_presigned_url(
            f"{org_id}/processed/{file_id}/trimmed.wav",
            expiration=3600
        ),
        "download": s3_service.generate_presigned_url(
            f"{org_id}/raw/{file_id}/original.mp3",
            expiration=300,
            response_disposition="attachment"
        )
    }
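Under the hood, s3_service.generate_presigned_url can be backed by boto3; the sketch below is an assumption about that wrapper (including the bucket name), not its actual source.

# Possible boto3-backed implementation of the presigned URL helper
from typing import Optional
import boto3

s3_client = boto3.client("s3")
BUCKET = "authentivoice-audio"  # bucket name is illustrative

def generate_presigned_url(key: str, expiration: int = 3600,
                           response_disposition: Optional[str] = None) -> str:
    params = {"Bucket": BUCKET, "Key": key}
    if response_disposition:
        # Forces the browser to download instead of playing inline
        params["ResponseContentDisposition"] = response_disposition
    return s3_client.generate_presigned_url(
        "get_object", Params=params, ExpiresIn=expiration
    )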
Audio Playback
Frontend Player Component
// Audio player with waveform visualization
import { useEffect, useRef } from 'react';
import WaveSurfer from 'wavesurfer.js';

export function AudioPlayer({ audioUrl, peaks }: AudioPlayerProps) {
  const wavesurfer = useRef<WaveSurfer>();

  useEffect(() => {
    wavesurfer.current = WaveSurfer.create({
      container: '#waveform',
      waveColor: '#7c3aed',
      progressColor: '#5b21b6',
      cursorColor: '#a78bfa',
      barWidth: 2,
      barRadius: 3,
      responsive: true,
      height: 60,
      normalize: true,
      backend: 'MediaElement'
    });

    // Use precomputed peaks when available to avoid decoding the full file
    if (peaks) {
      wavesurfer.current.load(audioUrl, peaks);
    } else {
      wavesurfer.current.load(audioUrl);
    }

    return () => wavesurfer.current?.destroy();
  }, [audioUrl]);

  return (
    <div className="audio-player">
      <div id="waveform" />
      <PlaybackControls wavesurfer={wavesurfer.current} />
    </div>
  );
}
Batch Processing
Queue Management
# Process multiple files efficiently
import asyncio
from typing import List

async def batch_process_audio_files(
    file_ids: List[str],
    operations: List[str]
) -> List[dict]:
    """
    Process multiple audio files in parallel with a concurrency limit.
    """
    # Limit how many files are processed at once
    semaphore = asyncio.Semaphore(5)

    async def process_with_limit(file_id: str) -> dict:
        # Each task acquires the semaphore, so at most 5 run concurrently
        async with semaphore:
            return await process_audio_pipeline(file_id, operations)

    tasks = [asyncio.create_task(process_with_limit(file_id)) for file_id in file_ids]
    return await asyncio.gather(*tasks)
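Calling it looks like this; the operation names are placeholders for whichever steps the pipeline supports, not a fixed list.

# Kick off processing for a batch of uploaded files
results = await batch_process_audio_files(
    file_ids=["file-001", "file-002", "file-003"],
    operations=["trim_silence", "normalize_volume"]
)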
Processing Pipeline
Complete Pipeline Example
async def process_audio_pipeline(
    file_id: str,
    org_id: str
) -> dict:
    """
    Complete audio processing pipeline.
    """
    try:
        # 1. Download raw audio
        raw_audio = await s3_service.download_file(
            f"{org_id}/raw/{file_id}/original.mp3"
        )
        # 2. Convert format
        wav_audio = convert_audio_format(
            raw_audio,
            input_format="mp3",
            output_format="wav",
            sample_rate=16000
        )
        # 3. Trim silence
        audio_segment = AudioSegment.from_wav(io.BytesIO(wav_audio))
        trimmed = trim_silence(audio_segment)
        # 4. Normalize volume
        normalized = normalize_volume(trimmed, target_dBFS=-20.0)
        # 5. Extract metadata
        metadata = {
            "original_duration": len(audio_segment) / 1000.0,
            "trimmed_duration": len(trimmed) / 1000.0,
            "silence_removed": (len(audio_segment) - len(trimmed)) / 1000.0
        }
        # 6. Save processed audio
        processed_key = f"{org_id}/processed/{file_id}/processed.wav"
        await s3_service.upload_file(
            normalized.export(format="wav").read(),
            processed_key
        )
        # 7. Generate waveform peaks
        peaks = generate_waveform_peaks(normalized)
        return {
            "status": "success",
            "processed_key": processed_key,
            "metadata": metadata,
            "peaks": peaks
        }
    except Exception as e:
        logger.error(f"Audio processing failed: {e}")
        return {
            "status": "error",
            "error": str(e)
        }
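generate_waveform_peaks is referenced above but not shown. One straightforward way to compute peaks for the frontend waveform (a sketch, not the actual implementation) is to split the samples into buckets and keep the normalized maximum of each:

# Sketch of waveform peak extraction for the frontend player
import numpy as np
from pydub import AudioSegment

def generate_waveform_peaks(audio_segment: AudioSegment, num_peaks: int = 800) -> list:
    samples = np.array(audio_segment.get_array_of_samples(), dtype=np.float32)
    if samples.size == 0:
        return []

    # Split the samples into num_peaks buckets and take the peak of each
    buckets = np.array_split(np.abs(samples), num_peaks)
    peaks = np.array([bucket.max() if bucket.size else 0.0 for bucket in buckets])

    # Normalize to 0..1 so the player can scale the waveform freely
    max_peak = float(peaks.max()) or 1.0
    return (peaks / max_peak).tolist()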
Error Handling
Common Issues
File too large: uploads are capped at 100 MB.

MAX_FILE_SIZE = 100 * 1024 * 1024  # 100MB

if file.size > MAX_FILE_SIZE:
    raise HTTPException(
        status_code=413,
        detail="File size exceeds 100MB limit"
    )

Processing timeout: long-running jobs are aborted after five minutes.

@timeout(seconds=300)  # 5 minute timeout
async def process_audio(file_id: str):
    # Processing logic
    pass
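Corrupted or mislabeled uploads are another common failure. A sketch of guarding the decode step is shown below; the 422 response and its wording are assumptions.

# Reject files that cannot be decoded as audio
import io
from fastapi import HTTPException
from pydub import AudioSegment
from pydub.exceptions import CouldntDecodeError

def load_or_reject(input_bytes: bytes, input_format: str) -> AudioSegment:
    try:
        return AudioSegment.from_file(io.BytesIO(input_bytes), format=input_format)
    except CouldntDecodeError:
        raise HTTPException(
            status_code=422,
            detail="File could not be decoded; it may be corrupted or mislabeled"
        )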
Best Practices
Validate input: always validate file format and size before processing.
Handle errors gracefully: provide meaningful error messages and fallback options.
Optimize for performance: use streaming for large files and process in chunks (see the sketch after this list).
Monitor resources: track memory and CPU usage during processing.
Clean up: remove temporary files and free resources after processing.
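As a sketch of the chunked approach, a long recording can be split into fixed-size windows so each processing step works on a bounded amount of audio; the 60-second chunk size and the per-chunk steps are illustrative assumptions.

# Process a long recording chunk by chunk to keep each step's working set small
from pydub import AudioSegment

CHUNK_MS = 60_000  # 60-second chunks; tune to the available memory

def process_in_chunks(file_path: str) -> list:
    audio = AudioSegment.from_file(file_path)
    results = []
    for start in range(0, len(audio), CHUNK_MS):
        chunk = audio[start:start + CHUNK_MS]
        # Run the lighter per-chunk steps; stitch or aggregate the results afterwards
        results.append(normalize_volume(trim_silence(chunk)))
    return results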