Overview

AuthentiVoice provides comprehensive audio processing capabilities to prepare call recordings for analysis and fraud detection.

Audio Upload

Supported Formats

MP3

Most common format, widely supported

WAV

Uncompressed, high quality

M4A

MPEG-4 audio (typically AAC), common on Apple devices

OGG

Open-source format

Upload Process

// Frontend upload example
/**
 * Upload a single audio file to the audio upload endpoint.
 *
 * Sends the file as multipart form data to `/api/v1/audio/upload`, using the
 * `token` and `organizationId` values from the enclosing scope for the
 * `Authorization` and `x-orgid` headers.
 *
 * @param file - The audio file to upload.
 * @returns The parsed JSON body of a successful response.
 * @throws Error when the server answers with a non-2xx status.
 */
const uploadAudio = async (file: File) => {
  const formData = new FormData();
  formData.append('file', file);

  // No Content-Type header is set here: with a FormData body, fetch supplies
  // the multipart boundary itself.
  const response = await fetch('/api/v1/audio/upload', {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${token}`,
      'x-orgid': organizationId
    },
    body: formData
  });

  // fetch() only rejects on network failure; an HTTP error status still
  // resolves, so it has to be checked explicitly.
  if (!response.ok) {
    throw new Error(`Audio upload failed with status ${response.status}`);
  }

  return response.json();
};

Audio Trimming

Silence Detection

The system automatically detects and removes silence from the beginning and end of audio files:
# Backend trimming logic
def trim_silence(audio_segment, silence_threshold=-50.0, chunk_size=10):
    """
    Remove silence from audio edges.

    Args:
        audio_segment: pydub AudioSegment
        silence_threshold: dBFS threshold for silence
        chunk_size: Size of audio chunks to analyze (ms)

    Returns:
        The slice of ``audio_segment`` between the detected leading and
        trailing silence. When the two detected spans meet or overlap, the
        slice is empty.
    """
    # Forward the caller's settings; without this the detector silently
    # falls back to its own defaults and the arguments have no effect.
    start_trim = detect_leading_silence(
        audio_segment,
        silence_threshold=silence_threshold,
        chunk_size=chunk_size,
    )

    # Trailing silence is the leading silence of the reversed audio.
    end_trim = detect_leading_silence(
        audio_segment.reverse(),
        silence_threshold=silence_threshold,
        chunk_size=chunk_size,
    )

    duration = len(audio_segment)

    # Never let the end position fall before the start position.
    end_position = max(start_trim, duration - end_trim)
    return audio_segment[start_trim:end_position]

Trimming Parameters

silence_threshold (number, default: -50)
The volume threshold in dBFS below which audio is considered silence.
min_silence_duration (number, default: 100)
Minimum duration of silence, in milliseconds, to be trimmed.
padding (number, default: 50)
Milliseconds of padding to leave at the edges after trimming.

Audio Enhancement

Noise Reduction

# Apply noise reduction
def reduce_noise(audio_data, noise_profile=None):
    """
    Suppress background noise using stationary spectral gating.

    Args:
        audio_data: Object exposing ``get_array_of_samples()`` and
            ``frame_rate`` (presumably a pydub AudioSegment -- confirm).
        noise_profile: Accepted but not read by this implementation.

    Returns:
        The value returned by ``nr.reduce_noise`` for the sample array.
    """
    # Materialize the samples as a numpy array for the noise reducer.
    raw_samples = audio_data.get_array_of_samples()
    signal = np.array(raw_samples)

    # Fixed gating settings: stationary mode, 80% noise decrease.
    gating_options = {
        "sr": audio_data.frame_rate,
        "stationary": True,
        "prop_decrease": 0.8,
    }

    return nr.reduce_noise(y=signal, **gating_options)

Volume Normalization

# Normalize audio volume
def normalize_volume(audio_segment, target_dBFS=-20.0):
    """
    Normalize audio to target volume level.

    Args:
        audio_segment: Audio object exposing ``dBFS`` and ``apply_gain``
            (presumably a pydub AudioSegment -- confirm).
        target_dBFS: Desired average level in dBFS.

    Returns:
        The result of ``apply_gain`` with the gain needed to reach
        ``target_dBFS``, or ``audio_segment`` itself when its level is
        negative infinity (no gain can raise it).
    """
    current_dBFS = audio_segment.dBFS

    # A level of -inf would require an infinite gain; there is nothing to
    # scale, so hand the audio back untouched instead of applying it.
    if current_dBFS == float("-inf"):
        return audio_segment

    return audio_segment.apply_gain(target_dBFS - current_dBFS)

Format Conversion

Automatic Conversion

The system automatically converts uploaded files to a format suited to speech processing (by default, 16 kHz mono WAV):
# Convert audio format
def convert_audio_format(
    input_data: bytes,
    input_format: str,
    output_format: str = "wav",
    sample_rate: int = 16000
) -> bytes:
    """
    Re-encode raw audio bytes into another format.

    The decoded audio is resampled to ``sample_rate`` when its frame rate
    differs, reduced to a single channel when it has more than one, and
    then exported as ``output_format``.

    Args:
        input_data: Encoded audio bytes.
        input_format: Format name used to decode ``input_data``.
        output_format: Format name used for the export.
        sample_rate: Target frame rate in Hz.

    Returns:
        The exported audio as bytes.
    """
    source_stream = io.BytesIO(input_data)
    segment = AudioSegment.from_file(source_stream, format=input_format)

    # Only resample when the rate actually differs.
    needs_resample = segment.frame_rate != sample_rate
    if needs_resample:
        segment = segment.set_frame_rate(sample_rate)

    # Speech processing works on a single channel.
    is_multichannel = segment.channels > 1
    if is_multichannel:
        segment = segment.set_channels(1)

    encoded = io.BytesIO()
    segment.export(encoded, format=output_format)
    return encoded.getvalue()

Metadata Extraction

Audio Properties

The system extracts comprehensive metadata from uploaded files:
def extract_audio_metadata(file_path: str) -> dict:
    """
    Collect descriptive properties of the audio file at ``file_path``.

    Args:
        file_path: Path of the audio file to inspect.

    Returns:
        A dict with the keys ``duration_seconds``, ``sample_rate``,
        ``channels``, ``bit_depth``, ``format``, ``file_size_bytes``,
        ``average_volume_dBFS``, ``max_volume_dBFS`` and ``rms``.
    """
    segment = AudioSegment.from_file(file_path)

    metadata = {}
    # len() of the segment is divided by 1000 to express it in seconds.
    metadata["duration_seconds"] = len(segment) / 1000.0
    metadata["sample_rate"] = segment.frame_rate
    metadata["channels"] = segment.channels
    # sample_width is multiplied by 8 to convert bytes to bits.
    metadata["bit_depth"] = segment.sample_width * 8
    metadata["format"] = detect_format(file_path)
    metadata["file_size_bytes"] = os.path.getsize(file_path)
    metadata["average_volume_dBFS"] = segment.dBFS
    metadata["max_volume_dBFS"] = segment.max_dBFS
    metadata["rms"] = segment.rms
    return metadata

Storage Management

File Organization

s3://bucket/
├── {org_id}/
│   ├── raw/
│   │   └── {file_id}/
│   │       └── original.mp3
│   ├── processed/
│   │   └── {file_id}/
│   │       ├── trimmed.wav
│   │       └── normalized.wav
│   └── transcripts/
│       └── {file_id}/
│           └── transcript.json

Presigned URLs

Generate secure, time-limited URLs for audio playback:
def generate_audio_urls(file_id: str, org_id: str) -> dict:
    """
    Build presigned URLs for one file's raw and processed audio.

    Args:
        file_id: Identifier of the audio file.
        org_id: Identifier of the owning organization.

    Returns:
        A dict with three URLs: ``raw`` and ``processed`` (requested with
        ``expiration=3600``) and ``download`` (requested with
        ``expiration=300`` and an ``attachment`` response disposition).
    """
    raw_url = s3_service.generate_presigned_url(
        f"{org_id}/raw/{file_id}/original.mp3",
        expiration=3600
    )

    processed_url = s3_service.generate_presigned_url(
        f"{org_id}/processed/{file_id}/trimmed.wav",
        expiration=3600
    )

    # Same object as ``raw``, but shorter-lived and served as an attachment.
    download_url = s3_service.generate_presigned_url(
        f"{org_id}/raw/{file_id}/original.mp3",
        expiration=300,
        response_disposition="attachment"
    )

    return {
        "raw": raw_url,
        "processed": processed_url,
        "download": download_url
    }

Audio Playback

Frontend Player Component

// Audio player with waveform visualization
/**
 * Audio player with waveform visualization.
 *
 * Creates a WaveSurfer instance for `audioUrl`, optionally seeded with
 * precomputed `peaks`, and destroys it when `audioUrl` changes or the
 * component unmounts.
 */
export function AudioPlayer({ audioUrl, peaks }: AudioPlayerProps) {
  const wavesurfer = useRef<WaveSurfer>();
  // Each player renders into its own element. A page-wide '#waveform'
  // selector would make every player on the page target the first match.
  const containerRef = useRef<HTMLDivElement>(null);

  useEffect(() => {
    const container = containerRef.current;
    if (!container) {
      return;
    }

    const instance = WaveSurfer.create({
      container,
      waveColor: '#7c3aed',
      progressColor: '#5b21b6',
      cursorColor: '#a78bfa',
      barWidth: 2,
      barRadius: 3,
      responsive: true,
      height: 60,
      normalize: true,
      backend: 'MediaElement'
    });
    wavesurfer.current = instance;

    if (peaks) {
      instance.load(audioUrl, peaks);
    } else {
      instance.load(audioUrl);
    }

    // Destroy exactly the instance this effect created.
    return () => instance.destroy();
    // NOTE(review): `peaks` is read above but is not a dependency, so a
    // change to `peaks` alone does not reload the waveform — confirm intended.
  }, [audioUrl]);

  // NOTE(review): `wavesurfer.current` is read during render, so on the first
  // render PlaybackControls receives `undefined` and nothing here triggers a
  // re-render once the instance exists — confirm PlaybackControls copes.
  return (
    <div className="audio-player">
      <div id="waveform" ref={containerRef} />
      <PlaybackControls wavesurfer={wavesurfer.current} />
    </div>
  );
}

Batch Processing

Queue Management

# Process multiple files efficiently
async def batch_process_audio_files(
    file_ids: List[str],
    operations: List[str]
) -> List[dict]:
    """
    Process multiple audio files concurrently, at most five at a time.

    Args:
        file_ids: Identifiers of the files to process.
        operations: Passed through unchanged as the second argument of
            ``process_audio_pipeline`` for every file.

    Returns:
        One pipeline result per entry of ``file_ids``, in the same order.
    """
    # One semaphore shared by all workers. It must be acquired per file:
    # taking it once around gather() would let every pipeline start at once.
    semaphore = asyncio.Semaphore(5)

    async def _process_with_limit(file_id: str) -> dict:
        """Run one file's pipeline once a concurrency slot is free."""
        async with semaphore:
            # NOTE(review): the process_audio_pipeline shown in this document
            # takes (file_id, org_id); confirm that passing ``operations``
            # as the second argument is what the real pipeline expects.
            return await process_audio_pipeline(file_id, operations)

    return await asyncio.gather(
        *(_process_with_limit(file_id) for file_id in file_ids)
    )

Processing Pipeline

Complete Pipeline Example

async def process_audio_pipeline(
    file_id: str,
    org_id: str
) -> dict:
    """
    Complete audio processing pipeline.

    Downloads the raw MP3 for ``file_id``, converts it to 16 kHz WAV, trims
    edge silence, normalizes the volume to -20 dBFS, uploads the result and
    computes waveform peaks.

    Args:
        file_id: Identifier of the audio file.
        org_id: Identifier of the owning organization; used as the storage
            key prefix.

    Returns:
        On success, a dict with ``status`` ``"success"``, ``processed_key``,
        ``metadata`` (durations in seconds) and ``peaks``. On any failure,
        a dict with ``status`` ``"error"`` and the ``error`` message; the
        exception is logged and not re-raised.
    """
    try:
        # 1. Download raw audio
        raw_audio = await s3_service.download_file(
            f"{org_id}/raw/{file_id}/original.mp3"
        )

        # 2. Convert format
        wav_audio = convert_audio_format(
            raw_audio,
            input_format="mp3",
            output_format="wav",
            sample_rate=16000
        )

        # 3. Trim silence
        audio_segment = AudioSegment.from_wav(io.BytesIO(wav_audio))
        trimmed = trim_silence(audio_segment)

        # 4. Normalize volume
        normalized = normalize_volume(trimmed, target_dBFS=-20.0)

        # 5. Extract metadata (segment lengths are divided by 1000 to
        # report seconds)
        metadata = {
            "original_duration": len(audio_segment) / 1000.0,
            "trimmed_duration": len(trimmed) / 1000.0,
            "silence_removed": (len(audio_segment) - len(trimmed)) / 1000.0
        }

        # 6. Save processed audio
        # NOTE(review): generate_audio_urls in this document points at
        # ".../processed/{file_id}/trimmed.wav"; confirm which key is right.
        processed_key = f"{org_id}/processed/{file_id}/processed.wav"

        # Export into an in-memory buffer. Calling export() without a
        # target hands back an open file object that was never closed.
        wav_buffer = io.BytesIO()
        normalized.export(wav_buffer, format="wav")
        await s3_service.upload_file(
            wav_buffer.getvalue(),
            processed_key
        )

        # 7. Generate waveform peaks
        peaks = generate_waveform_peaks(normalized)

        return {
            "status": "success",
            "processed_key": processed_key,
            "metadata": metadata,
            "peaks": peaks
        }

    except Exception as e:
        # Pipeline boundary: report the failure to the caller as data, but
        # keep the traceback in the log for diagnosis.
        logger.exception(f"Audio processing failed: {e}")
        return {
            "status": "error",
            "error": str(e)
        }

Error Handling

Common Issues

# Undecodable file: when loading raises CouldntDecodeError, answer with
# HTTP 400 instead of letting the decode error propagate.
try:
    audio = AudioSegment.from_file(file_path)
except CouldntDecodeError:
    raise HTTPException(
        status_code=400,
        detail="Unsupported audio format"
    )
# Upload size cap, expressed in bytes.
MAX_FILE_SIZE = 100 * 1024 * 1024  # 100MB

# Oversized upload: reject with HTTP 413 before any processing starts.
if file.size > MAX_FILE_SIZE:
    raise HTTPException(
        status_code=413,
        detail="File size exceeds 100MB limit"
    )
@timeout(seconds=300)  # 5 minute timeout
async def process_audio(file_id: str):
    """
    Placeholder for a time-limited audio processing entry point.

    The body is intentionally empty here. The ``timeout`` decorator is
    defined elsewhere; presumably it aborts the call after 300 seconds --
    confirm against its implementation.
    """
    # Processing logic
    pass

Best Practices

1. Validate Input

Always validate file format and size before processing.

2. Handle Errors Gracefully

Provide meaningful error messages and fallback options.

3. Optimize for Performance

Use streaming for large files and process in chunks.

4. Monitor Resources

Track memory and CPU usage during processing.

5. Clean Up

Remove temporary files and free resources after processing.