zero-trust-architect
Design and implement Zero Trust security architectures. Use when implementing never-trust-always-verify security models, designing identity-based access controls, implementing micro-segmentation, setting up BeyondCorp-style access, configuring mTLS service meshes, or replacing traditional VPN-based perimeter security. Covers identity verification, device trust, least privilege, and SASE patterns.
Zero Trust Architecture
Zero Trust Principles
Traditional Security: "Trust but verify" — trust everyone inside the network
Zero Trust: "Never trust, always verify" — trust no one, anywhere
Zero Trust Axioms:
1. The network is always hostile (even internal)
2. Threats exist inside and outside the network
3. Every device and user must be authenticated and authorized
4. Access is least-privilege and time-limited
5. Logging and monitoring are mandatory everywhere
The Five Pillars
1. Identity (Who are you?)
- Strong multi-factor authentication
- Identity as the new perimeter
- Continuous identity verification, not just at login
2. Device (What are you using?)
- Device health validation
- Managed vs unmanaged device policy
- Certificate-based device identity
3. Network (Where are you connecting?)
- Micro-segmentation (not a flat network)
- Encrypt all traffic (even internal)
- Software-defined perimeter (SDP)
4. Application (What are you accessing?)
- Application-level access control
- No network-level access to apps by default
- Context-aware authorization
5. Data (What data can you touch?)
- Data classification
- Rights management
- Data loss prevention
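The five pillars can be read as five independent checks that must all pass before a request is served. A minimal Python sketch of that idea follows. The `RequestContext` class, its field names, and the rule about sensitive data are illustrative assumptions, not part of any specific product.

```python
from dataclasses import dataclass


@dataclass
class RequestContext:
    """Signals gathered for one request (all field names are illustrative)."""
    mfa_verified: bool          # Identity: strong authentication completed
    device_managed: bool        # Device: enrolled, healthy device
    connection_encrypted: bool  # Network: request arrived over TLS/mTLS
    app_permission: bool        # Application: caller holds the app-level permission
    data_classification: str    # Data: "public" | "internal" | "sensitive"


def evaluate_pillars(ctx: RequestContext) -> tuple[bool, list[str]]:
    """Return (allowed, reasons); any failed pillar denies the request."""
    failures = []
    if not ctx.mfa_verified:
        failures.append("identity: MFA not verified")
    if not ctx.connection_encrypted:
        failures.append("network: unencrypted connection")
    if not ctx.app_permission:
        failures.append("application: missing permission")
    # Assumed rule: sensitive data is only served to managed devices.
    if ctx.data_classification == "sensitive" and not ctx.device_managed:
        failures.append("device: unmanaged device for sensitive data")
    return (not failures, failures)
```

Returning the list of failed checks alongside the decision keeps the denial reason available for the audit log.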
Identity and Access Management
OIDC/OAuth 2.0 with PKCE (Modern Auth)
# FastAPI with OIDC verification
import httpx
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2AuthorizationCodeBearer
from jose import JWTError, jwt
from pydantic import BaseModel

app = FastAPI()

oauth2_scheme = OAuth2AuthorizationCodeBearer(
    authorizationUrl="https://auth.company.com/oauth2/authorize",
    tokenUrl="https://auth.company.com/oauth2/token",
)


class TokenPayload(BaseModel):
    # A pydantic model, so TokenPayload(**payload) validates the claims
    # and ignores extra ones such as "iss" and "aud".
    sub: str  # User ID
    email: str
    groups: list[str]
    device_id: str
    device_trust: str  # "managed" | "unmanaged" | "unknown"
    amr: list[str]  # Authentication methods (e.g., ["pwd", "otp"])
    iat: int
    exp: int


async def get_current_user(token: str = Depends(oauth2_scheme)) -> TokenPayload:
    try:
        # Fetch JWKS (cached in production)
        async with httpx.AsyncClient() as client:
            jwks_response = await client.get("https://auth.company.com/.well-known/jwks.json")
        payload = jwt.decode(
            token,
            jwks_response.json(),
            algorithms=["RS256"],
            audience="https://api.company.com",
        )
    except JWTError as e:
        # Keep the response generic; log the underlying error server-side instead.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        ) from e

    # Require MFA: one-time password ("otp") or hardware key ("hwk")
    amr = payload.get("amr", [])
    if "otp" not in amr and "hwk" not in amr:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Multi-factor authentication required",
        )
    return TokenPayload(**payload)


# Hypothetical group-to-permission mapping; replace with a lookup in your policy store.
GROUP_PERMISSIONS = {"admins": {"users:read", "users:write"}}


def get_user_permissions(user: TokenPayload) -> set[str]:
    """Union of the permissions granted by each of the user's groups."""
    permissions: set[str] = set()
    for group in user.groups:
        permissions |= GROUP_PERMISSIONS.get(group, set())
    return permissions


# Policy-based authorization
def require_permission(permission: str):
    async def checker(user: TokenPayload = Depends(get_current_user)):
        if permission not in get_user_permissions(user):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Permission required: {permission}",
            )
        return user

    return checker


@app.get("/admin/users")
async def list_users(user=Depends(require_permission("users:read"))):
    return {"users": []}
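The heading mentions PKCE, which happens on the client side before the API ever sees a token. As a sketch of that step, the following standard-library code generates a `code_verifier` and derives the S256 `code_challenge` as described in RFC 7636. The function names are illustrative.

```python
import base64
import hashlib
import secrets


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as RFC 7636 requires."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def make_code_challenge(code_verifier: str) -> str:
    """Derive the S256 code_challenge from a code_verifier."""
    digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
    return _b64url(digest)


def new_pkce_pair() -> tuple[str, str]:
    """Return (code_verifier, code_challenge) for one authorization request."""
    code_verifier = _b64url(secrets.token_bytes(32))  # 43 characters
    return code_verifier, make_code_challenge(code_verifier)
```

The client sends `code_challenge` with `code_challenge_method=S256` in the authorization request and keeps `code_verifier` private until the token request, so an intercepted authorization code cannot be redeemed on its own.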
Attribute-Based Access Control (ABAC)
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class AccessRequest:
    subject: dict      # User attributes: role, department, clearance
    resource: dict     # Resource attributes: classification, owner, type
    action: str        # read, write, delete, admin
    environment: dict  # IP, time, device_trust


@dataclass
class Policy:
    name: str
    condition: Callable[[AccessRequest], bool]
    effect: str  # "allow" | "deny"

    def evaluate(self, request: AccessRequest) -> Optional[str]:
        """Return this policy's effect if its condition matches, else None."""
        return self.effect if self.condition(request) else None


class PolicyEngine:
    """ABAC policy engine: evaluate policies, not just roles."""

    def __init__(self, policies: list[Policy]):
        self.policies = policies

    def evaluate(self, request: AccessRequest) -> bool:
        """Check all policies; deny by default."""
        allowed = False
        for policy in self.policies:
            result = policy.evaluate(request)
            if result == "deny":
                return False  # Explicit deny wins, regardless of policy order
            if result == "allow":
                allowed = True  # Keep checking: a later policy may still deny
        return allowed  # Default deny when nothing matched


# Example policies
POLICIES = [
    # Deny access from untrusted devices to sensitive data
    Policy(
        name="require_managed_device_for_sensitive",
        condition=lambda r: (
            r.resource.get("classification") == "sensitive"
            and r.environment.get("device_trust") != "managed"
        ),
        effect="deny",
    ),
    # Allow managers to read reports in their department
    Policy(
        name="managers_read_dept_reports",
        condition=lambda r: (
            r.subject.get("role") == "manager"
            and r.action == "read"
            and r.resource.get("type") == "report"
            and r.resource.get("department") == r.subject.get("department")
        ),
        effect="allow",
    ),
]
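Deny-overrides means the outcome must not depend on the order in which policies are listed. A compact, self-contained sketch of that combining rule follows. The rule tuples and request attributes are illustrative assumptions, simplified from the policies above.

```python
from types import SimpleNamespace

# Each rule is (condition, effect). The allow rule is deliberately listed first.
RULES = [
    (lambda r: r.role == "manager" and r.action == "read", "allow"),
    (lambda r: r.classification == "sensitive" and r.device_trust != "managed", "deny"),
]


def decide(rules, request) -> bool:
    """Deny-overrides: collect every matching effect before deciding."""
    effects = {effect for condition, effect in rules if condition(request)}
    if "deny" in effects:
        return False
    return "allow" in effects  # no match at all falls through to deny


# A manager reading a sensitive report from an unmanaged laptop
request = SimpleNamespace(
    role="manager",
    action="read",
    classification="sensitive",
    device_trust="unmanaged",
)
print(decide(RULES, request))
```

Here both rules match, and the deny takes precedence even though the allow rule comes first in the list.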
Micro-Segmentation
Kubernetes Network Policies
# Default deny all traffic in namespace
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: default-deny-all
  namespace: production
spec:
  podSelector: {}  # Matches ALL pods
  policyTypes:
    - Ingress
    - Egress
---
# Allow only api → database communication
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: allow-api-to-db
  namespace: production
spec:
  podSelector:
    matchLabels:
      app: postgres
  policyTypes:
    - Ingress
  ingress:
    - from:
        - podSelector:
            matchLabels:
              app: api  # Only api pods can reach postgres
      ports:
        - protocol: TCP
          port: 5432
---
# Restrict api egress to postgres, redis, and DNS
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: api-egress
  namespace: production
spec:
  podSelector:
    matchLabels:
      app: api
  policyTypes:
    - Egress
  egress:
    - to:
        - podSelector:
            matchLabels:
              app: postgres
      ports:
        - port: 5432
    - to:
        - podSelector:
            matchLabels:
              app: redis
      ports:
        - port: 6379
    - to:  # Allow DNS
        - namespaceSelector: {}
          podSelector:
            matchLabels:
              k8s-app: kube-dns
      ports:
        - port: 53
          protocol: UDP
        - port: 53  # DNS falls back to TCP for large responses
          protocol: TCP
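A default-deny policy only helps if every namespace actually has one. The sketch below shows one way to check this, assuming the manifests have already been loaded into Python dicts (for example from the cluster API or from YAML files). The function names are illustrative, and the check only recognizes the exact `podSelector: {}` form used above.

```python
def is_default_deny(policy: dict) -> bool:
    """True if a NetworkPolicy manifest selects all pods and allows nothing."""
    if policy.get("kind") != "NetworkPolicy":
        return False
    spec = policy.get("spec", {})
    selects_all_pods = spec.get("podSelector") == {}
    policy_types = set(spec.get("policyTypes", []))
    has_rules = bool(spec.get("ingress")) or bool(spec.get("egress"))
    return selects_all_pods and policy_types == {"Ingress", "Egress"} and not has_rules


def namespaces_missing_default_deny(policies: list[dict], namespaces: list[str]) -> list[str]:
    """Return the namespaces that have no default-deny NetworkPolicy."""
    covered = {
        p.get("metadata", {}).get("namespace")
        for p in policies
        if is_default_deny(p)
    }
    return [ns for ns in namespaces if ns not in covered]
```

A check like this can run in CI against the rendered manifests so that a new namespace cannot ship without its default-deny policy.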
mTLS Service Mesh
Istio Configuration
# Enable strict mTLS across namespace
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: default
  namespace: production
spec:
  mtls:
    mode: STRICT  # PERMISSIVE during migration, STRICT in production
---
# Authorization policy: service-to-service
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: postgres-access
  namespace: production
spec:
  selector:
    matchLabels:
      app: postgres
  rules:
    - from:
        - source:
            principals:
              - "cluster.local/ns/production/sa/api-service"  # Only api-service SA
      to:
        - operation:
            ports: ["5432"]
---
# JWT validation at the mesh level
# Note: RequestAuthentication rejects invalid tokens but still admits requests
# that carry no token; pair it with an AuthorizationPolicy that requires
# requestPrincipals to make a valid JWT mandatory.
apiVersion: security.istio.io/v1beta1
kind: RequestAuthentication
metadata:
  name: jwt-auth
  namespace: production
spec:
  selector:
    matchLabels:
      app: api
  jwtRules:
    - issuer: "https://auth.company.com"
      jwksUri: "https://auth.company.com/.well-known/jwks.json"
      audiences:
        - "https://api.company.com"
      forwardOriginalToken: true
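In a mesh, the sidecar proxy enforces mTLS on the service's behalf. For a service that runs outside the mesh, the server side of the same requirement can be approximated with Python's standard `ssl` module. This is a sketch under assumptions: the function name is illustrative and the certificate paths are supplied by the caller.

```python
import ssl
from typing import Optional


def build_mtls_server_context(
    cert_file: Optional[str] = None,
    key_file: Optional[str] = None,
    ca_file: Optional[str] = None,
) -> ssl.SSLContext:
    """Server-side TLS context that rejects clients without a valid certificate."""
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    # The "mutual" part of mTLS: the handshake fails unless the client
    # presents a certificate that chains to a trusted CA.
    context.verify_mode = ssl.CERT_REQUIRED
    if ca_file:
        context.load_verify_locations(cafile=ca_file)  # CA that signs client certs
    if cert_file:
        context.load_cert_chain(certfile=cert_file, keyfile=key_file)  # server identity
    return context
```

The returned context can be passed to any server that accepts an `ssl.SSLContext`. Without a CA file loaded, no client certificate can validate, so every connection is refused, which matches the default-deny posture.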
Secrets Management
HashiCorp Vault Dynamic Secrets
# Vault policy: least privilege
path "database/creds/api-readonly" {
  capabilities = ["read"]  # ONLY this path, read only
}

path "secret/data/api/*" {
  capabilities = ["read", "list"]
}

# Kubernetes auth method
path "auth/kubernetes/login" {
  capabilities = ["create", "update"]
}
import hvac


def get_db_credentials(service_account_token: str, vault_role: str) -> dict:
    """Get short-lived DB credentials from Vault."""
    client = hvac.Client(url="https://vault.internal.company.com")

    # Authenticate using Kubernetes service account
    client.auth.kubernetes.login(
        role=vault_role,
        jwt=service_account_token,
    )

    # Generate dynamic credentials; their lifetime is set by the database
    # role's TTL in Vault (for example, 1 hour)
    creds = client.secrets.database.generate_credentials(name="api-readonly")

    return {
        "username": creds["data"]["username"],
        "password": creds["data"]["password"],
        # Credentials auto-expire; renew before lease_duration elapses
        "lease_id": creds["lease_id"],
        "lease_duration": creds["lease_duration"],
    }
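Because the credentials expire with their lease, the caller needs to decide when to renew or re-fetch them. The sketch below computes that moment from `lease_duration`. The function name and the 75% renewal point are assumptions chosen for illustration, not Vault defaults.

```python
import time
from typing import Optional


def seconds_until_renewal(
    lease_duration: int,
    issued_at: float,
    now: Optional[float] = None,
    renew_fraction: float = 0.75,
) -> float:
    """Seconds to wait before renewing a lease issued at `issued_at`.

    Renewal is scheduled at `renew_fraction` of the lease lifetime, which
    leaves headroom for retries before the credentials stop working.
    """
    if now is None:
        now = time.time()
    renew_at = issued_at + lease_duration * renew_fraction
    return max(0.0, renew_at - now)
```

A background task could sleep for this long and then either renew the lease (hvac exposes this as `client.sys.renew_lease`) or request fresh credentials and rotate the connection pool.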
Zero Trust Checklist
Identity
- [ ] All users require MFA (phishing-resistant preferred: FIDO2/WebAuthn)
- [ ] Machine identities use certificates, not passwords
- [ ] Service accounts use workload identity (not static API keys)
- [ ] Session tokens expire (max 8 hours)
- [ ] Continuous authentication for sensitive operations
Network
- [ ] Default-deny network policies (allow-list only)
- [ ] mTLS for all service-to-service communication
- [ ] No SSH directly to production (use bastion/SSM)
- [ ] All traffic encrypted (no plaintext HTTP internally)
- [ ] Network micro-segmentation per service
Application
- [ ] Input validation at every API boundary
- [ ] Output encoding to prevent injection
- [ ] CSRF protection on state-changing operations
- [ ] Rate limiting per user/IP
- [ ] API authentication on ALL endpoints
Data
- [ ] Data classified by sensitivity
- [ ] Encryption at rest (database-level or application-level)
- [ ] PII data minimization (don't log PII)
- [ ] Audit log for all data access to sensitive fields
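For the "don't log PII" item, one option is to scrub log records before any handler writes them. The sketch below uses a standard-library `logging.Filter` that masks email addresses. The class name and the regular expression are illustrative assumptions and cover only one kind of PII.

```python
import logging
import re

# Deliberately simple pattern; real deployments usually need more PII types.
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")


class RedactEmailFilter(logging.Filter):
    """Replace email addresses in log messages with a placeholder."""

    def filter(self, record: logging.LogRecord) -> bool:
        # Render %-style arguments first so emails passed as args are caught too.
        message = record.getMessage()
        record.msg = EMAIL_RE.sub("[REDACTED_EMAIL]", message)
        record.args = ()
        return True  # keep the record, now redacted


logger = logging.getLogger("audit")
logger.addFilter(RedactEmailFilter())
```

Attaching the filter to the logger, or to a shared handler, means individual call sites do not need to remember to redact.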
Observability
- [ ] Centralized log aggregation
- [ ] Authentication failure alerting
- [ ] Unusual access pattern detection
- [ ] Complete audit trail for data access
- [ ] Incident response playbook tested