Atriva Video Pipeline FFmpeg x86 Integration
Overview
This guide explains how to integrate the Video Pipeline API service with other services in the Atriva platform, particularly AI inference containers that consume decoded frames.
Table of Contents
- Architecture Overview
- Integration Approaches
- API Reference
- Shared Storage Integration
- Configuration
- Usage Examples
- Error Handling
- Security Considerations
- Monitoring and Logging
- Troubleshooting
- Future Enhancements
Architecture Overview
The Video Pipeline service is designed as a standalone microservice that can be integrated with other services through:
- REST API - For control and status queries
- Shared Volume Storage - For frame data access by AI inference containers
Service Communication
┌─────────────────┐
│  Dashboard API  │
│    (Backend)    │
└────────┬────────┘
         │ HTTP/REST
         │
┌────────▼─────────────────┐
│  Video Pipeline Service  │
│       (FFmpeg x86)       │
└────────┬─────────────────┘
         │
         │ Writes frames to
         │ shared volume
         │
┌────────▼─────────────────┐
│      Shared Volume       │
│      /app/frames/        │
│      ├── camera_1/       │
│      └── camera_2/       │
└────────┬─────────────────┘
         │
         │ Reads frames from
         │ shared volume
         │
┌────────▼─────────────────┐
│ AI Inference Containers  │
│      (YOLO, etc.)        │
└──────────────────────────┘
Integration Approaches
Option 1: Service-to-Service Communication (Recommended)
The video pipeline remains a standalone service, and other services communicate via REST API.
Benefits:
- Service independence
- Scalability
- Technology isolation
- Easy deployment
Implementation Example:
# dashboard-backend/app/routes/video_pipeline.py
import os

import httpx
from fastapi import APIRouter, Depends

router = APIRouter()

VIDEO_PIPELINE_URL = os.getenv("VIDEO_PIPELINE_URL", "http://video-pipeline:8002")

@router.post("/camera/{camera_id}/decode/")
async def start_camera_decode(
    camera_id: int,
    url: str,
    fps: int = 1,
    client: httpx.AsyncClient = Depends(get_http_client),
):
    # Forward the request to the video pipeline service
    response = await client.post(
        f"{VIDEO_PIPELINE_URL}/api/v1/video-pipeline/decode/",
        data={
            "camera_id": str(camera_id),
            "url": url,
            "fps": fps,
        },
    )
    return response.json()
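The get_http_client dependency is assumed to be defined elsewhere in the backend; a minimal sketch of what it could look like, yielding a per-request httpx.AsyncClient (a production backend might instead share one client across requests):

# Hypothetical dependency, not part of the pipeline service itself
import httpx

async def get_http_client():
    async with httpx.AsyncClient(timeout=30.0) as client:
        yield client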
Option 2: Shared Volume Access (For AI Inference)
AI inference containers can directly access decoded frames from the shared volume without API calls.
Benefits:
- Low latency
- No network overhead
- Direct file access
- Efficient for high-throughput scenarios
Implementation Example:
# ai-inference-container code
from pathlib import Path

import cv2

FRAMES_DIR = Path("/shared/frames")  # Mounted shared volume

def get_latest_frame(camera_id: str):
    """Get the latest frame for a camera from shared storage."""
    camera_dir = FRAMES_DIR / camera_id
    if not camera_dir.exists():
        return None

    # Find the latest frame by modification time
    jpg_files = list(camera_dir.glob("*.jpg"))
    if not jpg_files:
        return None

    latest = max(jpg_files, key=lambda f: f.stat().st_mtime)
    return cv2.imread(str(latest))
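The helper selects by modification time rather than by filename, so it stays correct even if the naming convention changes; callers should also be prepared for a None return while decoding is still starting up.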
API Reference
Start Video Decoding
Endpoint: POST /api/v1/video-pipeline/decode/
Start decoding a video stream and extracting frames.
Parameters:
- camera_id (string, required): Unique identifier for the camera
- file (file, optional): Video file upload
- url (string, optional): Video URL (file or RTSP stream)
- fps (integer, optional): Frames per second to extract (default: 1)
- force_format (string, optional): Force hardware acceleration format (cuda, qsv, vaapi, none)
Response:
{
  "message": "Decoding started",
  "camera_id": "camera_1",
  "output_folder": "/app/frames/camera_1",
  "status": "started"
}
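For uploads, the same endpoint accepts a file instead of a url; a minimal Python sketch using httpx multipart (the file name and fps value are illustrative):

import httpx

with open("sample.mp4", "rb") as f:
    response = httpx.post(
        "http://localhost:8002/api/v1/video-pipeline/decode/",
        files={"file": ("sample.mp4", f, "video/mp4")},
        data={"camera_id": "camera_1", "fps": "2"},
    )
print(response.json())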
Get Decode Status
Endpoint: GET /api/v1/video-pipeline/decode/status/?camera_id={camera_id}
Get the current status of decoding for a camera.
Response:
{
  "camera_id": "camera_1",
  "status": "running",
  "frame_count": 150,
  "last_error": null,
  "restart_count": 0
}
Stop Decoding
Endpoint: POST /api/v1/video-pipeline/decode/stop/
Stop decoding for a camera.
Parameters:
- camera_id (string, required): Camera identifier
Get Latest Frame
Endpoint: GET /api/v1/video-pipeline/latest-frame/?camera_id={camera_id}
Get the most recent decoded frame as a JPEG image.
Response: JPEG image file
Get Video Information
Endpoint: POST /api/v1/video-pipeline/video-info/
Get metadata about a video file.
Parameters:
- video (file, required): Video file upload
Response:
{
  "message": "Video information retrieved",
  "info": {
    "format": "rtsp",
    "codec": "h264",
    "width": 1920,
    "height": 1080,
    "fps": 30.0,
    "duration": 0.0
  }
}
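A matching client call, sketched with httpx and an assumed local file, with the service reachable at localhost:8002 as in the usage examples below:

import httpx

with open("sample.mp4", "rb") as f:
    response = httpx.post(
        "http://localhost:8002/api/v1/video-pipeline/video-info/",
        files={"video": ("sample.mp4", f, "video/mp4")},
    )
print(response.json()["info"])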
Hardware Acceleration Capabilities
Endpoint: GET /api/v1/video-pipeline/hw-accel-cap/
Check available hardware acceleration options.
Response:
{
  "message": {
    "available_hw_accelerations": ["cuda", "qsv", "vaapi"]
  }
}
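One use of this endpoint is to choose a force_format value before starting a decode; a sketch in which the preference order is an assumption, not something the service mandates:

import httpx

BASE = "http://localhost:8002/api/v1/video-pipeline"

caps = httpx.get(f"{BASE}/hw-accel-cap/").json()
available = caps["message"]["available_hw_accelerations"]

# Assumed preference order; fall back to software decoding ("none")
force_format = next(
    (fmt for fmt in ("cuda", "qsv", "vaapi") if fmt in available), "none"
)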
Health Check
Endpoint: GET /api/v1/video-pipeline/health/
Check service health status.
Response:
{
  "status": "healthy",
  "service": "video-pipeline"
}
Shared Storage Integration
Volume Structure
The shared volume follows this structure:
/shared/frames/                  # Mount point (configurable)
├── {camera_id_1}/
│   ├── frame_0001.jpg
│   ├── frame_0002.jpg
│   ├── frame_0003.jpg
│   └── ...
├── {camera_id_2}/
│   ├── frame_0001.jpg
│   └── ...
└── ...
Frame Naming Convention
Frames are named sequentially: frame_%04d.jpg (e.g., frame_0001.jpg, frame_0002.jpg)
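Because the counter is zero-padded, a plain lexicographic sort of the filenames matches extraction order (up to frame 9999); the examples in this guide that use sorted() rely on this.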
Accessing Frames in AI Inference Containers
- Mount the shared volume in your Docker Compose or Kubernetes deployment
- Read frames directly from the filesystem
- Monitor frame updates by checking file modification times
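A minimal polling sketch for step 3, assuming the mount point and naming convention above (the one-second interval is an arbitrary choice):

import time
from pathlib import Path

CAMERA_DIR = Path("/shared/frames/camera_1")

last_mtime = 0.0
while True:
    frames = sorted(CAMERA_DIR.glob("*.jpg"))
    if frames:
        newest = frames[-1]
        mtime = newest.stat().st_mtime
        if mtime > last_mtime:
            last_mtime = mtime
            print(f"New frame: {newest.name}")
            # Hand the frame off to inference here
    time.sleep(1.0)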
Docker Compose Example:
services:
  video-pipeline:
    volumes:
      - shared-frames:/app/frames

  ai-inference:
    volumes:
      - shared-frames:/shared/frames  # Same volume, different mount point
    depends_on:
      - video-pipeline

volumes:
  shared-frames:
Configuration
Environment Variables
# Service Configuration
VIDEO_PIPELINE_URL=http://video-pipeline:8002
# FFmpeg Configuration
FFMPEG_PATH=ffmpeg
FFPROBE_PATH=ffprobe
HW_ACCEL_OPTIONS=["cuda", "qsv", "vaapi", "none"]
# Storage Paths (inside container)
UPLOAD_FOLDER=/app/videos
OUTPUT_FOLDER=/app/frames
Docker Compose Integration
services:
  video-pipeline:
    build: ./video-pipeline-ffmpeg-x86
    ports:
      - "8002:8002"
    volumes:
      - shared-frames:/app/frames
      - video-storage:/app/videos
    environment:
      - FFMPEG_PATH=ffmpeg
      - FFPROBE_PATH=ffprobe
    devices:
      - /dev/dri:/dev/dri  # For VAAPI

  ai-inference:
    build: ./ai-inference
    volumes:
      - shared-frames:/shared/frames
    depends_on:
      - video-pipeline

volumes:
  shared-frames:
  video-storage:
Usage Examples
1. Start Decoding RTSP Stream
curl -X POST "http://localhost:8002/api/v1/video-pipeline/decode/" \
-F "camera_id=camera_1" \
-F "url=rtsp://camera.example.com/stream" \
-F "fps=5"
2. Check Decoding Status
curl "http://localhost:8002/api/v1/video-pipeline/decode/status/?camera_id=camera_1"
3. Get Latest Frame via API
curl "http://localhost:8002/api/v1/video-pipeline/latest-frame/?camera_id=camera_1" \
--output latest_frame.jpg
4. Access Frame from Shared Volume (Python)
from pathlib import Path
from PIL import Image

frames_dir = Path("/shared/frames")
camera_dir = frames_dir / "camera_1"

# Get latest frame (zero-padded names sort chronologically)
jpg_files = sorted(camera_dir.glob("*.jpg"))
if jpg_files:
    latest_frame = Image.open(jpg_files[-1])
    # Process frame...
Error Handling
Common Error Scenarios
- Camera Not Found: Return 404 if the camera_id doesn't exist
- Service Unavailable: Return 503 if the video pipeline is down
- Decoding Failed: Check last_error in the status response
- No Frames Available: Return 404 if no frames exist for the camera
Error Response Format
{
  "detail": "Error message description"
}
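On the consuming side these cases map onto simple status-code checks; a hedged sketch for the latest-frame endpoint:

import httpx

def fetch_latest_frame_bytes(camera_id: str) -> bytes | None:
    try:
        response = httpx.get(
            "http://localhost:8002/api/v1/video-pipeline/latest-frame/",
            params={"camera_id": camera_id},
        )
    except httpx.ConnectError:
        return None  # Service unavailable
    if response.status_code == 404:
        return None  # Unknown camera or no frames yet
    response.raise_for_status()
    return response.content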
Security Considerations
- Authentication: Add JWT token validation for API access
- Authorization: Verify user permissions for camera operations
- Input Validation: Sanitize file uploads and URLs
- Rate Limiting: Implement request throttling
- CORS: Configure cross-origin requests appropriately
- Volume Permissions: Ensure proper file permissions on shared volumes
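For the first item, a minimal FastAPI dependency sketch using PyJWT; the secret, algorithm, and claim layout are placeholders rather than anything the service defines:

import jwt
from fastapi import Depends, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

security = HTTPBearer()
JWT_SECRET = "change-me"  # placeholder; load from a secret store

def require_token(creds: HTTPAuthorizationCredentials = Depends(security)):
    try:
        return jwt.decode(creds.credentials, JWT_SECRET, algorithms=["HS256"])
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="Invalid token")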
Monitoring and Logging
Health Checks
Implement periodic health checks:
import httpx

async def check_video_pipeline_health() -> bool:
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{VIDEO_PIPELINE_URL}/api/v1/video-pipeline/health/"
            )
            return response.json()["status"] == "healthy"
    except Exception:
        return False
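To run the check periodically, it can be looped on the event loop; a sketch with an assumed 30-second interval:

import asyncio

async def health_monitor(interval_seconds: float = 30.0):
    while True:
        healthy = await check_video_pipeline_health()
        if not healthy:
            print("video-pipeline health check failed")  # or log/alert
        await asyncio.sleep(interval_seconds)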
Logging Best Practices
- Log all decode start/stop events
- Log frame extraction counts
- Log errors with camera_id context
- Monitor shared volume disk usage
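A minimal stdlib-logging sketch that keeps camera_id in a fixed, grep-friendly position (the field layout is illustrative):

import logging

logger = logging.getLogger("video-pipeline-client")
logging.basicConfig(level=logging.INFO)

def log_decode_event(camera_id: str, event: str, frame_count: int = 0):
    # Keep camera_id in a fixed position so logs are easy to grep
    logger.info("camera_id=%s event=%s frame_count=%d",
                camera_id, event, frame_count)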
Troubleshooting
Common Issues
- Service Unavailable: Check container status and logs
- No Frames Generated: Verify video source is accessible
- Permission Errors: Check shared volume mount permissions
- Hardware Acceleration Failures: Verify GPU drivers and device access
Debug Commands
# Check service logs
docker logs video-pipeline
# Verify FFmpeg installation
docker exec video-pipeline ffmpeg -version
# Check hardware acceleration
curl http://localhost:8002/api/v1/video-pipeline/hw-accel-cap/
# List frames in shared volume
ls -la /path/to/shared/frames/camera_1/
Future Enhancements
- WebSocket Support: Real-time processing status updates
- Batch Processing: Process multiple videos simultaneously
- Frame Caching: Implement LRU cache for frequently accessed frames
- Storage Management: Automatic cleanup of old frames
- Metrics Export: Prometheus metrics for monitoring