Building an Enterprise Cybersecurity Dashboard: Architecture & Implementation

February 18, 2024 · 10 min read
Tags: FastAPI, React, Architecture, Security, Next.js

Managing security incidents at scale requires robust tooling. This article details the architecture and implementation of a production-grade cybersecurity incident tracker with vulnerability management, network monitoring, and real-time dashboards.

#Why Build Another Security Tool?

Existing solutions are expensive, complex, and often overkill for mid-market organizations. I designed this platform to be:

  • Affordable: Open-source with free deployment options
  • Comprehensive: Handles incidents, vulnerabilities, and network monitoring
  • Modern: Built with current best practices and technologies
  • Scalable: Supports enterprise growth without re-architecting

#Architecture Overview

The platform uses a decoupled, three-tier architecture:

Frontend (Next.js 16)
        ↓
API Gateway (CORS, Rate Limiting)
        ↓
Backend (FastAPI)
        ↓
PostgreSQL + Supabase

Why This Stack?

  • FastAPI: 3x faster than Flask, built-in async/await, automatic API documentation
  • Next.js: Server components, optimized builds, seamless Vercel deployment
  • PostgreSQL + Supabase: Enterprise reliability with Row-Level Security built-in
  • Tailwind + shadcn/ui: Professional UI without custom CSS

#Incident Management System

##Data Model

class Incident(Base):
    id: UUID
    organization_id: UUID
    title: str
    description: str
    severity: IncidentSeverity  # CRITICAL, HIGH, MEDIUM, LOW
    status: IncidentStatus  # OPEN, INVESTIGATING, RESOLVED, CLOSED
    assigned_to: Optional[UUID]
    created_at: datetime
    resolved_at: Optional[datetime]
    comments: List[Comment]
    vulnerabilities: List[Vulnerability]

##Key Features

1. Real-time Dashboard

The dashboard displays 6 critical metrics:

  • Total incidents (with trend)
  • Vulnerabilities by severity
  • Patch coverage percentage
  • 30-day incident trend
  • Mean time to resolution
  • Team workload distribution

// Frontend metrics calculation
const metrics = {
  totalIncidents: incidents.length,
  criticalCount: incidents.filter(i => i.severity === 'CRITICAL').length,
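  // Guard against division by zero when no vulnerabilities are tracked yet
  patchCoverage: vulnerabilities.total > 0
    ? (vulnerabilities.patched / vulnerabilities.total) * 100
    : 0,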
  avgResolutionTime: calculateMTTR(resolvedIncidents),
};
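The `calculateMTTR` helper is not shown in the article. As a rough sketch of what a mean-time-to-resolution calculation could look like on the backend, here is a Python version that works on `(created_at, resolved_at)` pairs. The function name and the choice to return `None` when nothing has been resolved are my assumptions, not the project's actual code.

```python
from datetime import datetime, timedelta
from typing import Iterable, Optional, Tuple


def calculate_mttr(
    incidents: Iterable[Tuple[datetime, Optional[datetime]]]
) -> Optional[timedelta]:
    """Mean time to resolution over (created_at, resolved_at) pairs.

    Unresolved incidents (resolved_at is None) are skipped. Returns None
    when nothing has been resolved, so the UI can show "n/a" instead of 0.
    """
    durations = [
        resolved_at - created_at
        for created_at, resolved_at in incidents
        if resolved_at is not None
    ]
    if not durations:
        return None
    return sum(durations, timedelta()) / len(durations)


sample = [
    (datetime(2024, 2, 1, 9, 0), datetime(2024, 2, 1, 11, 0)),  # resolved in 2 h
    (datetime(2024, 2, 2, 9, 0), datetime(2024, 2, 2, 13, 0)),  # resolved in 4 h
    (datetime(2024, 2, 3, 9, 0), None),                         # still open
]
print(calculate_mttr(sample))  # 3:00:00
```

Computing this on the server keeps the browser from having to download every resolved incident just to average them.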

2. Advanced Filtering & Search

Users need to quickly find incidents by various criteria:

@router.get("/incidents")
async def list_incidents(
    skip: int = 0,
    limit: int = 50,
    severity: Optional[str] = None,
    status: Optional[str] = None,
    assigned_to: Optional[UUID] = None,
    search: Optional[str] = None,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    query = db.query(Incident).filter(
        Incident.organization_id == current_user.organization_id
    )
    
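    if severity:
        query = query.filter(Incident.severity == severity)
    if status:
        query = query.filter(Incident.status == status)
    if assigned_to:
        # Apply the assignee filter that the signature already accepts
        query = query.filter(Incident.assigned_to == assigned_to)
    if search:
        query = query.filter(
            Incident.title.ilike(f"%{search}%") |
            Incident.description.ilike(f"%{search}%")
        )

    # A stable ordering keeps offset/limit pagination deterministic
    return query.order_by(Incident.created_at.desc()).offset(skip).limit(limit).all()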
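One detail worth noting about the search filter: `%` and `_` are wildcards in `LIKE`/`ILIKE`, so a user searching for `100%` would match far more than intended. A small sketch of an escaping helper follows; `escape_like` and `contains_pattern` are hypothetical names of mine, not functions from the project.

```python
def escape_like(term: str, escape: str = "\\") -> str:
    """Escape LIKE/ILIKE wildcards so user input is matched literally."""
    return (
        term.replace(escape, escape + escape)  # escape the escape char first
        .replace("%", escape + "%")
        .replace("_", escape + "_")
    )


def contains_pattern(term: str) -> str:
    """Build a 'contains' pattern from untrusted user input."""
    return f"%{escape_like(term)}%"


print(contains_pattern("100%_done"))  # %100\%\_done%
```

With SQLAlchemy, the resulting pattern would be passed together with the matching escape character, for example `Incident.title.ilike(contains_pattern(search), escape="\\")`.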

3. Team Collaboration

Comments and timeline tracking enable team collaboration:

@router.post("/incidents/{incident_id}/comments")
async def add_comment(
    incident_id: UUID,
    comment: CommentCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
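    # Confirm the incident belongs to the caller's organization before writing,
    # otherwise any authenticated user could comment on another tenant's incident
    incident = db.query(Incident).filter(
        Incident.id == incident_id,
        Incident.organization_id == current_user.organization_id,
    ).first()
    if incident is None:
        raise HTTPException(status_code=404, detail="Incident not found")

    new_comment = Comment(
        incident_id=incident_id,
        user_id=current_user.id,
        content=comment.content,
        created_at=datetime.utcnow()
    )
    db.add(new_comment)
    db.commit()
    db.refresh(new_comment)  # reload database-generated fields such as the id
    return new_comment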

#Vulnerability Management

##CVE Tracking

class Vulnerability(Base):
    id: UUID
    cve_id: str  # CVE-2024-12345
    title: str
    description: str
    cvss_score: float  # 0-10
    severity: str  # Based on CVSS
    affected_systems: List[str]
    patch_available: bool
    patch_url: Optional[str]
    remediation_steps: str
    linked_incidents: List[Incident]
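The `severity` field is described as "based on CVSS". A minimal sketch of that mapping, using the qualitative severity rating scale from the CVSS v3.x specification, could look like this. The function name is illustrative, and the project may use different labels or thresholds.

```python
def severity_from_cvss(score: float) -> str:
    """Map a CVSS v3.x base score to its qualitative severity rating."""
    if not 0.0 <= score <= 10.0:
        raise ValueError(f"CVSS score must be between 0.0 and 10.0, got {score}")
    if score == 0.0:
        return "NONE"
    if score < 4.0:
        return "LOW"       # 0.1 - 3.9
    if score < 7.0:
        return "MEDIUM"    # 4.0 - 6.9
    if score < 9.0:
        return "HIGH"      # 7.0 - 8.9
    return "CRITICAL"      # 9.0 - 10.0


print(severity_from_cvss(9.8))  # CRITICAL
print(severity_from_cvss(5.3))  # MEDIUM
```

Deriving the label from the score in one place avoids the two fields drifting apart when a CVE is rescored.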

##Patch Coverage Tracking

Organizations can track which systems have patches applied:

class PatchStatus(Base):
    vulnerability_id: UUID
    system_id: UUID
    is_patched: bool
    patch_date: Optional[datetime]
    verified_by: Optional[UUID]

This enables dashboard calculations like:

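def calculate_patch_coverage(db: Session, organization_id: UUID) -> float:
    # The session is passed in explicitly instead of relying on a global `db`
    total_vulnerabilities = db.query(Vulnerability).filter(...).count()
    if total_vulnerabilities == 0:
        return 0.0  # nothing tracked yet; avoid dividing by zero
    # NOTE: this query needs the same organization scoping as the one above
    patched = db.query(PatchStatus).filter(
        PatchStatus.is_patched.is_(True)
    ).count()
    return (patched / total_vulnerabilities) * 100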
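To make the arithmetic concrete, here is a database-free sketch that computes coverage over (vulnerability, system) pairs. Treating each pair as one unit is my assumption about how coverage should be counted, and `PatchRecord` is a hypothetical stand-in for the `PatchStatus` rows.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class PatchRecord:
    """Hypothetical in-memory stand-in for a PatchStatus row."""
    vulnerability_id: str
    system_id: str
    is_patched: bool


def patch_coverage(records: Iterable[PatchRecord]) -> float:
    """Percentage of (vulnerability, system) pairs that are patched."""
    rows = list(records)
    if not rows:
        return 0.0  # no affected systems tracked yet
    patched = sum(1 for row in rows if row.is_patched)
    return patched / len(rows) * 100


records = [
    PatchRecord("CVE-2024-12345", "web-01", True),
    PatchRecord("CVE-2024-12345", "web-02", True),
    PatchRecord("CVE-2024-12345", "db-01", False),
    PatchRecord("CVE-2024-99999", "web-01", True),
]
print(patch_coverage(records))  # 75.0
```

Counting pairs keeps the numerator and denominator in the same unit, so the result can never exceed 100%.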

#Network Monitoring - A Game Changer

This was the most ambitious feature: monitoring WiFi networks and DNS activity.

##WiFi Device Detection

@router.post("/network/wifi-config/detect")
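async def detect_router(
    router_detection: RouterDetection,
    db: Session = Depends(get_db),
    # Require authentication so anonymous callers cannot probe arbitrary IPs
    current_user: User = Depends(get_current_user)
):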
    # Auto-detect popular router brands
    detectable_routers = ['Tenda', 'TP-Link', 'UniFi', 'Meraki', 'MikroTik']
    
    for router_type in detectable_routers:
        try:
            result = attempt_connection(router_type, router_detection.ip)
            if result:
                return {
                    "router_type": router_type,
                    "ip": router_detection.ip,
                    "status": "discovered"
                }
        except ConnectionError:
            continue
    
    raise HTTPException(status_code=404, detail="Router not found")
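Because this endpoint makes the server connect to a caller-supplied IP, it is worth validating that address before probing it. The sketch below uses the standard library's `ipaddress` module to accept only private LAN addresses; the function name and the exact policy (rejecting loopback and link-local ranges) are assumptions of mine, not something the project is confirmed to do.

```python
import ipaddress


def is_probeable_router_ip(value: str) -> bool:
    """Return True only for private LAN addresses that are safe to probe."""
    try:
        ip = ipaddress.ip_address(value)
    except ValueError:
        return False  # not an IP address at all
    # Loopback and link-local addresses count as "private" in the ipaddress
    # module, but they point at the server itself or at cloud metadata
    # services, so they are rejected explicitly.
    if ip.is_loopback or ip.is_link_local or ip.is_unspecified:
        return False
    return ip.is_private


print(is_probeable_router_ip("192.168.0.1"))      # True
print(is_probeable_router_ip("8.8.8.8"))          # False
print(is_probeable_router_ip("169.254.169.254"))  # False
```

A check like this would run before the detection loop and return a 400 response for anything it rejects.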

##DNS Logging & Categorization

@router.post("/network/dns-logs/import")
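async def import_dns_logs(
    # DNSLogCreate is an assumed name for the Pydantic request schema;
    # DNSLog below is the ORM model, so the two need distinct names
    dns_logs: List[DNSLogCreate],
    db: Session = Depends(get_db)
):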
    categories = categorize_domains([log.domain for log in dns_logs])
    
    for log, category in zip(dns_logs, categories):
        db_log = DNSLog(
            device_id=log.device_id,
            domain=log.domain,
            category=category,  # social, streaming, work, malware, etc.
            timestamp=log.timestamp,
            blocked=category == "malware"
        )
        db.add(db_log)
    
    db.commit()
    return {"imported": len(dns_logs)}
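The article does not show `categorize_domains`. As a simple illustration of the idea, here is a sketch that matches each domain against a suffix table on label boundaries. The table contents and the `"uncategorized"` fallback are placeholders I made up; a real deployment would more likely pull categories from a threat-intelligence feed.

```python
from typing import Dict, List

# Hypothetical lookup table, for illustration only
CATEGORY_BY_SUFFIX: Dict[str, str] = {
    "facebook.com": "social",
    "netflix.com": "streaming",
    "github.com": "work",
    "malware.example": "malware",
}


def categorize_domain(domain: str) -> str:
    """Return the category of the longest matching domain suffix."""
    labels = domain.lower().rstrip(".").split(".")
    # Walk from the full name to shorter suffixes: a.b.c, then b.c, then c
    for start in range(len(labels)):
        suffix = ".".join(labels[start:])
        if suffix in CATEGORY_BY_SUFFIX:
            return CATEGORY_BY_SUFFIX[suffix]
    return "uncategorized"


def categorize_domains(domains: List[str]) -> List[str]:
    """Categorize domains, preserving input order so results can be zipped."""
    return [categorize_domain(domain) for domain in domains]


print(categorize_domains(["www.facebook.com", "cdn.malware.example.", "example.org"]))
# ['social', 'malware', 'uncategorized']
```

Matching on whole labels means `notfacebook.com` is not mistaken for `facebook.com`, which a plain `endswith` check would get wrong.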

#Authentication & Multi-Tenancy

##JWT Authentication

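def create_access_token(user_id: UUID, organization_id: UUID, expires_delta: timedelta):
    # Use a timezone-aware timestamp; datetime.utcnow() is deprecated as of
    # Python 3.12 (needs `from datetime import datetime, timedelta, timezone`)
    payload = {
        "sub": str(user_id),
        "org": str(organization_id),
        "exp": datetime.now(timezone.utc) + expires_delta
    }
    return jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256")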
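To show what an HS256 token actually contains and what the verifying side has to check, here is a standard-library-only sketch of signing and verification. It is for illustration: in production a maintained JWT library should do this work, and the function names here are my own.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _signature(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode(), signing_input.encode("ascii"), hashlib.sha256).digest()


def sign_hs256(payload: dict, secret: str) -> str:
    """Build header.payload.signature, each part base64url-encoded."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    return f"{signing_input}.{_b64url(_signature(signing_input, secret))}"


def verify_hs256(token: str, secret: str, now: Optional[float] = None) -> dict:
    """Check algorithm, signature, and expiry; return the payload if valid."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("malformed token")
    header_b64, payload_b64, signature_b64 = parts
    if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    expected = _signature(f"{header_b64}.{payload_b64}", secret)
    # Constant-time comparison avoids leaking signature bytes through timing
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    payload = json.loads(_b64url_decode(payload_b64))
    if payload["exp"] <= (time.time() if now is None else now):
        raise ValueError("token expired")
    return payload


token = sign_hs256({"sub": "user-1", "org": "org-1", "exp": 2000}, "dev-secret")
print(verify_hs256(token, "dev-secret", now=1000)["org"])  # org-1
```

The `org` claim is what lets the API scope every query to the caller's organization without an extra user lookup.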

##Row-Level Security

PostgreSQL RLS ensures users only see their organization's data:

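-- Enable RLS on incidents table
ALTER TABLE incidents ENABLE ROW LEVEL SECURITY;

-- Policy: users can only view incidents from their organization
-- (current_user_id() is a custom helper function, not a PostgreSQL built-in)
CREATE POLICY incidents_org_isolation ON incidents
    USING (organization_id = (
        SELECT organization_id FROM users WHERE id = current_user_id()
    ));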

#Performance & Optimization

##Database Indexing

class Incident(Base):
    __tablename__ = "incidents"
    
    # Strategic indexes for common queries
    __table_args__ = (
        Index('idx_incidents_org_status', 'organization_id', 'status'),
        Index('idx_incidents_severity', 'severity'),
        Index('idx_incidents_created', 'created_at'),
    )

##Caching Strategy

@router.get("/dashboard/metrics")
async def get_dashboard_metrics(
    redis_client: Redis = Depends(get_redis),
    current_user: User = Depends(get_current_user)
):
    cache_key = f"metrics:{current_user.organization_id}"
    
    # Try cache first
    cached = await redis_client.get(cache_key)
    if cached:
        return json.loads(cached)
    
    # Calculate and cache for 5 minutes
    metrics = calculate_metrics(current_user.organization_id)
    await redis_client.setex(cache_key, 300, json.dumps(metrics))
    
    return metrics
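The pattern in this endpoint is usually called cache-aside: read from the cache, fall back to computing on a miss, then store the result with a TTL. Here is a self-contained sketch of the same flow with an in-memory dictionary standing in for Redis; `TTLCache` is a hypothetical class of mine, and the injectable clock exists only to make expiry testable.

```python
import time
from typing import Any, Callable, Dict, Tuple


class TTLCache:
    """In-memory stand-in for Redis GET/SETEX, for illustration only."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: Dict[str, Tuple[float, Any]] = {}  # key -> (expires_at, value)

    def get_or_compute(self, key: str, compute: Callable[[], Any]) -> Any:
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and entry[0] > now:
            return entry[1]  # cache hit, still fresh
        value = compute()    # cache miss or expired: recompute
        self._entries[key] = (now + self._ttl, value)
        return value


fake_now = [0.0]
calls = []


def expensive_metrics() -> dict:
    calls.append(1)
    return {"computations": len(calls)}


cache = TTLCache(ttl_seconds=300, clock=lambda: fake_now[0])
print(cache.get_or_compute("metrics:org-1", expensive_metrics))  # {'computations': 1}
fake_now[0] = 299.0
print(cache.get_or_compute("metrics:org-1", expensive_metrics))  # {'computations': 1}
fake_now[0] = 300.0
print(cache.get_or_compute("metrics:org-1", expensive_metrics))  # {'computations': 2}
```

Keying the cache by organization ID, as the endpoint does, also keeps one tenant's metrics from ever being served to another.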

#Notifications & Integrations

##Email Alerts

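@shared_task
def send_incident_alert(incident_id: str):
    # Celery tasks run outside the request cycle, so open a dedicated
    # SQLAlchemy session (SessionLocal is an assumed session factory name)
    db = SessionLocal()
    try:
        incident = db.query(Incident).filter(Incident.id == incident_id).first()
        if incident is None:
            return  # incident was deleted before the task ran

        recipients = incident.organization.get_alert_emails()
        send_email(
            to=recipients,
            subject=f"[{incident.severity}] {incident.title}",
            template="incident_alert.html",
            context={"incident": incident}
        )
    finally:
        db.close()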

##Slack Integration

@router.post("/incidents")
async def create_incident(
    incident: IncidentCreate,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db)
):
    new_incident = Incident(**incident.dict())
    db.add(new_incident)
    db.commit()

    # Send to Slack in the background (BackgroundTasks comes from fastapi) so a
    # slow or failing webhook cannot block or break incident creation
    background_tasks.add_task(
        requests.post,
        settings.SLACK_WEBHOOK_URL,
        json={"text": f"🚨 New {incident.severity} incident: {incident.title}"},
        timeout=5,
    )

    return new_incident

#Security Best Practices Implemented

1. Password Security: Bcrypt with cost factor 12
2. CORS: Strict origin validation
3. Rate Limiting: 100 requests/minute per IP
4. SQL Injection: All queries use parameterized statements
5. Encryption: TLS in transit, encrypted sensitive fields at rest
6. Audit Logging: Every action logged with user/timestamp
7. Environment Secrets: All secrets managed through .env

#Challenges & Solutions

Challenge 1: Real-time Network Monitoring

Constantly polling WiFi routers would overwhelm them and drain resources.

Solution: Scheduled background sync tasks instead of continuous polling:

app.conf.beat_schedule = {
    'sync-wifi-devices': {
        'task': 'tasks.sync_wifi_devices',
        'schedule': crontab(minute='*/5'),  # Every 5 minutes
    },
}

Challenge 2: Handling Large DNS Logs

Devices can generate thousands of DNS queries daily.

Solution: Batch import and async processing:

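@router.post("/network/dns-logs/import", status_code=202)
async def import_dns_logs_async(dns_logs: List[DNSLog]):
    # Celery arguments must be serializable, so send plain dicts, not model objects
    task = import_dns_logs_task.apply_async(args=[[log.dict() for log in dns_logs]])
    return {"task_id": task.id}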
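For the batching half of this solution, a small helper that splits a large import into fixed-size chunks is usually enough. The sketch below is generic, and the batch size used with it would be a tuning choice that the article does not specify.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most `size` items from `items`."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


print(list(chunked(range(5), 2)))  # [[0, 1], [2, 3], [4]]
```

Inside the worker task, each batch could then be written with a single bulk insert and commit, so one bad batch does not roll back the entire import.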

Challenge 3: Dashboard Performance

Calculating metrics on 10k+ incidents is slow.

Solution: Pre-calculated and cached metrics:

@shared_task
def refresh_metrics_cache(org_id: str):
    metrics = calculate_all_metrics(org_id)
    # Redis stores strings, and the dashboard endpoint reads this key with json.loads
    redis.setex(f"metrics:{org_id}", 300, json.dumps(metrics))

#Results

  • Load Time: Dashboard loads in less than 2 seconds
  • API Response: Average 150ms for filtered queries
  • Uptime: 99.9% with Render + Supabase
  • Scalability: Handles 100k incidents without degradation

#Key Learnings

1. FastAPI > Flask: Native async support was game-changing for I/O operations
2. RLS is Powerful: PostgreSQL RLS eliminates an entire class of bugs
3. Caching is Essential: Database queries are the bottleneck, not computation
4. Monitoring is Hard: Network monitoring requires careful rate-limiting
5. Security is Ongoing: Every feature needs a security audit

#What's Next

  • WebSocket support for real-time incident updates
  • Machine learning for anomaly detection
  • Automated incident response workflows
  • Mobile app for on-call engineers

#Tech Stack Summary

  • Frontend: Next.js 16, React, Tailwind CSS, shadcn/ui, SWR
  • Backend: FastAPI, SQLAlchemy, Pydantic
  • Database: PostgreSQL (Supabase), Redis
  • DevOps: Docker, Vercel, Render
  • Security: JWT, Bcrypt, TLS, Row-Level Security

Building this platform reinforced that security tooling must be:

  • Reliable: Teams depend on it
  • Fast: Slow tools get abandoned
  • Usable: Complex features must have intuitive UX
  • Scalable: Start small, grow big

The project is open-source on GitHub. Check it out and contribute!

Have thoughts on this article? Share them with me on Facebook or GitHub.