Backend Overview

The AuthentiVoice backend is a high-performance API service built with FastAPI, designed to handle audio processing, AI-powered analysis, and integrations with external services.

Technology Stack

  • FastAPI with Pydantic models for the API layer
  • pydub (backed by ffmpeg) for audio manipulation
  • OpenAI Whisper for speech-to-text
  • Google Gemini (google-generativeai) for AI-powered fraud analysis
  • boto3 for S3/MinIO object storage
  • Supabase (PostgreSQL) for persistence and JWT-based auth
  • Modal and Docker/uvicorn for deployment
  • pytest for the test suite

Project Structure

backend/
├── app/
│   ├── main.py                 # FastAPI application entry
│   ├── api/
│   │   ├── endpoints/          # API route handlers
│   │   │   ├── audio.py        # Audio processing endpoints
│   │   │   ├── analysis.py     # Analysis CRUD endpoints
│   │   │   └── integrations.py # External integrations
│   │   └── dependencies.py     # Shared dependencies
│   ├── core/
│   │   ├── config.py           # Configuration management
│   │   ├── audio_processing/   # Audio manipulation
│   │   ├── transcription/      # Speech-to-text engines
│   │   └── analysis/           # AI analysis logic
│   ├── services/
│   │   ├── s3_service.py       # S3/MinIO operations
│   │   ├── supabase_service.py # Database operations
│   │   └── integration_service.py # External integrations
│   └── models/                 # Pydantic models
│       ├── audio.py
│       ├── analysis.py
│       └── integration.py
├── modal/                      # Modal cloud deployment
└── tests/                      # Test suite

Core Components

FastAPI Application

# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.endpoints import audio, analysis, integrations

app = FastAPI(
    title="AuthentiVoice API",
    description="AI-powered voice authentication and fraud detection",
    version="1.0.0"
)

# CORS configuration
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.authentivoice.com"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Include routers
app.include_router(audio.router, prefix="/api/v1/audio", tags=["audio"])
app.include_router(analysis.router, prefix="/api/v1/analyses", tags=["analysis"])
app.include_router(integrations.router, prefix="/api/v1/integrations", tags=["integrations"])

API Endpoints

# app/api/endpoints/analysis.py
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
from typing import List
from app.models.analysis import CallAnalysis, CallAnalysisCreate, CallAnalysisUpdate
from app.services.supabase_service import SupabaseService
from app.api.dependencies import get_org_id, get_supabase
from app.core.tasks import process_audio_analysis

router = APIRouter()

@router.post("/", response_model=CallAnalysis)
async def create_analysis(
    analysis: CallAnalysisCreate,
    background_tasks: BackgroundTasks,
    org_id: str = Depends(get_org_id),
    supabase: SupabaseService = Depends(get_supabase)
):
    """Create a new call analysis record."""
    try:
        # Create analysis record
        result = await supabase.create_call_analysis(
            org_id=org_id,
            file_id=analysis.file_id,
            file_name=analysis.file_name
        )
        
        # Trigger async processing
        background_tasks.add_task(
            process_audio_analysis,
            analysis_id=result["id"]
        )
        
        return result
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@router.get("/{analysis_id}", response_model=CallAnalysis)
async def get_analysis(
    analysis_id: str,
    org_id: str = Depends(get_org_id),
    supabase: SupabaseService = Depends(get_supabase)
):
    """Get a specific call analysis."""
    analysis = await supabase.get_call_analysis(analysis_id, org_id)
    if not analysis:
        raise HTTPException(status_code=404, detail="Analysis not found")
    return analysis
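
CallAnalysisUpdate is imported above but not exercised in these handlers; a hedged sketch of a matching update endpoint, assuming a hypothetical supabase.update_call_analysis helper, might look like this:

# app/api/endpoints/analysis.py (continued) -- illustrative update handler, not project code
@router.patch("/{analysis_id}", response_model=CallAnalysis)
async def update_analysis(
    analysis_id: str,
    update: CallAnalysisUpdate,
    org_id: str = Depends(get_org_id),
    supabase: SupabaseService = Depends(get_supabase)
):
    """Update mutable fields of an existing call analysis."""
    # update_call_analysis is a hypothetical service method; model_dump is Pydantic v2
    # (use .dict(exclude_unset=True) on Pydantic v1)
    result = await supabase.update_call_analysis(
        analysis_id=analysis_id,
        org_id=org_id,
        fields=update.model_dump(exclude_unset=True)
    )
    if not result:
        raise HTTPException(status_code=404, detail="Analysis not found")
    return result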

Audio Processing

Audio Trimming

# app/core/audio_processing/trimmer.py
from pydub import AudioSegment
from pydub.silence import detect_leading_silence
import io

class AudioTrimmer:
    def __init__(self, silence_threshold: int = -50):
        self.silence_threshold = silence_threshold
    
    def trim_silence(self, audio_data: bytes, format: str = "mp3") -> bytes:
        """Remove silence from beginning and end of audio."""
        # Load audio
        audio = AudioSegment.from_file(io.BytesIO(audio_data), format=format)
        
        # Detect silence
        start_trim = detect_leading_silence(audio, self.silence_threshold)
        end_trim = detect_leading_silence(audio.reverse(), self.silence_threshold)
        
        # Trim audio
        duration = len(audio)
        trimmed = audio[start_trim:duration-end_trim]
        
        # Export to bytes
        output = io.BytesIO()
        trimmed.export(output, format=format)
        return output.getvalue()

Transcription Engines

# app/core/transcription/whisper_engine.py
import whisper
from typing import List, Dict

class WhisperEngine:
    def __init__(self, model_name: str = "base"):
        self.model = whisper.load_model(model_name)
    
    async def transcribe(self, audio_path: str) -> List[Dict]:
        """Transcribe audio using OpenAI Whisper."""
        result = self.model.transcribe(audio_path)
        
        # Format segments
        segments = []
        for segment in result["segments"]:
            segments.append({
                "start": segment["start"],
                "end": segment["end"],
                "text": segment["text"].strip(),
                "confidence": segment.get("confidence", 1.0)
            })
        
        return segments
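
The transcription/ package is described as housing multiple speech-to-text engines. One way to keep them interchangeable is a small shared interface; the Protocol below is a sketch of that assumption, not code taken from the project:

# app/core/transcription/base.py -- hypothetical shared interface for transcription engines
from typing import Dict, List, Protocol

class TranscriptionEngine(Protocol):
    async def transcribe(self, audio_path: str) -> List[Dict]:
        """Return a list of {start, end, text, confidence} segments."""
        ...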

AI Analysis

Fraud Detection with Gemini

# app/core/analysis/fraud_detector.py
import google.generativeai as genai
from typing import Dict, List
import json

class FraudDetector:
    def __init__(self, api_key: str):
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel('gemini-1.5-flash')
    
    async def analyze_transcript(
        self, 
        transcript: str, 
        metadata: Dict = None
    ) -> Dict:
        """Analyze transcript for fraud indicators."""
        prompt = f"""
        Analyze this call transcript for potential fraud indicators:
        
        Transcript: {transcript}
        
        Consider:
        1. Suspicious patterns or inconsistencies
        2. High-pressure tactics
        3. Requests for sensitive information
        4. Unusual speech patterns or splicing
        5. Compliance violations
        
        Return a JSON response with:
        - fraud_score (0.0 to 1.0)
        - fraud_indicators (list of detected issues)
        - risk_level (low, medium, high)
        - summary (brief explanation)
        - recommendations (list of actions)
        """
        
        response = await self.model.generate_content_async(prompt)
        
        # Parse JSON response
        try:
            result = json.loads(response.text)
            return result
        except json.JSONDecodeError:
            # Fallback parsing logic
            return self._parse_text_response(response.text)
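
The _parse_text_response fallback is referenced above but not shown. A minimal sketch, assuming the model sometimes wraps its JSON in markdown fences or surrounding prose, might look like this:

# app/core/analysis/fraud_detector.py (continued) -- illustrative fallback sketch
    def _parse_text_response(self, text: str) -> Dict:
        """Best-effort extraction of a JSON object from a non-JSON reply."""
        # Strip markdown code fences if present
        cleaned = text.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip()
        
        # Try the outermost JSON object in the remaining text
        start, end = cleaned.find("{"), cleaned.rfind("}")
        if start != -1 and end != -1:
            try:
                return json.loads(cleaned[start:end + 1])
            except json.JSONDecodeError:
                pass
        
        # Give up gracefully with a conservative default
        return {
            "fraud_score": 0.0,
            "fraud_indicators": [],
            "risk_level": "low",
            "summary": "Model response could not be parsed",
            "recommendations": []
        }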

Service Layer

S3 Storage Service

# app/services/s3_service.py
import boto3
from botocore.client import Config
import uuid
from typing import Optional

class S3Service:
    def __init__(self, endpoint_url: str, access_key: str, secret_key: str, bucket: str):
        self.client = boto3.client(
            's3',
            endpoint_url=endpoint_url,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            config=Config(signature_version='s3v4')
        )
        self.bucket = bucket
    
    async def upload_file(
        self, 
        file_data: bytes, 
        file_name: str,
        org_id: str
    ) -> str:
        """Upload file to S3 and return the key."""
        file_id = str(uuid.uuid4())
        key = f"{org_id}/{file_id}/{file_name}"
        
        self.client.put_object(
            Bucket=self.bucket,
            Key=key,
            Body=file_data
        )
        
        return key
    
    def generate_presigned_url(
        self, 
        key: str, 
        expiration: int = 3600
    ) -> str:
        """Generate a presigned URL for file access."""
        return self.client.generate_presigned_url(
            'get_object',
            Params={'Bucket': self.bucket, 'Key': key},
            ExpiresIn=expiration
        )
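
The background task below calls s3_service.download_file, which is not shown above; a minimal sketch consistent with the client configuration might be:

# app/services/s3_service.py (continued) -- sketch of the download helper used by the background task
    async def download_file(self, key: str) -> bytes:
        """Download an object from S3/MinIO and return its raw bytes."""
        response = self.client.get_object(Bucket=self.bucket, Key=key)
        return response["Body"].read()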

Database Service

# app/services/supabase_service.py
from supabase import create_client, Client
from typing import List, Optional, Dict
import os

class SupabaseService:
    def __init__(self):
        self.client: Client = create_client(
            os.getenv("SUPABASE_URL"),
            os.getenv("SUPABASE_KEY")
        )
    
    async def create_call_analysis(
        self,
        org_id: str,
        file_id: str,
        file_name: str,
        metadata: Dict = None
    ) -> Dict:
        """Create a new call analysis record."""
        data = {
            "organization_id": org_id,
            "file_id": file_id,
            "file_name": file_name,
            "metadata": metadata or {},
            "status": "pending"
        }
        
        response = self.client.table("call_analyses").insert(data).execute()
        return response.data[0]
    
    async def update_analysis_results(
        self,
        analysis_id: str,
        transcription: List[Dict],
        fraud_score: float,
        analysis_result: Dict
    ) -> Dict:
        """Update analysis with processing results."""
        data = {
            "result": {
                "transcription": transcription,
                "fraud_score": fraud_score,
                "analysis": analysis_result
            },
            "status": "completed"
        }
        
        response = self.client.table("call_analyses") \
            .update(data) \
            .eq("id", analysis_id) \
            .execute()
        
        return response.data[0]
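
The endpoints and background task also rely on get_call_analysis and update_analysis_error, which are not shown; plausible sketches following the same pattern (the "error" column is an assumption):

# app/services/supabase_service.py (continued) -- sketches of the lookup and error helpers
    async def get_call_analysis(self, analysis_id: str, org_id: Optional[str] = None) -> Optional[Dict]:
        """Fetch a single call analysis, optionally scoped to an organization."""
        query = self.client.table("call_analyses").select("*").eq("id", analysis_id)
        if org_id:
            query = query.eq("organization_id", org_id)
        response = query.execute()
        return response.data[0] if response.data else None
    
    async def update_analysis_error(self, analysis_id: str, error: str) -> None:
        """Mark an analysis as failed and record the error message."""
        self.client.table("call_analyses") \
            .update({"status": "failed", "error": error}) \
            .eq("id", analysis_id) \
            .execute()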

Background Tasks

Async Processing

# app/core/tasks.py
import os
import tempfile

from app.services import s3_service, supabase_service
from app.core.audio_processing import AudioTrimmer
from app.core.transcription import WhisperEngine
from app.core.analysis import FraudDetector

async def process_audio_analysis(analysis_id: str):
    """Background task to process audio analysis."""
    try:
        # Get analysis record
        analysis = await supabase_service.get_call_analysis(analysis_id)
        
        # Download audio from S3
        audio_data = await s3_service.download_file(analysis["file_key"])
        
        # Process audio
        trimmer = AudioTrimmer()
        trimmed_audio = trimmer.trim_silence(audio_data)
        
        # Save trimmed version
        trimmed_key = await s3_service.upload_file(
            trimmed_audio,
            f"trimmed_{analysis['file_name']}",
            analysis["organization_id"]
        )
        
        # Transcribe audio (WhisperEngine expects a file path, so write the trimmed bytes to a temp file)
        whisper = WhisperEngine()
        with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
            tmp.write(trimmed_audio)
            tmp_path = tmp.name
        try:
            transcription = await whisper.transcribe(tmp_path)
        finally:
            os.unlink(tmp_path)
        
        # Analyze for fraud
        detector = FraudDetector(api_key=os.getenv("GEMINI_API_KEY"))
        fraud_analysis = await detector.analyze_transcript(
            " ".join([seg["text"] for seg in transcription])
        )
        
        # Update analysis record
        await supabase_service.update_analysis_results(
            analysis_id,
            transcription,
            fraud_analysis["fraud_score"],
            fraud_analysis
        )
        
    except Exception as e:
        # Update with error status
        await supabase_service.update_analysis_error(analysis_id, str(e))

Authentication & Security

JWT Validation

# app/api/dependencies.py
from fastapi import Header, HTTPException, Depends
from jose import jwt, JWTError
from typing import Optional
import os

async def verify_token(authorization: str = Header(...)):
    """Verify JWT token from Supabase."""
    try:
        # Extract token
        token = authorization.replace("Bearer ", "")
        
        # Decode and verify
        payload = jwt.decode(
            token,
            os.getenv("SUPABASE_JWT_SECRET"),
            algorithms=["HS256"],
            options={"verify_aud": False}
        )
        
        return payload
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")

async def get_org_id(
    x_orgid: Optional[str] = Header(None),
    token_data: dict = Depends(verify_token)
) -> str:
    """Extract organization ID from headers or token."""
    if x_orgid:
        return x_orgid
    
    # Fallback to user's default org from token
    return token_data.get("user_metadata", {}).get("org_id")
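
The endpoint signatures above also depend on get_supabase, which is not shown; a minimal sketch that reuses one service instance per process might be:

# app/api/dependencies.py (continued) -- sketch of the Supabase dependency
from functools import lru_cache
from app.services.supabase_service import SupabaseService

@lru_cache()
def _supabase_singleton() -> SupabaseService:
    return SupabaseService()

async def get_supabase() -> SupabaseService:
    """Provide a shared SupabaseService instance to route handlers."""
    return _supabase_singleton()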

Error Handling

Global Exception Handler

# app/main.py
import logging

from fastapi import Request
from fastapi.responses import JSONResponse

logger = logging.getLogger(__name__)

@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Handle all unhandled exceptions."""
    # Log the full stack trace server-side without exposing it to the client
    logger.error("Unhandled exception", exc_info=exc)
    
    # Return generic error response
    return JSONResponse(
        status_code=500,
        content={
            "detail": "An internal error occurred",
            "type": type(exc).__name__
        }
    )

Testing

Unit Tests

# tests/test_audio_processing.py
import pytest
from app.core.audio_processing import AudioTrimmer

class TestAudioTrimmer:
    def test_trim_silence(self):
        trimmer = AudioTrimmer(silence_threshold=-40)
        
        # Load test audio with silence
        with open("tests/fixtures/audio_with_silence.mp3", "rb") as f:
            audio_data = f.read()
        
        # Trim silence
        trimmed = trimmer.trim_silence(audio_data)
        
        # Verify trimmed audio is shorter
        assert len(trimmed) < len(audio_data)

Integration Tests

# tests/test_api.py
from fastapi.testclient import TestClient
from app.main import app
from app.api.dependencies import verify_token

client = TestClient(app)

# Bypass JWT verification in tests; in a full suite, get_supabase would
# also be overridden with a test double so no real database is touched.
app.dependency_overrides[verify_token] = lambda: {"sub": "test-user"}

def test_create_analysis():
    response = client.post(
        "/api/v1/analyses/",
        json={
            "file_id": "test-file-id",
            "file_name": "test.mp3"
        },
        headers={
            "Authorization": "Bearer test-token",
            "x-orgid": "test-org"
        }
    )
    
    assert response.status_code == 200
    assert response.json()["status"] == "pending"

Performance Optimization

Async Operations

# Use async operations for I/O
import asyncio
from typing import List

from app.core.tasks import process_audio_analysis

async def process_multiple_files(file_ids: List[str]):
    """Process multiple files concurrently."""
    tasks = []
    for file_id in file_ids:
        task = asyncio.create_task(process_audio_analysis(file_id))
        tasks.append(task)
    
    results = await asyncio.gather(*tasks)
    return results

Caching

# app/core/cache.py
from functools import lru_cache
import hashlib

# `supabase` is assumed to be a module-level SupabaseService instance and
# get_transcription_by_hash an illustrative lookup helper on that service.
@lru_cache(maxsize=100)
def get_cached_transcription(audio_hash: str):
    """Cache transcription results for a given audio hash."""
    return supabase.get_transcription_by_hash(audio_hash)

def compute_audio_hash(audio_data: bytes) -> str:
    """Compute hash of audio data."""
    return hashlib.sha256(audio_data).hexdigest()
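
For context, a hypothetical call site combining the two helpers above, assuming a WhisperEngine-style transcribe(path) coroutine, could look like:

# Illustrative helper: consult the cache before running transcription on identical audio
import tempfile

async def transcribe_with_cache(audio_data: bytes, engine) -> list:
    audio_hash = compute_audio_hash(audio_data)
    cached = get_cached_transcription(audio_hash)
    if cached:
        return cached
    
    # Cache miss: write the bytes to a temp file and run the engine
    with tempfile.NamedTemporaryFile(suffix=".mp3") as tmp:
        tmp.write(audio_data)
        tmp.flush()
        return await engine.transcribe(tmp.name)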

Deployment

Docker Configuration

FROM python:3.10-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    ffmpeg \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Run with uvicorn
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

Modal Deployment

# modal/main.py
import modal

stub = modal.Stub("authentivoice-backend")

image = modal.Image.debian_slim() \
    .apt_install("ffmpeg") \
    .pip_install_from_requirements("requirements.txt")

@stub.function(
    image=image,
    secrets=[modal.Secret.from_name("authentivoice-secrets")]
)
@modal.asgi_app()
def fastapi_app():
    from app.main import app
    return app

Best Practices

API Design

  • Use consistent RESTful conventions
  • Return proper HTTP status codes
  • Version your APIs
  • Document with OpenAPI/Swagger

Error Handling

  • Use specific exception types
  • Provide meaningful error messages
  • Log errors appropriately
  • Never expose sensitive information in error responses

Performance

  • Use async/await for I/O operations
  • Implement connection pooling
  • Cache expensive operations
  • Profile and optimize bottlenecks

Security

  • Validate all inputs
  • Use parameterized queries
  • Implement rate limiting (see the sketch after this list)
  • Keep dependencies updated
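
As a minimal sketch of the rate-limiting point above, the dependency below keeps an in-memory sliding window per client IP. It is an illustration rather than the project's implementation; with multiple workers, a shared store such as Redis would be needed.

# app/api/rate_limit.py -- hypothetical module; in-memory sliding-window limiter
import time
from collections import defaultdict, deque

from fastapi import HTTPException, Request

WINDOW_SECONDS = 60
MAX_REQUESTS = 100

_requests: dict[str, deque] = defaultdict(deque)

async def rate_limit(request: Request) -> None:
    """FastAPI dependency that rejects clients exceeding MAX_REQUESTS per window."""
    client_ip = request.client.host if request.client else "unknown"
    now = time.monotonic()
    window = _requests[client_ip]
    
    # Drop timestamps that have fallen out of the window
    while window and now - window[0] > WINDOW_SECONDS:
        window.popleft()
    
    if len(window) >= MAX_REQUESTS:
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    window.append(now)

A dependency like this could be attached per router, for example app.include_router(audio.router, prefix="/api/v1/audio", dependencies=[Depends(rate_limit)]).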