zero-trust-architect

Zero Trust security architecture. OIDC/OAuth 2.0 with MFA, ABAC policy engines, Kubernetes network policies, Istio mTLS, HashiCorp Vault dynamic secrets, and the five Zero Trust pillars implemented in production.

Installation

npx clawhub@latest install zero-trust-architect

Documentation

Zero Trust Architecture

Zero Trust Principles

Traditional Security: "Trust but verify" — trust everyone inside the network
Zero Trust:           "Never trust, always verify" — trust no one, anywhere

Zero Trust Axioms:
1. The network is always hostile (even internal)
2. Threats exist inside and outside the network
3. Every device and user must be authenticated and authorized
4. Access is least-privilege and time-limited
5. Logging and monitoring are mandatory everywhere

The Five Pillars

1. Identity (Who are you?)
   - Strong multi-factor authentication
   - Identity as the new perimeter
   - Continuous identity verification, not just at login

2. Device (What are you using?)
   - Device health validation
   - Managed vs unmanaged device policy
   - Certificate-based device identity

3. Network (Where are you connecting?)
   - Micro-segmentation (not flat network)
   - Encrypt all traffic (even internal)
   - Software-defined perimeter (SDP)

4. Application (What are you accessing?)
   - Application-level access control
   - No network-level access to apps by default
   - Context-aware authorization

5. Data (What data can you touch?)
   - Data classification
   - Rights management
   - Data loss prevention
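
The five pillars can be read as independent checks that must all pass before a request is served. The following is a minimal sketch of that idea, not a production policy engine; the `RequestContext` fields and the `evaluate_pillars` function are hypothetical names introduced here for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RequestContext:
    """Signals gathered for one request. All field names are illustrative."""
    mfa_verified: bool        # Identity: did the user complete MFA?
    device_trust: str         # Device: "managed" | "unmanaged" | "unknown"
    channel_encrypted: bool   # Network: did the request arrive over TLS/mTLS?
    app_permission: bool      # Application: does the user hold the permission?
    data_classification: str  # Data: "public" | "internal" | "sensitive"


def evaluate_pillars(ctx: RequestContext) -> tuple[bool, str]:
    """Return (allowed, reason). Every pillar must pass; the first failure denies."""
    if not ctx.mfa_verified:
        return False, "identity: MFA required"
    if not ctx.channel_encrypted:
        return False, "network: encrypted channel required"
    if not ctx.app_permission:
        return False, "application: permission missing"
    if ctx.data_classification == "sensitive" and ctx.device_trust != "managed":
        return False, "device/data: sensitive data requires a managed device"
    return True, "all pillars satisfied"
```

Returning a reason alongside the decision makes each denial easy to log, which fits the axiom that logging is mandatory everywhere.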

Identity and Access Management

OIDC/OAuth 2.0 with PKCE (Modern Auth)

# FastAPI with OIDC verification
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2AuthorizationCodeBearer
from jose import jwt, JWTError
from pydantic import BaseModel
import httpx

app = FastAPI()

oauth2_scheme = OAuth2AuthorizationCodeBearer(
    authorizationUrl="https://auth.company.com/oauth2/authorize",
    tokenUrl="https://auth.company.com/oauth2/token",
)

class TokenPayload(BaseModel):
    # A pydantic model, so TokenPayload(**payload) validates the claims
    # and ignores extra ones such as iss and aud
    sub: str        # User ID
    email: str
    groups: list[str]
    device_id: str
    device_trust: str  # "managed" | "unmanaged" | "unknown"
    amr: list[str]     # Authentication methods (e.g., ["pwd", "otp"])
    iat: int
    exp: int

async def get_current_user(token: str = Depends(oauth2_scheme)) -> TokenPayload:
    try:
        # Fetch JWKS (cached in production)
        async with httpx.AsyncClient() as client:
            jwks_response = await client.get("https://auth.company.com/.well-known/jwks.json")

        # Verifies signature, expiry, audience, and issuer
        payload = jwt.decode(
            token,
            jwks_response.json(),
            algorithms=["RS256"],
            audience="https://api.company.com",
            issuer="https://auth.company.com",
        )
    except JWTError:
        # Do not echo parser errors back to the caller
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Require MFA: amr must contain a second factor (OTP or hardware key)
    amr = payload.get("amr", [])
    if "otp" not in amr and "hwk" not in amr:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Multi-factor authentication required",
        )

    return TokenPayload(**payload)

# Hypothetical group-to-permission mapping; replace with a real policy lookup
GROUP_PERMISSIONS = {
    "admins": {"users:read", "users:write"},
}

def get_user_permissions(user: TokenPayload) -> set[str]:
    permissions: set[str] = set()
    for group in user.groups:
        permissions |= GROUP_PERMISSIONS.get(group, set())
    return permissions

# Policy-based authorization
def require_permission(permission: str):
    async def checker(user: TokenPayload = Depends(get_current_user)):
        if permission not in get_user_permissions(user):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Permission required: {permission}",
            )
        return user
    return checker

@app.get("/admin/users")
async def list_users(user = Depends(require_permission("users:read"))):
    return {"users": []}
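
The comment "cached in production" deserves a concrete shape, because fetching the JWKS on every request adds latency and makes the identity provider a per-request dependency. Below is a minimal sketch of a time-based cache. `JWKSCache` and its parameters are hypothetical names, the 300-second TTL is an assumed value, and the fetcher is synchronous for brevity (an async variant would await the fetch instead).

```python
import time
from typing import Callable, Optional


class JWKSCache:
    """Caches a JWKS document for ttl_seconds and refetches after expiry."""

    def __init__(
        self,
        fetch: Callable[[], dict],
        ttl_seconds: float = 300.0,  # assumed TTL, tune to your key rotation policy
        clock: Callable[[], float] = time.monotonic,
    ):
        self._fetch = fetch        # e.g. an HTTP GET that returns the parsed JWKS
        self._ttl = ttl_seconds
        self._clock = clock        # injectable so expiry can be tested
        self._value: Optional[dict] = None
        self._fetched_at = 0.0

    def get(self) -> dict:
        """Return the cached JWKS, refetching if it is missing or stale."""
        now = self._clock()
        if self._value is None or now - self._fetched_at >= self._ttl:
            self._value = self._fetch()
            self._fetched_at = now
        return self._value
```

A short TTL keeps key rotation responsive while still removing the identity provider from the hot path of most requests.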

Attribute-Based Access Control (ABAC)

from dataclasses import dataclass

@dataclass
class AccessRequest:
    subject: dict   # User attributes: role, department, clearance
    resource: dict  # Resource attributes: classification, owner, type, department
    action: str     # read, write, delete, admin
    environment: dict  # IP, time, device_trust

class PolicyEngine:
    """ABAC policy engine: evaluate policies, not just roles."""

    def __init__(self, policies: list[dict]):
        # Each policy is a dict with "name", "condition", and "effect" keys
        self.policies = policies

    def evaluate(self, request: AccessRequest) -> bool:
        """Check all policies. Deny by default; any matching deny overrides every allow."""
        allowed = False
        for policy in self.policies:
            if not policy["condition"](request):
                continue  # Policy does not apply to this request
            if policy["effect"] == "deny":
                return False  # Explicit deny wins, regardless of policy order
            if policy["effect"] == "allow":
                allowed = True
        return allowed  # Stays False (default deny) unless a policy allowed it

# Example policies
POLICIES = [
    # Deny access from untrusted devices to sensitive data
    {
        "name": "require_managed_device_for_sensitive",
        "condition": lambda r: (
            r.resource.get("classification") == "sensitive"
            and r.environment.get("device_trust") != "managed"
        ),
        "effect": "deny",
    },
    # Allow managers to read reports in their department
    {
        "name": "managers_read_dept_reports",
        "condition": lambda r: (
            r.subject.get("role") == "manager"
            and r.action == "read"
            and r.resource.get("type") == "report"
            and r.resource.get("department") == r.subject.get("department")
        ),
        "effect": "allow",
    },
]
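
The `environment` attributes (IP, time, device_trust) are what separate ABAC from plain role checks. As an illustration, here is a sketch of one more policy in the same dict shape as the entries above, denying admin actions from outside a trusted context. The CIDR range, the business hours, and the names `outside_trusted_context` and `DENY_ADMIN_OUTSIDE_TRUSTED_CONTEXT` are assumptions made for this example.

```python
import ipaddress
from datetime import datetime, time
from types import SimpleNamespace

# Assumed values for illustration only.
CORPORATE_NETWORK = ipaddress.ip_network("10.0.0.0/8")
BUSINESS_START, BUSINESS_END = time(8, 0), time(18, 0)


def outside_trusted_context(request) -> bool:
    """True when the request comes from outside the corporate range or outside business hours."""
    env = request.environment
    ip = ipaddress.ip_address(env.get("ip", "0.0.0.0"))
    now: datetime = env["time"]
    in_network = ip in CORPORATE_NETWORK
    in_hours = BUSINESS_START <= now.time() <= BUSINESS_END
    return not (in_network and in_hours)


# Same shape as the example policies: name, condition, effect.
DENY_ADMIN_OUTSIDE_TRUSTED_CONTEXT = {
    "name": "deny_admin_outside_trusted_context",
    "condition": lambda r: r.action == "admin" and outside_trusted_context(r),
    "effect": "deny",
}

# Usage: any object with .action and .environment works as a request here.
request = SimpleNamespace(
    action="admin",
    environment={"ip": "203.0.113.7", "time": datetime(2024, 1, 8, 10, 30)},
)
print(DENY_ADMIN_OUTSIDE_TRUSTED_CONTEXT["condition"](request))
```

Because the condition is an ordinary callable, it can be unit-tested in isolation before being added to the policy list.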

Micro-Segmentation

Kubernetes Network Policies

# Default deny all traffic in namespace
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: default-deny-all
  namespace: production
spec:
  podSelector: {}  # Matches ALL pods
  policyTypes:
    - Ingress
    - Egress

---
# Allow only api → database communication
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: allow-api-to-db
  namespace: production
spec:
  podSelector:
    matchLabels:
      app: postgres
  policyTypes:
    - Ingress
  ingress:
    - from:
        - podSelector:
            matchLabels:
              app: api     # Only api pods can reach postgres
      ports:
        - protocol: TCP
          port: 5432

---
# Allow api egress only to postgres, redis, and DNS (everything else stays denied)
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: api-egress
  namespace: production
spec:
  podSelector:
    matchLabels:
      app: api
  policyTypes:
    - Egress
  egress:
    - to:
        - podSelector:
            matchLabels:
              app: postgres
      ports:
        - port: 5432
    - to:
        - podSelector:
            matchLabels:
              app: redis
      ports:
        - port: 6379
    - to:  # Allow DNS
        - namespaceSelector: {}
          podSelector:
            matchLabels:
              k8s-app: kube-dns
      ports:
        - port: 53
          protocol: UDP
        - port: 53
          protocol: TCP  # DNS falls back to TCP for large responses
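
A default-deny policy only protects the namespace it is created in, so every namespace needs its own copy. The sketch below generates the same `default-deny-all` manifest for a list of namespaces and renders it as a JSON `List` object, which `kubectl apply -f` accepts alongside YAML. The function names are hypothetical, and which namespaces to pass is up to the cluster.

```python
import json


def default_deny_policy(namespace: str) -> dict:
    """Build a NetworkPolicy that selects every pod and allows no traffic."""
    return {
        "apiVersion": "networking.k8s.io/v1",
        "kind": "NetworkPolicy",
        "metadata": {"name": "default-deny-all", "namespace": namespace},
        "spec": {
            "podSelector": {},                     # empty selector matches all pods
            "policyTypes": ["Ingress", "Egress"],  # no rules listed, so both are denied
        },
    }


def render(namespaces: list[str]) -> str:
    """Render one default-deny policy per namespace as a JSON List object."""
    items = [default_deny_policy(ns) for ns in namespaces]
    return json.dumps({"apiVersion": "v1", "kind": "List", "items": items}, indent=2)
```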

mTLS Service Mesh

Istio Configuration

# Enable strict mTLS across namespace
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: default
  namespace: production
spec:
  mtls:
    mode: STRICT  # PERMISSIVE during migration, STRICT in production

---
# Authorization policy — service-to-service
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: postgres-access
  namespace: production
spec:
  selector:
    matchLabels:
      app: postgres
  rules:
    - from:
        - source:
            principals:
              - "cluster.local/ns/production/sa/api-service"  # Only api-service SA
      to:
        - operation:
            ports: ["5432"]

---
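# JWT validation at the mesh level
# Note: RequestAuthentication rejects invalid tokens but still admits requests
# that carry no token. Pair it with an AuthorizationPolicy that requires
# requestPrincipals to enforce authentication.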
apiVersion: security.istio.io/v1beta1
kind: RequestAuthentication
metadata:
  name: jwt-auth
  namespace: production
spec:
  selector:
    matchLabels:
      app: api
  jwtRules:
    - issuer: "https://auth.company.com"
      jwksUri: "https://auth.company.com/.well-known/jwks.json"
      audiences:
        - "https://api.company.com"
      forwardOriginalToken: true
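
The `principals` entry in the AuthorizationPolicy follows the pattern `<trust-domain>/ns/<namespace>/sa/<service-account>`. When auditing many policies, it can help to parse these strings rather than compare them by eye. The following is a small sketch of such a parser; `WorkloadPrincipal` and `parse_principal` are hypothetical helper names, not part of Istio.

```python
from typing import NamedTuple


class WorkloadPrincipal(NamedTuple):
    trust_domain: str
    namespace: str
    service_account: str


def parse_principal(principal: str) -> WorkloadPrincipal:
    """Parse '<trust-domain>/ns/<namespace>/sa/<service-account>'."""
    parts = principal.split("/")
    well_formed = (
        len(parts) == 5
        and parts[1] == "ns"
        and parts[3] == "sa"
        and all(parts)  # no empty segments
    )
    if not well_formed:
        raise ValueError(f"unexpected principal format: {principal!r}")
    return WorkloadPrincipal(parts[0], parts[2], parts[4])
```

With the parsed form, an audit script could, for example, flag any rule whose principal namespace differs from the policy's own namespace.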

Secrets Management

HashiCorp Vault Dynamic Secrets

# Vault policy — least privilege
path "database/creds/api-readonly" {
  capabilities = ["read"]  # ONLY this path, read only
}

path "secret/data/api/*" {
  capabilities = ["read", "list"]
}

# Kubernetes auth method
path "auth/kubernetes/login" {
  capabilities = ["create", "update"]
}

# Python client (hvac)
import hvac

def get_db_credentials(service_account_token: str, vault_role: str) -> dict:
    """Get short-lived DB credentials from Vault."""
    client = hvac.Client(url="https://vault.internal.company.com")

    # Authenticate using the pod's Kubernetes service account token
    client.auth.kubernetes.login(
        role=vault_role,
        jwt=service_account_token,
    )

    # Generate dynamic credentials. The TTL is set on the Vault database role
    # (for example 1 hour); Vault revokes the credentials when the lease ends.
    creds = client.secrets.database.generate_credentials(name="api-readonly")

    return {
        "username": creds["data"]["username"],
        "password": creds["data"]["password"],
        # Credentials auto-expire: renew the lease or fetch new ones
        # before lease_duration (seconds) elapses
        "lease_id": creds["lease_id"],
        "lease_duration": creds["lease_duration"],
    }
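
Dynamic credentials are only useful if the application refreshes them before they expire. One common approach, sketched here under assumptions, is to schedule the refresh at a fixed fraction of the lease. The `renewal_delay` helper and the two-thirds default are illustrative choices, not something Vault prescribes.

```python
def renewal_delay(lease_duration: int, fraction: float = 2 / 3, min_delay: int = 1) -> int:
    """Seconds to wait before renewing a lease that lasts lease_duration seconds.

    Renewing at a fraction of the lease leaves headroom for retries if
    Vault is briefly unreachable.
    """
    if lease_duration <= 0:
        raise ValueError("lease_duration must be positive")
    if not 0 < fraction < 1:
        raise ValueError("fraction must be between 0 and 1 (exclusive)")
    return max(min_delay, round(lease_duration * fraction))


def renewal_due(issued_at: float, lease_duration: int, now: float) -> bool:
    """True once enough of the lease has elapsed that it should be renewed."""
    return now - issued_at >= renewal_delay(lease_duration)
```

A background task could call `renewal_due` with the `lease_duration` returned by `get_db_credentials` and then either renew the lease (hvac exposes `client.sys.renew_lease`) or request fresh credentials.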

Zero Trust Checklist

Identity

  • All users require MFA (phishing-resistant preferred: FIDO2/WebAuthn)
  • Machine identities use certificates, not passwords
  • Service accounts use workload identity (not static API keys)
  • Session tokens expire (max 8 hours)
  • Continuous authentication for sensitive operations
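
The 8-hour session ceiling can be enforced on the API side as well as at the identity provider, using the `iat` and `exp` claims already present in the token. This is a minimal sketch; `session_is_valid` is a hypothetical helper that assumes the claims come from a token whose signature was already verified.

```python
MAX_SESSION_SECONDS = 8 * 60 * 60  # the 8-hour ceiling from the checklist


def session_is_valid(claims: dict, now: int) -> bool:
    """Accept a token only if it is current and its total lifetime is within the ceiling."""
    iat, exp = claims.get("iat"), claims.get("exp")
    if not isinstance(iat, int) or not isinstance(exp, int):
        return False  # missing or malformed timestamps
    if exp - iat > MAX_SESSION_SECONDS:
        return False  # issued with a lifetime longer than policy allows
    return iat <= now < exp
```

Checking `exp - iat` catches tokens that a misconfigured issuer minted with an overly long lifetime, even while they are still unexpired.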

Network

  • Default-deny network policies (allow-list only)
  • mTLS for all service-to-service communication
  • No SSH directly to production (use bastion/SSM)
  • All traffic encrypted (no plaintext HTTP internally)
  • Network microsegmentation per service

Application

  • Input validation at every API boundary
  • Output encoding to prevent injection
  • CSRF protection on state-changing operations
  • Rate limiting per user/IP
  • API authentication on ALL endpoints
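
Rate limiting per user or IP is often implemented with a token bucket. The sketch below keeps one in-memory bucket per key; the class names, rate, and capacity are assumptions for illustration, and a multi-instance deployment would need shared state (for example in Redis) rather than a local dict.

```python
import time
from typing import Callable


class TokenBucket:
    """Refills at `rate` tokens per second up to `capacity`; each request costs one token."""

    def __init__(self, rate: float, capacity: float, clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self._clock = clock
        self._tokens = capacity       # start full so bursts up to capacity succeed
        self._updated = clock()

    def allow(self) -> bool:
        now = self._clock()
        elapsed = now - self._updated
        self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
        self._updated = now
        if self._tokens >= 1:
            self._tokens -= 1
            return True
        return False


class RateLimiter:
    """One bucket per key, where the key is a user ID or client IP."""

    def __init__(self, rate: float, capacity: float, clock: Callable[[], float] = time.monotonic):
        self._rate, self._capacity, self._clock = rate, capacity, clock
        self._buckets: dict[str, TokenBucket] = {}

    def allow(self, key: str) -> bool:
        bucket = self._buckets.get(key)
        if bucket is None:
            bucket = self._buckets[key] = TokenBucket(self._rate, self._capacity, self._clock)
        return bucket.allow()
```

In a FastAPI app this could be called from a dependency that returns HTTP 429 when `allow` is False.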

Data

  • Data classified by sensitivity
  • Encryption at rest (database-level or application-level)
  • PII data minimization (don't log PII)
  • Audit log for all data access to sensitive fields
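
"Don't log PII" is easier to uphold with a safety net in the logging pipeline itself. The following is a sketch of a `logging.Filter` that redacts email addresses before a record is emitted. The regular expression is a simple approximation that will miss some valid addresses, and other PII types (names, phone numbers) would need their own handling.

```python
import logging
import re

# Approximate email pattern; intentionally simple rather than RFC-complete.
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")


class RedactEmailFilter(logging.Filter):
    """Replaces email addresses in the formatted log message with a placeholder."""

    def filter(self, record: logging.LogRecord) -> bool:
        # Format first so addresses passed as arguments are redacted too.
        record.msg = EMAIL_RE.sub("[REDACTED_EMAIL]", record.getMessage())
        record.args = ()  # the message is already fully formatted
        return True       # never drop the record, only rewrite it


# Usage: attach to a handler so every record passing through it is covered.
handler = logging.StreamHandler()
handler.addFilter(RedactEmailFilter())
```

Attaching the filter to the handler rather than to a single logger also covers records propagated from child loggers.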

Observability

  • Centralized log aggregation
  • Authentication failure alerting
  • Unusual access pattern detection
  • Complete audit trail for data access
  • Incident response playbook tested
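
Authentication failure alerting usually starts with a sliding-window count per user or source IP. Here is a minimal in-memory sketch of that idea; the `FailureMonitor` name, the threshold of 5, and the 60-second window are assumed values, and a real deployment would more likely express this as a rule in the log aggregation or SIEM platform.

```python
from collections import defaultdict, deque


class FailureMonitor:
    """Tracks recent authentication failures per key (user ID or source IP)."""

    def __init__(self, threshold: int = 5, window_seconds: float = 60.0):
        self.threshold = threshold
        self.window = window_seconds
        self._events: dict[str, deque] = defaultdict(deque)

    def record_failure(self, key: str, timestamp: float) -> bool:
        """Record one failure; return True if the key has reached the alert threshold."""
        events = self._events[key]
        events.append(timestamp)
        # Drop failures that have fallen out of the sliding window.
        while events and timestamp - events[0] > self.window:
            events.popleft()
        return len(events) >= self.threshold
```

The boolean return value is the hook for raising an alert, for instance by emitting a high-severity log event that the centralized pipeline routes to on-call.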