Backend Overview
The AuthentiVoice backend is a high-performance API service built with FastAPI, designed to handle audio processing, AI-powered analysis, and integrations with external services.
Technology Stack
Project Structure
Copy
backend/
├── app/
│ ├── main.py # FastAPI application entry
│ ├── api/
│ │ ├── endpoints/ # API route handlers
│ │ │ ├── audio.py # Audio processing endpoints
│ │ │ ├── analysis.py # Analysis CRUD endpoints
│ │ │ └── integrations.py # External integrations
│ │ └── dependencies.py # Shared dependencies
│ ├── core/
│ │ ├── config.py # Configuration management
│ │ ├── audio_processing/ # Audio manipulation
│ │ ├── transcription/ # Speech-to-text engines
│ │ └── analysis/ # AI analysis logic
│ ├── services/
│ │ ├── s3_service.py # S3/MinIO operations
│ │ ├── supabase_service.py # Database operations
│ │ └── integration_service.py # External integrations
│ └── models/ # Pydantic models
│ ├── audio.py
│ ├── analysis.py
│ └── integration.py
├── modal/ # Modal cloud deployment
└── tests/ # Test suite
Core Components
FastAPI Application
Copy
# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.endpoints import audio, analysis, integrations
# Application object carrying the title, description and version of the API.
app = FastAPI(
    title="AuthentiVoice API",
    description="AI-powered voice authentication and fraud detection",
    version="1.0.0",
)

