Security & PasswordsDocumented
zero-trust-architect
Zero Trust security architecture. OIDC/OAuth 2.0 with MFA, ABAC policy engines, Kubernetes network policies, Istio mTLS, HashiCorp Vault dynamic secrets, and the five Zero Trust pillars implemented in production.
Share:
Installation
npx clawhub@latest install zero-trust-architectView the full skill documentation and source below.
Documentation
Zero Trust Architecture
Zero Trust Principles
Traditional Security: "Trust but verify" — trust everyone inside the network
Zero Trust: "Never trust, always verify" — trust no one, anywhere
Zero Trust Axioms:
1. The network is always hostile (even internal)
2. Threats exist inside and outside the network
3. Every device and user must be authenticated and authorized
4. Access is least-privilege and time-limited
5. Logging and monitoring are mandatory everywhere
The Five Pillars
1. Identity (Who are you?)
- Strong multi-factor authentication
- Identity as the new perimeter
- Continuous identity verification, not just at login
2. Device (What are you using?)
- Device health validation
- Managed vs unmanaged device policy
- Certificate-based device identity
3. Network (Where are you connecting?)
- Micro-segmentation (not flat network)
- Encrypt all traffic (even internal)
- Software-defined perimeter (SDP)
4. Application (What are you accessing?)
- Application-level access control
- No network-level access to apps by default
- Context-aware authorization
5. Data (What data can you touch?)
- Data classification
- Rights management
- Data loss prevention
Identity and Access Management
OIDC/OAuth 2.0 with PKCE (Modern Auth)
# FastAPI with OIDC verification
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2AuthorizationCodeBearer
from jose import jwt, JWTError
import httpx
app = FastAPI()
oauth2_scheme = OAuth2AuthorizationCodeBearer(
authorizationUrl="https://auth.company.com/oauth2/authorize",
tokenUrl="https://auth.company.com/oauth2/token",
)
class TokenPayload:
sub: str # User ID
email: str
groups: list[str]
device_id: str
device_trust: str # "managed" | "unmanaged" | "unknown"
amr: list[str] # Authentication methods (e.g., ["pwd", "otp"])
iat: int
exp: int
async def get_current_user(token: str = Depends(oauth2_scheme)) -> TokenPayload:
try:
# Fetch JWKS (cached in production)
async with httpx.AsyncClient() as client:
jwks_response = await client.get("https://auth.company.com/.well-known/jwks.json")
payload = jwt.decode(
token,
jwks_response.json(),
algorithms=["RS256"],
audience="https://api.company.com",
)
# Require MFA for sensitive operations
if "otp" not in payload.get("amr", []) and "hwk" not in payload.get("amr", []):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Multi-factor authentication required"
)
return TokenPayload(**payload)
except JWTError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Invalid authentication credentials: {e}",
)
# Policy-based authorization
def require_permission(permission: str):
async def checker(user: TokenPayload = Depends(get_current_user)):
if permission not in get_user_permissions(user):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Permission required: {permission}"
)
return user
return checker
@app.get("/admin/users")
async def list_users(user = Depends(require_permission("users:read"))):
return {"users": []}
Attribute-Based Access Control (ABAC)
from typing import Protocol
from dataclasses import dataclass
@dataclass
class AccessRequest:
subject: dict # User attributes: role, department, clearance
resource: dict # Resource attributes: classification, owner, type
action: str # read, write, delete, admin
environment: dict # IP, time, device_trust
class PolicyEngine:
"""ABAC policy engine — evaluate policies, not just roles."""
def __init__(self, policies: list):
self.policies = policies
def evaluate(self, request: AccessRequest) -> bool:
"""Check all policies — deny by default."""
for policy in self.policies:
result = policy.evaluate(request)
if result == "deny":
return False # Explicit deny wins
if result == "allow":
return True
return False # Default deny
# Example policies
POLICIES = [
# Deny access from untrusted devices to sensitive data
{
"name": "require_managed_device_for_sensitive",
"condition": lambda r: (
r.resource.get("classification") == "sensitive"
and r.environment.get("device_trust") != "managed"
),
"effect": "deny",
},
# Allow managers to read reports in their department
{
"name": "managers_read_dept_reports",
"condition": lambda r: (
r.subject.get("role") == "manager"
and r.action == "read"
and r.resource.get("type") == "report"
and r.resource.get("department") == r.subject.get("department")
),
"effect": "allow",
},
]
Micro-Segmentation
Kubernetes Network Policies
# Default deny all traffic in namespace
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: default-deny-all
namespace: production
spec:
podSelector: {} # Matches ALL pods
policyTypes:
- Ingress
- Egress
---
# Allow only api → database communication
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: allow-api-to-db
namespace: production
spec:
podSelector:
matchLabels:
app: postgres
policyTypes:
- Ingress
ingress:
- from:
- podSelector:
matchLabels:
app: api # Only api pods can reach postgres
ports:
- protocol: TCP
port: 5432
---
# Allow api to reach external services
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: api-egress
namespace: production
spec:
podSelector:
matchLabels:
app: api
policyTypes:
- Egress
egress:
- to:
- podSelector:
matchLabels:
app: postgres
ports:
- port: 5432
- to:
- podSelector:
matchLabels:
app: redis
ports:
- port: 6379
- to: # Allow DNS
- namespaceSelector: {}
podSelector:
matchLabels:
k8s-app: kube-dns
ports:
- port: 53
protocol: UDP
mTLS Service Mesh
Istio Configuration
# Enable strict mTLS across namespace
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
name: default
namespace: production
spec:
mtls:
mode: STRICT # PERMISSIVE during migration, STRICT in production
---
# Authorization policy — service-to-service
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
name: postgres-access
namespace: production
spec:
selector:
matchLabels:
app: postgres
rules:
- from:
- source:
principals:
- "cluster.local/ns/production/sa/api-service" # Only api-service SA
to:
- operation:
ports: ["5432"]
---
# JWT validation at the mesh level
apiVersion: security.istio.io/v1beta1
kind: RequestAuthentication
metadata:
name: jwt-auth
namespace: production
spec:
selector:
matchLabels:
app: api
jwtRules:
- issuer: "https://auth.company.com"
jwksUri: "https://auth.company.com/.well-known/jwks.json"
audiences:
- "https://api.company.com"
forwardOriginalToken: true
Secrets Management
HashiCorp Vault Dynamic Secrets
# Vault policy — least privilege
path "database/creds/api-readonly" {
capabilities = ["read"] # ONLY this path, read only
}
path "secret/data/api/*" {
capabilities = ["read", "list"]
}
# Kubernetes auth method
path "auth/kubernetes/login" {
capabilities = ["create", "update"]
}
import hvac
import os
def get_db_credentials(service_account_token: str, vault_role: str) -> dict:
"""Get short-lived DB credentials from Vault."""
client = hvac.Client(url="https://vault.internal.company.com")
# Authenticate using Kubernetes service account
client.auth.kubernetes.login(
role=vault_role,
jwt=service_account_token,
)
# Get dynamic credentials (auto-expires after 1 hour)
creds = client.secrets.database.get_credentials(name="api-readonly")
return {
"username": creds["data"]["username"],
"password": creds["data"]["password"],
# Credentials auto-expire — renew before lease_duration
"lease_id": creds["lease_id"],
"lease_duration": creds["lease_duration"],
}
Zero Trust Checklist
Identity
- ○All users require MFA (phishing-resistant preferred: FIDO2/WebAuthn)
- ○Machine identities use certificates, not passwords
- ○Service accounts use workload identity (not static API keys)
- ○Session tokens expire (max 8 hours)
- ○Continuous authentication for sensitive operations
Network
- ○Default-deny network policies (allow-list only)
- ○mTLS for all service-to-service communication
- ○No SSH directly to production (use bastion/SSM)
- ○All traffic encrypted (no plaintext HTTP internally)
- ○Network microsegmentation per service
Application
- ○Input validation at every API boundary
- ○Output encoding to prevent injection
- ○CSRF protection on state-changing operations
- ○Rate limiting per user/IP
- ○API authentication on ALL endpoints
Data
- ○Data classified by sensitivity
- ○Encryption at rest (database-level or application-level)
- ○PII data minimization (don't log PII)
- ○Audit log for all data access to sensitive fields
Observability
- ○Centralized log aggregation
- ○Authentication failure alerting
- ○Unusual access pattern detection
- ○Complete audit trail for data access
- ○Incident response playbook tested