oauth-oidc-expert
Expert OAuth 2.0 and OIDC guide: grant type selection (authorization code + PKCE, client credentials, device flow), PKCE mechanics, ID token vs access token, JWT validation, token storage security, refresh token rotation, scope design, OAuth security threats,
OAuth 2.0 & OIDC Expert
OAuth 2.0 and OpenID Connect are the foundation of modern authentication and authorization. Most developers understand the happy path but miss the security nuances: incorrect token storage, missing PKCE, incomplete JWT validation, or misunderstanding the difference between access tokens and ID tokens. These gaps are the source of auth bypass vulnerabilities in production systems.
Core Mental Model
OAuth 2.0 is an authorization protocol: it answers "can this application access this resource on behalf of this user?" OIDC is an identity layer on top of OAuth: it answers "who is this user?" An access token proves authorization; an ID token proves identity. The most critical rules: validate JWTs completely (signature plus all required claims), use PKCE for every public client, and never store sensitive tokens in localStorage.
Grant Type Selection
Authorization Code + PKCE
→ Web apps (SPA), mobile apps, CLIs where user interaction is possible
→ When: user is present to authorize; you need delegated access
→ Flow: browser redirect → auth code → token exchange (with PKCE code verifier)
Client Credentials
→ Machine-to-machine (M2M) — no user involved
→ When: background jobs, microservice auth, API-to-API calls
→ Flow: client_id + client_secret → access token
Device Authorization (Device Flow)
→ CLI tools, smart TVs, IoT devices with limited browser capability
→ When: device can't do browser redirects
→ Flow: device code → user visits URL on another device → polls for token
Refresh Token
→ Not a way to obtain initial authorization: it renews short-lived access tokens without re-authenticating the user (sent as grant_type=refresh_token)
→ Use with authorization code flow; rotate on each use
AVOID:
→ Implicit flow (deprecated — access token in URL fragment, no PKCE)
→ Resource Owner Password Credentials (username/password sent to app — defeats OAuth)
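The polling step of the Device Authorization flow listed above can be sketched as follows. This is a minimal illustration following the error codes defined in RFC 8628, not a complete client: `post_form` is a hypothetical transport hook (not part of any library) that sends form fields to the token endpoint and returns the status code with the parsed JSON body, and the default interval and lifetime are assumptions that a real client would take from the device authorization response.

```python
import time
from typing import Callable

# RFC 8628 grant type identifier used in the device flow token request
DEVICE_GRANT = "urn:ietf:params:oauth:grant-type:device_code"


def poll_for_token(
    post_form: Callable[[dict], tuple],
    client_id: str,
    device_code: str,
    interval: float = 5.0,     # assumed default; use "interval" from the IdP response
    expires_in: float = 600.0,  # assumed default; use "expires_in" from the IdP response
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll until the user approves, denies, or the device code expires."""
    waited = 0.0
    while waited < expires_in:
        sleep(interval)
        waited += interval
        status, body = post_form({
            "grant_type": DEVICE_GRANT,
            "device_code": device_code,
            "client_id": client_id,
        })
        if status == 200:
            return body  # contains access_token (and possibly refresh_token)
        error = body.get("error")
        if error == "authorization_pending":
            continue  # user has not finished approving yet
        if error == "slow_down":
            interval += 5  # RFC 8628: back off by 5 seconds
            continue
        raise RuntimeError(f"Device flow failed: {error}")
    raise TimeoutError("Device code expired before the user approved")
```

Injecting `sleep` and `post_form` keeps the loop testable without a network or real delays.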
Authorization Code + PKCE Flow
# PKCE (Proof Key for Code Exchange) prevents authorization code interception
import base64
import hashlib
import secrets
import urllib.parse
from typing import Optional

import httpx


def generate_pkce_pair() -> tuple[str, str]:
    """
    Returns (code_verifier, code_challenge).
    code_verifier: random string sent at token exchange
    code_challenge: BASE64URL(SHA256(code_verifier)), sent at authorization request
    """
    # code_verifier: 43-128 chars, URL-safe characters
    code_verifier = secrets.token_urlsafe(96)  # 96 random bytes -> 128 URL-safe chars
    # code_challenge: S256 method (REQUIRED; "plain" is insecure)
    digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
    code_challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode()
    return code_verifier, code_challenge


def build_authorization_url(
    authorization_endpoint: str,
    client_id: str,
    redirect_uri: str,
    scopes: list[str],
    code_challenge: str,
    state: str,  # CSRF protection: random value, verify on callback
) -> str:
    params = {
        "response_type": "code",
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "scope": " ".join(scopes),
        "code_challenge": code_challenge,
        "code_challenge_method": "S256",  # Always S256, never plain
        "state": state,
    }
    return f"{authorization_endpoint}?{urllib.parse.urlencode(params)}"


async def exchange_code_for_tokens(
    token_endpoint: str,
    code: str,
    code_verifier: str,  # The original verifier, never the challenge
    client_id: str,
    redirect_uri: str,
    client_secret: Optional[str] = None,  # Confidential clients (web servers) only; public clients omit it
) -> dict:
    data = {
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": redirect_uri,  # Must match the value sent in the authorization request
        "client_id": client_id,
        "code_verifier": code_verifier,  # IdP verifies: BASE64URL(SHA256(verifier)) == challenge
    }
    if client_secret is not None:
        data["client_secret"] = client_secret
    async with httpx.AsyncClient() as client:
        response = await client.post(
            token_endpoint,
            data=data,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
        )
        response.raise_for_status()
        # Returns: access_token, id_token, refresh_token, expires_in, token_type
        return response.json()
# Full PKCE callback handler (FastAPI example)
# get_session and the IDP_TOKEN_ENDPOINT, CLIENT_ID, REDIRECT_URI and JWKS_URI constants
# are application-specific and assumed to be defined elsewhere.
import secrets

from fastapi import Cookie, FastAPI, HTTPException, Query
from fastapi.responses import RedirectResponse

app = FastAPI()


@app.get("/auth/callback")
async def auth_callback(
    code: str = Query(...),
    state: str = Query(...),
    session_state: str = Cookie(None),  # Session ID stored in an httpOnly cookie
):
    # 1. Verify state (CSRF protection)
    session = await get_session(session_state)
    if not session or not secrets.compare_digest(state, session["oauth_state"]):
        raise HTTPException(400, "State mismatch: possible CSRF attack")
    # 2. Exchange code for tokens
    tokens = await exchange_code_for_tokens(
        token_endpoint=IDP_TOKEN_ENDPOINT,
        code=code,
        code_verifier=session["code_verifier"],  # Retrieved from session
        client_id=CLIENT_ID,
        redirect_uri=REDIRECT_URI,
    )
    # 3. Validate ID token (raises if the signature or any claim is invalid)
    user_claims = validate_id_token(tokens["id_token"], CLIENT_ID, JWKS_URI)
    # 4. Store tokens securely (httpOnly cookies, never localStorage)
    response = RedirectResponse("/dashboard")
    set_auth_cookies(response, tokens["access_token"], tokens["refresh_token"])
    return response
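The callback above reads `oauth_state` from the session, so those values have to be created before the redirect to the IdP. A minimal sketch of that step, assuming a server-side session represented as a plain dict (`new_login_session` and `state_matches` are hypothetical helper names):

```python
import hmac
import secrets
from typing import Optional


def new_login_session() -> dict:
    """Random values to store server-side before redirecting to the IdP."""
    return {
        "oauth_state": secrets.token_urlsafe(32),  # echoed back in the callback query string
        "nonce": secrets.token_urlsafe(32),        # echoed back inside the ID token
    }


def state_matches(returned_state: Optional[str], session: Optional[dict]) -> bool:
    """Constant-time check of the callback's state against the stored value."""
    expected = (session or {}).get("oauth_state")
    if not returned_state or not expected:
        return False  # missing session or missing parameter is treated as a mismatch
    return hmac.compare_digest(returned_state.encode(), expected.encode())
```

Treating a missing session the same as a mismatch keeps the handler from raising an unhandled error when the cookie is absent.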
Client Credentials Flow (M2M)
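import time
from typing import Optional

import httpx


async def fetch_token(
    token_endpoint: str,
    client_id: str,
    client_secret: str,
    audience: str,
    scopes: Optional[list[str]] = None,
) -> dict:
    """Request a client-credentials token and return the full token response."""
    data = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "audience": audience,  # Not part of core OAuth 2.0; required by some IdPs (e.g. Auth0)
    }
    if scopes:
        data["scope"] = " ".join(scopes)  # Omit the parameter rather than send an empty scope
    async with httpx.AsyncClient() as client:
        response = await client.post(token_endpoint, data=data)
        response.raise_for_status()
        return response.json()


async def get_m2m_access_token(
    token_endpoint: str,
    client_id: str,
    client_secret: str,
    audience: str,
    scopes: Optional[list[str]] = None,
) -> str:
    """Get access token for machine-to-machine communication."""
    response = await fetch_token(token_endpoint, client_id, client_secret, audience, scopes)
    return response["access_token"]


# Token caching for M2M (avoid fetching a new token on every request)
class M2MTokenManager:
    def __init__(self, token_endpoint, client_id, client_secret, audience):
        # Store only the fetch_token arguments; locals() would also capture self
        self._config = {
            "token_endpoint": token_endpoint,
            "client_id": client_id,
            "client_secret": client_secret,
            "audience": audience,
        }
        self._token = None
        self._expires_at = 0

    async def get_token(self) -> str:
        # Refresh if expired or within 60 seconds of expiry
        if time.time() >= self._expires_at - 60:
            response = await fetch_token(**self._config)
            self._token = response["access_token"]
            self._expires_at = time.time() + response["expires_in"]
        return self._token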
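When many coroutines ask for a token at the moment it expires, a cache like the one above can fire several token requests at once. One possible way to collapse them into a single request is an `asyncio.Lock`; the sketch below is an assumption about how that could look, with `LockedTokenCache` and its `fetch` callable being hypothetical names rather than part of the original code.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional


class LockedTokenCache:
    """Caches one token; concurrent callers wait for a single refresh."""

    def __init__(self, fetch: Callable[[], Awaitable[dict]], skew: float = 60.0):
        self._fetch = fetch  # hypothetical: returns {"access_token": ..., "expires_in": ...}
        self._skew = skew    # refresh this many seconds before expiry
        self._token: Optional[str] = None
        self._expires_at = 0.0
        self._lock = asyncio.Lock()

    async def get_token(self) -> str:
        async with self._lock:
            # monotonic() is immune to wall-clock adjustments
            if self._token is None or time.monotonic() >= self._expires_at - self._skew:
                response = await self._fetch()
                self._token = response["access_token"]
                self._expires_at = time.monotonic() + response["expires_in"]
            return self._token
```

The lock is held across the fetch on purpose: callers that arrive during a refresh wait for its result instead of starting their own.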
OIDC: ID Token vs Access Token
# ID Token: proves WHO the user is (authentication)
# Access Token: proves the app can access resources on the user's behalf (authorization)
# NEVER use the access token to verify user identity; it is meant for resource servers
# ALWAYS use the ID token (or userinfo endpoint) to get user identity
import jwt
from jwt import PyJWKClient

# ID Token: JWT containing user claims (decoded payload shown as a Python dict)
example_id_token_claims = {
    "iss": "https://accounts.google.com",  # Issuer (must validate)
    "sub": "110169484474386276334",        # Subject (stable user ID)
    "aud": "client_id_here",               # Audience (must match your client_id)
    "exp": 1704067199,                     # Expiry (must validate)
    "iat": 1704063599,                     # Issued at
    "email": "alice@example.com",          # Standard claim
    "email_verified": True,                # Email verification status
    "name": "Alice Smith",
    "picture": "https://...",
}

EXPECTED_ISSUER = "https://accounts.google.com"  # Must equal your IdP's issuer exactly


def validate_id_token(id_token: str, client_id: str, jwks_uri: str, expected_nonce=None) -> dict:
    """Complete ID token validation per OIDC spec."""
    # Fetch signing keys from IdP's JWKS endpoint
    # (in production, reuse one PyJWKClient so its key cache is effective)
    jwks_client = PyJWKClient(jwks_uri)
    signing_key = jwks_client.get_signing_key_from_jwt(id_token)
    payload = jwt.decode(
        id_token,
        signing_key.key,
        algorithms=["RS256", "ES256"],  # Explicit allow-list; never trust the token's alg header
        audience=client_id,             # aud must match your client_id
        issuer=EXPECTED_ISSUER,         # iss must match the expected issuer
        options={
            "require": ["exp", "iat", "sub", "iss", "aud"],
            "verify_exp": True,
            "verify_iat": True,
        },
    )
    # If a nonce was sent in the auth request, the token must echo it back.
    # Checking only when the claim is present would accept tokens that omit it.
    if expected_nonce is not None and payload.get("nonce") != expected_nonce:
        raise ValueError("Nonce mismatch: possible replay attack")
    return payload
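To make the individual claim checks concrete, the sketch below applies them to an already-decoded payload using only the standard library. It assumes the signature has already been verified by a JWT library and is not a substitute for that step; `check_claims` and the 30-second `leeway` are illustrative choices, not part of the original code.

```python
import time


def check_claims(claims, issuer, audience, nonce=None, now=None, leeway=30):
    """Validate iss, aud, exp, nbf and nonce on a signature-verified payload."""
    now = time.time() if now is None else now
    if claims.get("iss") != issuer:
        raise ValueError("iss mismatch")
    aud = claims.get("aud")
    audiences = aud if isinstance(aud, list) else [aud]  # aud may be a string or a list
    if audience not in audiences:
        raise ValueError("aud mismatch")
    if "exp" not in claims or now > claims["exp"] + leeway:
        raise ValueError("token expired")
    if "nbf" in claims and now < claims["nbf"] - leeway:
        raise ValueError("token not yet valid")
    if nonce is not None and claims.get("nonce") != nonce:
        raise ValueError("nonce mismatch")
    return claims


sample = {
    "iss": "https://accounts.google.com",
    "aud": "client_id_here",
    "exp": 1704067199,
    "iat": 1704063599,
}
# Passes while the token is still within its lifetime
check_claims(sample, "https://accounts.google.com", "client_id_here", now=1704065000)
```

Passing `now` explicitly makes expiry behaviour easy to test without waiting for real time to elapse.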
Token Storage Security
# NEVER: localStorage or sessionStorage for sensitive tokens
# - Accessible to any JavaScript on the page
# - XSS can steal all tokens
# localStorage.setItem('access_token', token)  # NEVER DO THIS

# BEST: httpOnly + Secure + SameSite cookies (JavaScript can't access)
from fastapi import Response


def set_auth_cookies(response: Response, access_token: str, refresh_token: str):
    """Store tokens in httpOnly cookies, which are inaccessible to JavaScript."""
    # Access token: short-lived (15-60 min)
    response.set_cookie(
        key="access_token",
        value=access_token,
        httponly=True,      # JavaScript CANNOT read this
        secure=True,        # HTTPS only
        samesite="strict",  # Mitigates CSRF; use "lax" if the cookie must accompany cross-site navigations
        max_age=3600,       # 1 hour
        path="/api",        # Scope to API paths only
    )
    # Refresh token: longer-lived (7-30 days)
    response.set_cookie(
        key="refresh_token",
        value=refresh_token,
        httponly=True,
        secure=True,
        samesite="strict",
        max_age=30 * 24 * 3600,  # 30 days
        path="/auth/refresh",    # Scope ONLY to refresh endpoint
    )

# SPA alternative (if cookies aren't feasible): in-memory storage
# Store the access token in React state/context: it is lost on page refresh and is
# harder for XSS to exfiltrate than persistent storage (though not immune to it)
# Use a refresh token in an httpOnly cookie to restore the access token on refresh
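To see what these cookie settings look like on the wire, the sketch below builds the equivalent `Set-Cookie` value with the standard library's `http.cookies` module. It is framework-independent and only illustrates the attributes; `refresh_cookie_header` is a hypothetical helper name.

```python
from http.cookies import SimpleCookie


def refresh_cookie_header(refresh_token: str) -> str:
    """Build the Set-Cookie value for a path-scoped refresh token cookie."""
    cookie = SimpleCookie()
    cookie["refresh_token"] = refresh_token
    morsel = cookie["refresh_token"]
    morsel["httponly"] = True          # hidden from document.cookie
    morsel["secure"] = True            # sent over HTTPS only
    morsel["samesite"] = "Strict"      # not sent on cross-site requests
    morsel["path"] = "/auth/refresh"   # sent only to the refresh endpoint
    morsel["max-age"] = 30 * 24 * 3600  # 30 days
    return morsel.OutputString()


print(refresh_cookie_header("abc"))
```

Inspecting the header this way is a quick check that a framework's defaults have not silently dropped an attribute.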
Refresh Token Rotation
# TOKEN_ENDPOINT, CLIENT_ID and CLIENT_SECRET are application configuration assumed to be defined elsewhere
import httpx
from fastapi import Cookie, HTTPException
from fastapi.responses import JSONResponse


@app.post("/auth/refresh")
async def refresh_tokens(
    refresh_token: str = Cookie(None, alias="refresh_token"),
):
    """Exchange refresh token for new access + refresh tokens."""
    if not refresh_token:
        raise HTTPException(401, "No refresh token")
    async with httpx.AsyncClient() as client:
        response = await client.post(
            TOKEN_ENDPOINT,
            data={
                "grant_type": "refresh_token",
                "refresh_token": refresh_token,
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET,  # For confidential clients
            },
        )
    if response.status_code == 400:
        # Refresh token is invalid or expired: force re-login
        # With rotation enabled, reuse of an old token makes the IdP revoke the whole token family
        raise HTTPException(401, "Session expired")
    response.raise_for_status()
    new_tokens = response.json()
    # With rotation enabled the IdP returns a new refresh token and invalidates the old one;
    # without rotation the response may omit it, so keep the current one
    new_refresh_token = new_tokens.get("refresh_token", refresh_token)
    api_response = JSONResponse({"status": "refreshed"})
    set_auth_cookies(api_response, new_tokens["access_token"], new_refresh_token)
    return api_response
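The token family revocation mentioned in the comments above happens on the authorization server side. A minimal in-memory sketch of the idea follows; `RefreshTokenStore` is a hypothetical class for illustration, and a real server would persist hashed tokens with expiry rather than keep raw values in a dict.

```python
import secrets


class RefreshTokenStore:
    """Tracks one valid refresh token per family and detects replays."""

    def __init__(self):
        self._family_of = {}  # token -> family id, for every token ever issued
        self._current = {}    # family id -> the single token that is still valid

    def issue(self, family_id=None) -> str:
        """Issue a token, starting a new family unless one is given."""
        family_id = family_id or secrets.token_hex(8)
        token = secrets.token_urlsafe(32)
        self._family_of[token] = family_id
        self._current[family_id] = token
        return token

    def rotate(self, presented: str) -> str:
        """Swap a valid token for a new one; revoke the family on reuse."""
        family_id = self._family_of.get(presented)
        if family_id is None or family_id not in self._current:
            raise PermissionError("unknown or revoked refresh token")
        if self._current[family_id] != presented:
            # An already-rotated token was replayed: assume theft
            del self._current[family_id]
            raise PermissionError("refresh token reuse detected; family revoked")
        return self.issue(family_id)
```

After a replay is detected, even the newest token in the family stops working, which forces both the attacker and the legitimate user back through login.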
Scope Design
# Scope design principles:
# 1. Minimal: default to read-only; require explicit write scopes
# 2. Granular: prefer resource-specific scopes over broad ones
# 3. Consent-friendly: scope names should be understandable to end users
from fastapi import Depends, HTTPException

SCOPES = {
    # User-grantable (shown in consent screen)
    "profile": "Read your name and profile picture",
    "email": "Access your email address",
    "openid": "Verify your identity (required for OIDC)",
    # Resource scopes
    "orders:read": "View your orders",
    "orders:write": "Create and modify orders",
    "payment:read": "View payment methods",
    "payment:write": "Modify payment methods",
    # Admin scopes (NOT user-grantable; only admin-assigned)
    "admin:users": "Manage all users",
    "admin:impersonate": "Act as any user",
    # M2M scopes (service-to-service)
    "service:internal": "Internal service access",
}


# Scope validation middleware
# get_token_claims is an application-specific dependency (assumed to be defined elsewhere)
# that validates the bearer token and returns its claims
def require_scope(required_scope: str):
    """FastAPI dependency that validates required scope."""
    def check_scope(token_claims: dict = Depends(get_token_claims)):
        granted_scopes = set(token_claims.get("scope", "").split())
        if required_scope not in granted_scopes:
            raise HTTPException(
                403,
                detail=f"Required scope '{required_scope}' not granted",
                # RFC 6750 error code for a valid token that lacks the needed scope
                headers={
                    "WWW-Authenticate": f'Bearer error="insufficient_scope", scope="{required_scope}"'
                },
            )
        return token_claims
    return check_scope


# OrderRequest is the application's request model (assumed to be defined elsewhere)
@app.post("/orders")
async def create_order(
    order: OrderRequest,
    claims: dict = Depends(require_scope("orders:write")),  # Scope enforcement
):
    ...
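The dependency above assumes a space-delimited `scope` claim. Some providers are known to expose granted scopes under an `scp` claim instead, sometimes as a list, so a small normalizing helper can be useful. The sketch below is an assumption about how that could be handled; the claim names to support should be confirmed against the IdP's documentation, and `granted_scopes` and `has_scopes` are hypothetical helpers.

```python
def granted_scopes(claims: dict) -> set:
    """Normalize the scope claim to a set, accepting a string or a list."""
    raw = claims.get("scope", claims.get("scp", ""))
    if isinstance(raw, str):
        return set(raw.split())  # space-delimited string form
    return set(raw)              # list form


def has_scopes(claims: dict, *required: str) -> bool:
    """True only if every required scope was granted."""
    return set(required) <= granted_scopes(claims)


print(has_scopes({"scope": "openid orders:read orders:write"}, "orders:write"))
```

Comparing whole scope names as set members avoids the substring bug where `orders:read` would appear to satisfy a check for `orders:re`.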
IdP Comparison
| Provider | Best For | Pricing | Self-Host? |
| --- | --- | --- | --- |
| Auth0 | SaaS apps, fast time-to-value | Free to 7,500 MAU; $240/mo+ | No |
| Keycloak | On-prem, full control, complex enterprise flows | Free (self-hosted, ops cost) | Yes |
| AWS Cognito | AWS-native, mobile apps | Free to 50K MAU; $0.0055/MAU after | No |
| WorkOS | B2B SaaS with enterprise SSO (SAML, SCIM) | Free to 1M MAU; $125/SSO connection | No |
| Clerk | Modern developer experience, React-first | Free to 10K MAU | No |
Anti-Patterns
❌ Not implementing PKCE for SPAs
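Without PKCE, an authorization code intercepted in transit (for example, by a malicious mobile app that registers the same custom URI scheme, or by a browser extension that can read the redirect URL) can be exchanged for tokens. All SPAs and mobile apps MUST use PKCE.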
❌ Storing tokens in localStorage
Any XSS vulnerability can steal localStorage. Use httpOnly cookies for refresh tokens; store access tokens in memory for SPAs.
❌ Not validating the aud claim
If Service B does not check the aud claim, it will accept an access token that was issued for Service A. Anyone who obtains a token meant for Service A, including a compromised Service A itself, can then replay it against Service B.
❌ Using the access token to identify users
Access tokens are opaque to clients. They may not contain user identity, and their format can change. Use the ID token or /userinfo endpoint for identity.
❌ Not rotating refresh tokens
A stolen refresh token with rotation disabled gives an attacker access until the token expires or is revoked, which may be never. Enable rotation and implement token family revocation: if an already-used refresh token is presented again, treat it as stolen and revoke every token in that family.
Quick Reference
Grant type selection:
User + browser available → Authorization Code + PKCE (always)
M2M, no user → Client Credentials
CLI / TV / IoT → Device Authorization Flow
Legacy (migrate away) → Resource Owner Password Credentials
PKCE requirements:
code_verifier: 43-128 chars, cryptographically random, URL-safe base64
code_challenge: BASE64URL(SHA256(code_verifier)) # S256 method only
state: Random value, verify on callback (CSRF protection)
Token validation checklist:
☐ Signature valid (correct algorithm + correct key)
☐ iss matches expected issuer
☐ aud matches your client_id (ID token) or API identifier (access token)
☐ exp not in the past
☐ nbf (if present) is not in the future
☐ nonce matches (if used in auth request)
Token storage:
Access token: httpOnly + Secure cookie (or in-memory for SPA)
Refresh token: httpOnly + Secure cookie, path scoped to /auth/refresh
NEVER: localStorage, sessionStorage, URL parameters