# CORS: a single explicit origin is allowed, with credentials, and with every
# method and header permitted for that origin.
_cors_options = {
    "allow_origins": ["https://app.authentivoice.com"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)

# Mount each endpoint module's router under its versioned URL prefix.
for _endpoint_module, _url_prefix, _tag in (
    (audio, "/api/v1/audio", "audio"),
    (analysis, "/api/v1/analyses", "analysis"),
    (integrations, "/api/v1/integrations", "integrations"),
):
    app.include_router(_endpoint_module.router, prefix=_url_prefix, tags=[_tag])
API Endpoints
Copy
# app/api/endpoints/analysis.py
from typing import List

from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException

from app.api.dependencies import get_org_id
from app.core.tasks import process_audio_analysis
from app.models.analysis import CallAnalysis, CallAnalysisCreate, CallAnalysisUpdate
from app.services.supabase_service import SupabaseService
# NOTE(review): get_supabase is used by the handlers below but the module that
# defines it is not shown in this document - import it from wherever it lives.
router = APIRouter()


@router.post("/", response_model=CallAnalysis)
async def create_analysis(
    analysis: CallAnalysisCreate,
    background_tasks: BackgroundTasks,
    org_id: str = Depends(get_org_id),
    supabase: SupabaseService = Depends(get_supabase),
):
    """Create a new call analysis record and queue its processing.

    Args:
        analysis: Request body; its ``file_id`` and ``file_name`` are stored.
        background_tasks: Injected by FastAPI; used to schedule
            ``process_audio_analysis`` to run after the response is sent.
        org_id: Organization the record is created under.
        supabase: Database service used to insert the record.

    Returns:
        The inserted record as returned by the database service.

    Raises:
        HTTPException: 500 with a generic message when the insert fails.
    """
    import logging  # local import keeps this snippet self-contained

    try:
        result = await supabase.create_call_analysis(
            org_id=org_id,
            file_id=analysis.file_id,
            file_name=analysis.file_name,
        )
    except Exception as exc:
        # Log the real cause server-side; do not echo internal error text
        # (connection strings, SQL, ...) back to the client.
        logging.getLogger(__name__).exception("Failed to create call analysis")
        raise HTTPException(
            status_code=500, detail="Failed to create analysis"
        ) from exc

    # The service returns the inserted row as a dict, so the id is read by key.
    background_tasks.add_task(process_audio_analysis, analysis_id=result["id"])
    return result
@router.get("/{analysis_id}", response_model=CallAnalysis)
async def get_analysis(
    analysis_id: str,
    org_id: str = Depends(get_org_id),
    supabase: SupabaseService = Depends(get_supabase),
):
    """Return one call analysis, or answer 404 when the lookup yields nothing.

    The lookup is made with both the analysis id and the caller's
    organization id.
    """
    record = await supabase.get_call_analysis(analysis_id, org_id)
    if record:
        return record
    raise HTTPException(status_code=404, detail="Analysis not found")
Audio Processing
Audio Trimming
Copy
# app/core/audio_processing/trimmer.py
from pydub import AudioSegment
from pydub.silence import detect_leading_silence
import io
class AudioTrimmer:
    """Strip leading and trailing silence from an encoded audio clip."""

    def __init__(self, silence_threshold: int = -50):
        # Level passed to pydub's silence detector; quieter audio counts as
        # silence.
        self.silence_threshold = silence_threshold

    def trim_silence(self, audio_data: bytes, format: str = "mp3") -> bytes:
        """Return ``audio_data`` re-encoded without its silent head and tail.

        Args:
            audio_data: Encoded audio bytes.
            format: Container/codec name used both to decode the input and to
                encode the output.

        Returns:
            The trimmed clip, encoded in ``format``.
        """
        segment = AudioSegment.from_file(io.BytesIO(audio_data), format=format)

        # The trailing silence is measured as the leading silence of the
        # reversed clip.
        threshold = self.silence_threshold
        head = detect_leading_silence(segment, threshold)
        tail = detect_leading_silence(segment.reverse(), threshold)

        kept = segment[head:len(segment) - tail]

        buffer = io.BytesIO()
        kept.export(buffer, format=format)
        return buffer.getvalue()
Transcription Engines
Copy
# app/core/transcription/whisper_engine.py
import whisper
from typing import List, Dict
class WhisperEngine:
    """Speech-to-text engine backed by a locally loaded Whisper model."""

    def __init__(self, model_name: str = "base"):
        # Loading happens once per engine instance; reuse the instance where
        # possible instead of constructing one per request.
        self.model = whisper.load_model(model_name)

    async def transcribe(self, audio_path: str) -> List[Dict]:
        """Transcribe the audio file at ``audio_path``.

        The model's ``transcribe`` call is synchronous, so it is run in a
        worker thread to keep the event loop responsive while it works.

        Args:
            audio_path: Path of the audio file to transcribe.

        Returns:
            One dict per segment with ``start``, ``end``, ``text`` (stripped)
            and ``confidence`` keys.
        """
        import asyncio  # local import keeps this snippet self-contained

        result = await asyncio.to_thread(self.model.transcribe, audio_path)

        # NOTE(review): Whisper segments usually expose avg_logprob /
        # no_speech_prob rather than "confidence", so this probably always
        # falls back to 1.0 - confirm and map a real score if needed.
        return [
            {
                "start": segment["start"],
                "end": segment["end"],
                "text": segment["text"].strip(),
                "confidence": segment.get("confidence", 1.0),
            }
            for segment in result["segments"]
        ]
AI Analysis
Fraud Detection with Gemini
Copy
# app/core/analysis/fraud_detector.py
import google.generativeai as genai
from typing import Dict, List
import json
class FraudDetector:
    """Ask a Gemini model to score a call transcript for fraud indicators."""

    def __init__(self, api_key: str):
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel('gemini-1.5-flash')

    async def analyze_transcript(
        self,
        transcript: str,
        metadata: Dict = None
    ) -> Dict:
        """Analyze a transcript for fraud indicators.

        Args:
            transcript: Full transcript text; it is embedded in the prompt.
            metadata: Accepted for interface compatibility; currently unused.

        Returns:
            The model's reply parsed as a JSON object. The prompt asks for the
            keys ``fraud_score``, ``fraud_indicators``, ``risk_level``,
            ``summary`` and ``recommendations``, but they are not validated
            here.

        Raises:
            ValueError: If no JSON object can be recovered from the reply.
        """
        import asyncio  # local import keeps this snippet self-contained

        prompt = (
            "\n"
            "Analyze this call transcript for potential fraud indicators:\n"
            f"Transcript: {transcript}\n"
            "Consider:\n"
            "1. Suspicious patterns or inconsistencies\n"
            "2. High-pressure tactics\n"
            "3. Requests for sensitive information\n"
            "4. Unusual speech patterns or splicing\n"
            "5. Compliance violations\n"
            "Return a JSON response with:\n"
            "- fraud_score (0.0 to 1.0)\n"
            "- fraud_indicators (list of detected issues)\n"
            "- risk_level (low, medium, high)\n"
            "- summary (brief explanation)\n"
            "- recommendations (list of actions)\n"
        )

        # generate_content is a blocking call; run it off the event loop.
        response = await asyncio.to_thread(self.model.generate_content, prompt)

        try:
            return json.loads(response.text)
        except json.JSONDecodeError:
            # Replies are often wrapped in prose or a Markdown code fence.
            return self._parse_text_response(response.text)

    def _parse_text_response(self, text: str) -> Dict:
        """Recover the JSON object embedded in a reply that is not bare JSON.

        Takes everything from the first ``{`` to the last ``}`` and parses
        that, which covers fenced or prose-wrapped replies.

        Raises:
            ValueError: If the text holds no parseable JSON object.
        """
        start = text.find("{")
        end = text.rfind("}")
        if start == -1 or end <= start:
            raise ValueError("Model response did not contain a JSON object")
        try:
            return json.loads(text[start:end + 1])
        except json.JSONDecodeError as exc:
            raise ValueError("Model response did not contain valid JSON") from exc
Service Layer
S3 Storage Service
Copy
# app/services/s3_service.py
import boto3
from botocore.client import Config
import uuid
from typing import Optional
class S3Service:
    """Thin wrapper around a boto3 S3 client bound to one bucket."""

    def __init__(self, endpoint_url: str, access_key: str, secret_key: str, bucket: str):
        self.client = boto3.client(
            's3',
            endpoint_url=endpoint_url,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            config=Config(signature_version='s3v4'),
        )
        self.bucket = bucket

    async def upload_file(
        self,
        file_data: bytes,
        file_name: str,
        org_id: str
    ) -> str:
        """Upload ``file_data`` and return the object key it was stored under.

        The key has the form ``{org_id}/{uuid4}/{file_name}``. The boto3 call
        is synchronous, so it is run in a worker thread to avoid blocking the
        event loop during the upload.

        Args:
            file_data: Raw bytes to store.
            file_name: Final path component of the key.
            org_id: First path component of the key.

        Returns:
            The generated object key.
        """
        import asyncio  # local import keeps this snippet self-contained

        file_id = str(uuid.uuid4())
        key = f"{org_id}/{file_id}/{file_name}"
        await asyncio.to_thread(
            self.client.put_object,
            Bucket=self.bucket,
            Key=key,
            Body=file_data,
        )
        return key

    def generate_presigned_url(
        self,
        key: str,
        expiration: int = 3600
    ) -> str:
        """Return a presigned GET URL for ``key``.

        Args:
            key: Object key inside this service's bucket.
            expiration: Value passed as ``ExpiresIn`` to the client.
        """
        return self.client.generate_presigned_url(
            'get_object',
            Params={'Bucket': self.bucket, 'Key': key},
            ExpiresIn=expiration,
        )
Database Service
Copy
# app/services/supabase_service.py
from supabase import create_client, Client
from typing import List, Optional, Dict
import os
class SupabaseService:
    """Database access for the ``call_analyses`` table via the Supabase client."""

    def __init__(self):
        # Credentials come from the environment.
        self.client: Client = create_client(
            os.getenv("SUPABASE_URL"),
            os.getenv("SUPABASE_KEY"),
        )

    @staticmethod
    async def _execute(query):
        """Run a built query's blocking ``execute()`` in a worker thread."""
        import asyncio  # local import keeps this snippet self-contained

        return await asyncio.to_thread(query.execute)

    async def create_call_analysis(
        self,
        org_id: str,
        file_id: str,
        file_name: str,
        metadata: Optional[Dict] = None
    ) -> Dict:
        """Insert a new call analysis row with status ``pending``.

        Args:
            org_id: Stored as ``organization_id``.
            file_id: Stored as ``file_id``.
            file_name: Stored as ``file_name``.
            metadata: Stored as ``metadata``; an empty dict when omitted.

        Returns:
            The first row of the insert response.
        """
        data = {
            "organization_id": org_id,
            "file_id": file_id,
            "file_name": file_name,
            "metadata": metadata or {},
            "status": "pending",
        }
        response = await self._execute(
            self.client.table("call_analyses").insert(data)
        )
        return response.data[0]

    async def get_call_analysis(
        self,
        analysis_id: str,
        org_id: Optional[str] = None
    ) -> Optional[Dict]:
        """Fetch one call analysis row by id.

        Args:
            analysis_id: Value matched against the ``id`` column.
            org_id: When given, the row must also match ``organization_id``.

        Returns:
            The matching row, or ``None`` when there is none.
        """
        query = self.client.table("call_analyses").select("*").eq("id", analysis_id)
        if org_id is not None:
            query = query.eq("organization_id", org_id)
        response = await self._execute(query)
        return response.data[0] if response.data else None

    async def update_analysis_results(
        self,
        analysis_id: str,
        transcription: List[Dict],
        fraud_score: float,
        analysis_result: Dict
    ) -> Dict:
        """Store processing results on a row and mark it ``completed``.

        Args:
            analysis_id: Value matched against the ``id`` column.
            transcription: Stored under ``result.transcription``.
            fraud_score: Stored under ``result.fraud_score``.
            analysis_result: Stored under ``result.analysis``.

        Returns:
            The first row of the update response.

        Raises:
            IndexError: If the update response contains no rows.
        """
        data = {
            "result": {
                "transcription": transcription,
                "fraud_score": fraud_score,
                "analysis": analysis_result,
            },
            "status": "completed",
        }
        response = await self._execute(
            self.client.table("call_analyses").update(data).eq("id", analysis_id)
        )
        return response.data[0]
Background Tasks
Async Processing
Copy
# app/core/tasks.py
from app.services import s3_service, supabase_service
from app.core.audio_processing import AudioTrimmer
from app.core.transcription import WhisperEngine
from app.core.analysis import FraudDetector
async def process_audio_analysis(analysis_id: str):
    """Background task: trim, transcribe and fraud-score one uploaded call.

    Steps: load the analysis record, download its audio, trim silence, upload
    the trimmed copy, transcribe it, score the transcript, then store the
    results. Any failure is logged and recorded on the analysis via
    ``update_analysis_error``.

    Args:
        analysis_id: Identifier of the analysis record to process.
    """
    # Local imports: this snippet uses these modules but never imported them.
    import logging
    import os
    import tempfile

    try:
        # NOTE(review): supabase_service / s3_service are imported as modules
        # but called like service instances, and the record is read by
        # attribute (file_key, file_name, organization_id) - confirm these
        # match the real service objects and record type.
        analysis = await supabase_service.get_call_analysis(analysis_id)

        audio_data = await s3_service.download_file(analysis.file_key)

        trimmed_audio = AudioTrimmer().trim_silence(audio_data)

        # Keep a trimmed copy next to the original upload.
        await s3_service.upload_file(
            trimmed_audio,
            f"trimmed_{analysis.file_name}",
            analysis.organization_id,
        )

        # WhisperEngine.transcribe takes a file path, not raw bytes, so the
        # trimmed audio is written to a temporary file first (trim_silence
        # encodes as mp3 by default).
        with tempfile.TemporaryDirectory() as workdir:
            audio_path = os.path.join(workdir, "trimmed.mp3")
            with open(audio_path, "wb") as handle:
                handle.write(trimmed_audio)
            transcription = await WhisperEngine().transcribe(audio_path)

        detector = FraudDetector(api_key=os.getenv("GEMINI_API_KEY"))
        fraud_analysis = await detector.analyze_transcript(
            " ".join(seg["text"] for seg in transcription)
        )

        await supabase_service.update_analysis_results(
            analysis_id,
            transcription,
            fraud_analysis["fraud_score"],
            fraud_analysis,
        )
    except Exception as e:
        # Top-level boundary of a background task: record the failure instead
        # of letting it vanish, and keep the traceback in the logs.
        logging.getLogger(__name__).exception(
            "Audio analysis %s failed", analysis_id
        )
        await supabase_service.update_analysis_error(analysis_id, str(e))
Authentication & Security
JWT Validation
Copy
# app/api/dependencies.py
from fastapi import Header, HTTPException, Depends
from jose import jwt, JWTError
import os
async def verify_token(authorization: str = Header(...)):
    """Verify the Supabase JWT carried in the ``Authorization`` header.

    The header must have the form ``Bearer <token>``. The token is verified
    with the ``SUPABASE_JWT_SECRET`` environment value using HS256.

    Args:
        authorization: Raw ``Authorization`` header value (required).

    Returns:
        The decoded token payload.

    Raises:
        HTTPException: 401 when the header is not a Bearer credential or the
            token fails verification.
    """
    # Split once instead of str.replace: replace() would accept any scheme
    # and would also strip "Bearer " occurrences inside the value.
    scheme, _, token = authorization.partition(" ")
    token = token.strip()
    if scheme.lower() != "bearer" or not token:
        raise HTTPException(status_code=401, detail="Invalid token")

    try:
        # NOTE(review): audience verification is disabled here; consider
        # enforcing the expected audience once it is confirmed for this
        # project.
        return jwt.decode(
            token,
            os.getenv("SUPABASE_JWT_SECRET"),
            algorithms=["HS256"],
            options={"verify_aud": False},
        )
    except JWTError:
        # Hide decoder details from the client.
        raise HTTPException(status_code=401, detail="Invalid token") from None
async def get_org_id(
    # "str | None" avoids typing.Optional, which this module never imports;
    # it requires Python 3.10+, the version the Docker image pins.
    x_orgid: str | None = Header(None),
    token_data: dict = Depends(verify_token),
) -> str:
    """Resolve the organization ID from the request header or the token.

    Args:
        x_orgid: Optional organization ID header; wins when present.
        token_data: Decoded JWT payload from ``verify_token``.

    Returns:
        The organization ID.

    Raises:
        HTTPException: 400 when neither the header nor the token's
            ``user_metadata.org_id`` provides an organization ID.
    """
    if x_orgid:
        # NOTE(review): the header value is trusted as-is; nothing here checks
        # that the authenticated user belongs to that organization - confirm
        # membership is enforced elsewhere.
        return x_orgid

    # Fall back to the user's default org from the token; "or {}" also covers
    # a user_metadata claim that is present but null.
    org_id = (token_data.get("user_metadata") or {}).get("org_id")
    if not org_id:
        # Previously this returned None despite the "-> str" contract.
        raise HTTPException(status_code=400, detail="Organization ID is required")
    return org_id
Error Handling
Global Exception Handler
Copy
# app/main.py
from fastapi import Request
from fastapi.responses import JSONResponse
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Handle all otherwise-unhandled exceptions.

    Logs the exception with its traceback and answers with a generic 500
    body, so internal details are not sent to the client.

    Args:
        request: The request being processed when the exception surfaced.
        exc: The unhandled exception.

    Returns:
        A JSON 500 response with a fixed ``detail`` message and the exception
        class name under ``type``.
    """
    import logging  # local import keeps this snippet self-contained

    # exc_info=exc attaches the traceback explicitly, so this does not depend
    # on being called from inside an active "except" block.
    logging.getLogger(__name__).error(
        "Unhandled exception on %s %s",
        request.method,
        request.url.path,
        exc_info=exc,
    )

    return JSONResponse(
        status_code=500,
        content={
            "detail": "An internal error occurred",
            "type": type(exc).__name__,
        },
    )
Testing
Unit Tests
Copy
# tests/test_audio_processing.py
import pytest
from app.core.audio_processing import AudioTrimmer
class TestAudioTrimmer:
    """Unit tests for AudioTrimmer."""

    def test_trim_silence(self):
        """Trimming the fixture clip must yield fewer encoded bytes."""
        subject = AudioTrimmer(silence_threshold=-40)

        # Fixture clip that contains silence.
        with open("tests/fixtures/audio_with_silence.mp3", "rb") as fixture:
            original_bytes = fixture.read()

        result_bytes = subject.trim_silence(original_bytes)

        # The comparison is on encoded size, not on playback duration.
        assert len(result_bytes) < len(original_bytes)
Integration Tests
Copy
# tests/test_api.py
from fastapi.testclient import TestClient
from app.main import app
client = TestClient(app)


def test_create_analysis():
    """POSTing a file reference creates an analysis in the pending state."""
    payload = {
        "file_id": "test-file-id",
        "file_name": "test.mp3",
    }
    # NOTE(review): "test-token" is not a signed JWT; this presumably relies
    # on the auth and database dependencies being overridden for tests -
    # confirm.
    request_headers = {
        "Authorization": "Bearer test-token",
        "x-orgid": "test-org",
    }

    response = client.post(
        "/api/v1/analyses",
        json=payload,
        headers=request_headers,
    )

    assert response.status_code == 200
    assert response.json()["status"] == "pending"
Performance Optimization
Async Operations
Copy
# Use async operations for I/O
async def process_multiple_files(file_ids: list[str]):
    """Process multiple files concurrently.

    Starts one ``process_audio_analysis`` run per id and waits for all of
    them.

    Args:
        file_ids: Identifiers passed one by one to ``process_audio_analysis``.

    Returns:
        A list with each run's result, in the same order as ``file_ids``
        (empty when ``file_ids`` is empty).
    """
    import asyncio  # local import: this snippet has no import block

    # gather() schedules each coroutine as a task itself, so an explicit
    # create_task loop is not needed.
    return await asyncio.gather(
        *(process_audio_analysis(file_id) for file_id in file_ids)
    )
Caching
Copy
# app/core/cache.py
from functools import lru_cache
import hashlib
@lru_cache(maxsize=100)
def get_cached_transcription(audio_hash: str):
    """Look up a transcription by audio hash, memoizing up to 100 hashes.

    NOTE(review): lru_cache stores whatever the lookup returns. A miss would
    be remembered too, and if ``get_transcription_by_hash`` is a coroutine
    function the cached value would be a single-use coroutine object -
    confirm which applies. ``supabase`` is not defined in this snippet.
    """
    cached_result = supabase.get_transcription_by_hash(audio_hash)
    return cached_result
def compute_audio_hash(audio_data: bytes) -> str:
    """Return the SHA-256 digest of ``audio_data`` as a hex string."""
    digest = hashlib.sha256()
    digest.update(audio_data)
    return digest.hexdigest()
Deployment
Docker Configuration
Copy
FROM python:3.10-slim
WORKDIR /app
# Install system dependencies (ffmpeg, presumably for audio decoding).
# --no-install-recommends keeps optional packages out of the image, and the
# apt lists are removed in the same layer so they never reach the image.
RUN apt-get update && apt-get install -y --no-install-recommends \
    ffmpeg \
    && rm -rf /var/lib/apt/lists/*
# Install Python dependencies before copying the source, so this layer is
# reused from cache until requirements.txt changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY . .
# Run with uvicorn, listening on all interfaces on port 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
Modal Deployment
Copy
# modal/main.py
import modal
# NOTE(review): newer Modal releases rename Stub to App - confirm against the
# Modal version this project pins.
stub = modal.Stub("authentivoice-backend")

# Container image: Debian slim base with ffmpeg and the Python requirements.
image = (
    modal.Image.debian_slim()
    .apt_install("ffmpeg")
    .pip_install_from_requirements("requirements.txt")
)


@stub.function(
    image=image,
    secrets=[modal.Secret.from_name("authentivoice-secrets")],
)
@modal.asgi_app()
def fastapi_app():
    """Return the FastAPI application for Modal to serve as an ASGI app."""
    # Imported inside the function, not at module import time.
    from app.main import app as asgi_application

    return asgi_application
Best Practices
API Design
- Use consistent RESTful conventions
- Implement proper status codes
- Version your APIs
- Document with OpenAPI/Swagger
Error Handling
- Use specific exception types
- Provide meaningful error messages
- Log errors appropriately
- Never expose sensitive information
Performance
- Use async/await for I/O operations
- Implement connection pooling
- Cache expensive operations
- Profile and optimize bottlenecks
Security
- Validate all inputs
- Use parameterized queries
- Implement rate limiting
- Keep dependencies updated