PyTorch Refresher
Tensors, autograd, neural networks, training loops, and model deployment
Table of Contents
Setup & Environment
PyTorch is best installed into a virtual environment. The same pip install torch torchvision command works on all platforms — PyTorch's wheel index handles CUDA/CPU/MPS selection automatically.
# Create and activate a virtual environment
python3 -m venv .venv
source .venv/bin/activate # macOS / Linux
# .venv\Scripts\activate # Windows
# CPU-only or Mac MPS (works on both; MPS used automatically on Apple Silicon)
pip install torch torchvision
# If you need CUDA (Linux/Windows with NVIDIA GPU) — pick your CUDA version:
# pip install torch torchvision --index-url https://download.pytorch.org/whl/cu121
# Useful extras
pip install jupyter matplotlib numpy
Verify the installation
import torch
print(torch.__version__) # e.g. "2.2.1"
print(torch.cuda.is_available()) # True on NVIDIA GPU
print(torch.backends.mps.is_available()) # True on Apple Silicon
# Pick the best available device — write this once, reuse everywhere
device = (
"cuda" if torch.cuda.is_available() else
"mps" if torch.backends.mps.is_available() else
"cpu"
)
print(f"Using device: {device}")
Tip: on Apple Silicon, use device = torch.device("mps") for GPU-accelerated training without any CUDA setup. MPS is 3-10x faster than CPU for typical neural net workloads.
Docker alternative
# Official PyTorch image — comes with CUDA, Jupyter, and common libraries
docker run -it --rm \
-p 8888:8888 \
pytorch/pytorch:latest \
jupyter notebook --ip=0.0.0.0 --no-browser --allow-root
# CPU-only for quick experiments
docker run -it --rm pytorch/pytorch:latest python
Tensors
A tensor is PyTorch's fundamental data structure — an n-dimensional array that can live on CPU, GPU (CUDA), or Apple Silicon GPU (MPS). Tensors support automatic differentiation, making them the building block for every neural network.
Creating tensors
import torch
import numpy as np
# From Python data
t = torch.tensor([1.0, 2.0, 3.0]) # 1-D float32
m = torch.tensor([[1, 2], [3, 4]]) # 2-D int64
# Factory functions
zeros = torch.zeros(3, 4) # shape (3, 4), all 0.0
ones = torch.ones(2, 3) # shape (2, 3), all 1.0
full = torch.full((2, 3), fill_value=7) # shape (2, 3), all 7
eye = torch.eye(4) # 4x4 identity matrix
rand = torch.rand(3, 4) # uniform [0, 1)
randn = torch.randn(3, 4) # standard normal N(0,1)
randint = torch.randint(0, 10, (3, 4)) # ints in [0, 10)
# Sequences
arange = torch.arange(0, 10, step=2) # [0, 2, 4, 6, 8]
linspace= torch.linspace(0, 1, steps=5) # [0.0, 0.25, 0.5, 0.75, 1.0]
# From NumPy — shares memory (zero-copy) when on CPU
np_arr = np.array([1.0, 2.0, 3.0])
t_from_np = torch.from_numpy(np_arr)
np_from_t = t_from_np.numpy() # back to NumPy
# Copy an existing tensor's shape/dtype without data
like_zeros = torch.zeros_like(rand)
like_ones = torch.ones_like(rand)
Data types (dtypes)
# Default dtype for floating-point literals is float32
t32 = torch.tensor([1.0, 2.0]) # dtype=torch.float32
t64 = torch.tensor([1.0, 2.0], dtype=torch.float64)
t16 = torch.tensor([1.0, 2.0], dtype=torch.float16) # half precision
# Integer types
i32 = torch.tensor([1, 2], dtype=torch.int32)
i64 = torch.tensor([1, 2], dtype=torch.int64) # default for int literals
bool_t = torch.tensor([True, False]) # dtype=torch.bool
# Cast between types
f32 = t64.float() # alias for .to(torch.float32)
i64 = t32.long() # alias for .to(torch.int64)
t = t32.to(torch.float16)
# Query properties
print(t.dtype) # torch.float32
print(t.shape) # torch.Size([2])
print(t.ndim) # 1
print(t.numel()) # 2 — total number of elements
Shape manipulation
t = torch.randn(2, 3, 4) # shape: (2, 3, 4), 24 elements
# view — fast reshape (requires contiguous memory)
flat = t.view(24) # (24,)
mat = t.view(6, 4) # (6, 4)
mat2 = t.view(2, -1) # (2, 12) — -1 inferred
# reshape — like view but works on non-contiguous tensors too
r = t.reshape(4, 6)
# Adding/removing dimensions of size 1
t2d = torch.randn(3, 4)
t3d = t2d.unsqueeze(0) # (1, 3, 4) — prepend dim
t3d = t2d.unsqueeze(-1) # (3, 4, 1) — append dim
t2d_back = t3d.squeeze(-1) # (3, 4) — remove size-1 dim
t2d_back = t3d.squeeze() # removes ALL size-1 dims
# Permute dimensions (like numpy transpose)
t = torch.randn(2, 3, 4)
t_perm = t.permute(0, 2, 1) # (2, 4, 3)
# Transpose (only 2 dims at a time)
mat = torch.randn(3, 4)
mat_T = mat.T # (4, 3)
mat_T = mat.transpose(0, 1) # equivalent
# Stacking and concatenating
a = torch.randn(3, 4)
b = torch.randn(3, 4)
cat_0 = torch.cat([a, b], dim=0) # (6, 4) — concatenate along dim 0
cat_1 = torch.cat([a, b], dim=1) # (3, 8) — concatenate along dim 1
stack = torch.stack([a, b], dim=0) # (2, 3, 4) — new dimension
stack = torch.stack([a, b], dim=1) # (3, 2, 4)
Indexing and slicing
t = torch.arange(24).reshape(4, 6) # shape (4, 6)
# Basic indexing — same as NumPy
print(t[0]) # first row, shape (6,)
print(t[0, 2]) # row 0, col 2 — scalar
print(t[-1]) # last row
# Slicing
print(t[1:3]) # rows 1 and 2, shape (2, 6)
print(t[:, 2:5]) # all rows, cols 2-4, shape (4, 3)
print(t[::2, ::2]) # every other row and col
# Boolean masking
mask = t > 10
print(t[mask]) # 1-D tensor of values > 10
# Fancy (integer) indexing
idx = torch.tensor([0, 2, 3])
print(t[idx]) # rows 0, 2, 3 — shape (3, 6)
# Setting values
t[0] = 99 # assign scalar to entire row
t[t < 5] = 0 # zero out values less than 5
Device placement
device = torch.device("mps") # or "cuda", "cpu"
# Create directly on device
t = torch.randn(3, 4, device=device)
# Move existing tensor
t_cpu = torch.randn(3, 4)
t_gpu = t_cpu.to(device) # preferred — works with any device string
t_gpu = t_cpu.cuda() # CUDA-specific shorthand
t_cpu = t_gpu.cpu() # move back to CPU
# Checking device
print(t_gpu.device) # device(type='mps', index=0)
# IMPORTANT: operations require both tensors on the same device
a = torch.randn(3, device="cpu")
b = torch.randn(3, device=device)
# c = a + b # RuntimeError! Must move first:
c = a.to(device) + b
Tensor Operations
Element-wise operations
a = torch.tensor([1.0, 2.0, 3.0])
b = torch.tensor([4.0, 5.0, 6.0])
# Arithmetic — all element-wise
print(a + b) # tensor([5., 7., 9.])
print(a - b) # tensor([-3., -3., -3.])
print(a * b) # tensor([ 4., 10., 18.])
print(a / b) # tensor([0.25, 0.4, 0.5])
print(a ** 2) # tensor([1., 4., 9.])
# Math functions
print(torch.sqrt(a))
print(torch.exp(a))
print(torch.log(a))
print(torch.abs(torch.tensor([-1.0, 2.0, -3.0])))
print(torch.clamp(a, min=1.5, max=2.5)) # clip values
# Comparison (returns bool tensor)
print(a > 1.5) # tensor([False, True, True])
print(a == b) # tensor([False, False, False])
Matrix multiplication
A = torch.randn(3, 4)
B = torch.randn(4, 5)
# Three equivalent ways to do matrix multiplication:
C1 = A @ B # Python operator (preferred, most readable)
C2 = torch.matmul(A, B) # explicit function
C3 = torch.mm(A, B) # 2-D only, no broadcasting
print(C1.shape) # torch.Size([3, 5])
# Batched matrix multiplication
A_batch = torch.randn(10, 3, 4) # batch of 10 matrices
B_batch = torch.randn(10, 4, 5)
C_batch = A_batch @ B_batch # shape (10, 3, 5)
# Dot product (1-D vectors)
v1 = torch.tensor([1.0, 2.0, 3.0])
v2 = torch.tensor([4.0, 5.0, 6.0])
dot = torch.dot(v1, v2) # scalar: 32.0
# Element-wise matrix multiply (Hadamard product)
A = torch.randn(3, 4)
B = torch.randn(3, 4)
hadamard = A * B # shape (3, 4)
Broadcasting
Broadcasting allows operations on tensors with different shapes by implicitly expanding smaller tensors. The rules follow NumPy: dimensions are aligned from the right and expanded if they are 1 or missing.
# Shape (3,) + scalar
t = torch.tensor([1.0, 2.0, 3.0])
print(t + 10) # tensor([11., 12., 13.])
# Shape (3, 1) + shape (3,) → shape (3, 3)
col = torch.tensor([[1.0], [2.0], [3.0]]) # (3, 1)
row = torch.tensor([10.0, 20.0, 30.0]) # (3,) treated as (1, 3)
print(col + row)
# tensor([[11., 21., 31.],
# [12., 22., 32.],
# [13., 23., 33.]])
# Practical: add bias to each row of a batch
batch = torch.randn(32, 128) # 32 samples, 128 features
bias = torch.randn(128) # one bias per feature
result = batch + bias # (32, 128) — bias broadcast over batch dim
In-place operations
t = torch.tensor([1.0, 2.0, 3.0])
# In-place ops are marked with a trailing underscore
t.add_(10) # t is now tensor([11., 12., 13.])
t.mul_(2) # t is now tensor([22., 24., 26.])
t.zero_() # t is now tensor([0., 0., 0.])
t.fill_(5) # t is now tensor([5., 5., 5.])
# Equivalent using out= parameter
a = torch.randn(3)
b = torch.randn(3)
result = torch.empty(3)
torch.add(a, b, out=result) # writes into pre-allocated result
Warning: in-place ops on tensors that autograd needs for the backward pass raise a RuntimeError. Prefer out-of-place ops inside model code unless you know what you're doing.
Reduction operations
t = torch.tensor([[1.0, 2.0, 3.0],
[4.0, 5.0, 6.0]]) # shape (2, 3)
# Global reductions
print(t.sum()) # tensor(21.)
print(t.mean()) # tensor(3.5)
print(t.max()) # tensor(6.)
print(t.min()) # tensor(1.)
print(t.std()) # standard deviation
print(t.prod()) # product of all elements
# Along a specific dimension
print(t.sum(dim=0)) # tensor([5., 7., 9.]) — sum over rows
print(t.sum(dim=1)) # tensor([ 6., 15.]) — sum over cols
# keepdim=True preserves the reduced dimension (useful for broadcasting)
print(t.sum(dim=1, keepdim=True)) # shape (2, 1)
# Argmax / argmin — return index, not value
print(t.argmax()) # global: tensor(5)
print(t.argmax(dim=1)) # per row: tensor([2, 2])
print(t.argmin(dim=0)) # per col: tensor([0, 0, 0])
# topk — returns values and indices of k largest elements
values, indices = torch.topk(t.flatten(), k=3)
print(values) # tensor([6., 5., 4.])
Autograd — Automatic Differentiation
PyTorch's autograd engine records operations on tensors that have requires_grad=True, building a dynamic computational graph. Calling .backward() traverses this graph in reverse (backpropagation) to compute gradients.
Basic gradient computation
import torch
# Scalars and simple expressions
x = torch.tensor(3.0, requires_grad=True) # leaf variable
y = x ** 2 + 2 * x + 1 # y = x^2 + 2x + 1
y.backward() # compute dy/dx
print(x.grad) # tensor(8.) — dy/dx at x=3 is 2x+2 = 8
# Multi-variable: partial derivatives
x = torch.tensor(2.0, requires_grad=True)
w = torch.tensor(3.0, requires_grad=True)
b = torch.tensor(1.0, requires_grad=True)
y = w * x + b # simple linear model
y.backward()
print(x.grad) # dy/dx = w = 3.0
print(w.grad) # dy/dw = x = 2.0
print(b.grad) # dy/db = 1.0
Vector-valued outputs
x = torch.randn(3, requires_grad=True)
y = x ** 2 # element-wise, shape (3,)
# For non-scalar outputs, must provide gradient argument (the upstream grad)
y.backward(torch.ones_like(x)) # dy/dx where each dy_i/dx_i = 2x_i
print(x.grad) # tensor([2*x[0], 2*x[1], 2*x[2]])
# Alternatively, sum to scalar first
x.grad.zero_() # clear accumulated grad
loss = y.sum() # scalar
loss.backward()
print(x.grad) # same result
Disabling gradient tracking
x = torch.randn(3, requires_grad=True)
# Context manager — cheapest option for inference
with torch.no_grad():
y = x ** 2 # no graph built, saves memory
print(y.requires_grad) # False
# Decorator form for full functions
@torch.no_grad()
def predict(model, x):
return model(x)
# .detach() — creates a new tensor that shares storage but stops grad flow
y = x ** 2
y_detached = y.detach() # same data, requires_grad=False
# Useful when you need the value for a computation but don't want grads
Gradient accumulation and zeroing
x = torch.tensor(2.0, requires_grad=True)
# PyTorch ACCUMULATES gradients — you must zero them between steps
for i in range(3):
y = x ** 2
y.backward()
print(x.grad) # 4.0, 8.0, 12.0 — keeps adding!
x.grad.zero_() # manual zero
# In a training loop this is done via optimizer.zero_grad()
# retain_graph=True — keep graph alive for multiple backward passes
y = x ** 2
y.backward(retain_graph=True) # graph still alive
y.backward() # second pass works
Custom autograd functions
class ReLUCustom(torch.autograd.Function):
    """Hand-written ReLU, illustrating the torch.autograd.Function API."""

    @staticmethod
    def forward(ctx, x):
        """Forward: max(0, x). Stash the input — backward needs its sign."""
        ctx.save_for_backward(x)
        return x.clamp(min=0)

    @staticmethod
    def backward(ctx, grad_output):
        """Backward: pass the upstream gradient through where x >= 0, zero it elsewhere."""
        (x,) = ctx.saved_tensors
        # d/dx max(0, x) is 0 for x < 0 and 1 otherwise (1 at x == 0 by this convention)
        return grad_output * (x >= 0)


relu = ReLUCustom.apply  # Function subclasses are invoked via .apply, not ()
x = torch.randn(4, requires_grad=True)
y = relu(x).sum()
y.backward()
print(x.grad)  # 1.0 where x >= 0, 0.0 where x < 0
nn.Module — Building Neural Networks
All neural network components in PyTorch subclass nn.Module. Modules compose recursively: a layer is a module, a network is a module, a block inside a network is a module. The key contract is implementing forward().
Built-in layers
import torch
import torch.nn as nn
# Fully connected layer: y = xW^T + b
linear = nn.Linear(in_features=128, out_features=64)
print(linear.weight.shape) # (64, 128)
print(linear.bias.shape) # (64,)
# Activations
relu = nn.ReLU()
sigmoid = nn.Sigmoid()
tanh = nn.Tanh()
gelu = nn.GELU() # used in Transformers
leaky = nn.LeakyReLU(0.01)
# Normalization
bn = nn.BatchNorm1d(64) # normalizes over batch dim, for 1-D features
bn2 = nn.BatchNorm2d(64) # for 2-D feature maps (conv layers)
ln = nn.LayerNorm(64) # normalizes over feature dim (used in Transformers)
# Regularization
dropout = nn.Dropout(p=0.5) # randomly zeros 50% of neurons during training
# Convolution
conv = nn.Conv2d(
in_channels=3,
out_channels=32,
kernel_size=3,
stride=1,
padding=1 # 'same' padding keeps spatial size
)
pool = nn.MaxPool2d(kernel_size=2, stride=2)
avgpool = nn.AdaptiveAvgPool2d((1, 1)) # output size always (1,1), any input
# Embedding (maps integer indices to dense vectors)
embed = nn.Embedding(num_embeddings=10000, embedding_dim=256)
Custom modules
class MLP(nn.Module):
    """A simple multi-layer perceptron.

    Layers: input → hidden1 (ReLU + Dropout) → hidden2 (ReLU) → output.
    The output layer stays linear — the loss function (e.g. CrossEntropyLoss)
    is expected to apply its own activation.
    """

    def __init__(self, input_dim: int, hidden_dim: int, output_dim: int, dropout: float = 0.2):
        super().__init__()  # must run before any submodule is registered
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, output_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Map a (batch, input_dim) tensor to (batch, output_dim) logits."""
        hidden = self.dropout(self.relu(self.fc1(x)))
        hidden = self.relu(self.fc2(hidden))
        return self.fc3(hidden)
model = MLP(input_dim=784, hidden_dim=256, output_dim=10)
# Inspect parameters
total_params = sum(p.numel() for p in model.parameters())
trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total: {total_params:,} Trainable: {trainable:,}")
Sequential and functional APIs
# nn.Sequential — concise for simple feed-forward chains
model = nn.Sequential(
nn.Linear(784, 256),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(256, 128),
nn.ReLU(),
nn.Linear(128, 10),
)
# Functional API (torch.nn.functional) — stateless operations
import torch.nn.functional as F
x = torch.randn(32, 784)
x = F.relu(x) # no module needed, just the function
x = F.dropout(x, p=0.5, training=True)
# Use functional for things that have no learnable parameters.
# Use nn.Module subclasses (or nn.Sequential layers) for things with
# parameters (Linear, Conv, BatchNorm) so they're tracked properly.
# Named modules — useful for selective freezing / inspection
for name, module in model.named_modules():
print(name, module)
# Named parameters
for name, param in model.named_parameters():
print(name, param.shape, param.requires_grad)
Loss Functions & Optimizers
Common loss functions
import torch
import torch.nn as nn
# --- Classification ---
# CrossEntropyLoss: combines LogSoftmax + NLLLoss
# Expects raw logits (NOT probabilities), integer class labels
criterion = nn.CrossEntropyLoss()
logits = torch.randn(32, 10) # (batch, num_classes)
labels = torch.randint(0, 10, (32,)) # integer class indices
loss = criterion(logits, labels)
# BCEWithLogitsLoss: binary classification (or multi-label)
# Combines Sigmoid + BCELoss — numerically more stable than doing it manually
bce = nn.BCEWithLogitsLoss()
logits_bin = torch.randn(32, 1)
targets = torch.randint(0, 2, (32, 1)).float()
loss_bin = bce(logits_bin, targets)
# --- Regression ---
# Mean Squared Error
mse = nn.MSELoss()
pred = torch.randn(32, 1)
target = torch.randn(32, 1)
loss_mse = mse(pred, target)
# Smooth L1 (Huber loss) — less sensitive to outliers than MSE
smooth_l1 = nn.SmoothL1Loss()
loss_huber = smooth_l1(pred, target)
Optimizers
import torch.optim as optim
model = nn.Linear(10, 1)
# SGD — classic, often used with momentum
optimizer = optim.SGD(
model.parameters(),
lr=0.01,
momentum=0.9,
weight_decay=1e-4 # L2 regularization
)
# Adam — adaptive learning rates, popular default
optimizer = optim.Adam(
model.parameters(),
lr=1e-3,
betas=(0.9, 0.999),
eps=1e-8
)
# AdamW — Adam with decoupled weight decay (preferred for Transformers)
optimizer = optim.AdamW(
model.parameters(),
lr=1e-3,
weight_decay=0.01
)
# Per-layer learning rates — useful for fine-tuning pretrained models
optimizer = optim.AdamW([
{"params": model.fc1.parameters(), "lr": 1e-4}, # backbone: small lr
{"params": model.fc2.parameters(), "lr": 1e-3}, # head: larger lr
], lr=1e-3) # default lr for any group without explicit lr
Learning rate schedulers
optimizer = optim.AdamW(model.parameters(), lr=1e-3)
# StepLR — multiply lr by gamma every step_size epochs
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5)
# CosineAnnealingLR — smoothly decay to eta_min over T_max epochs
scheduler = optim.lr_scheduler.CosineAnnealingLR(
optimizer, T_max=100, eta_min=1e-6
)
# OneCycleLR — warmup then decay, great for training from scratch
scheduler = optim.lr_scheduler.OneCycleLR(
optimizer,
max_lr=1e-2,
steps_per_epoch=len(train_loader),
epochs=30,
)
# ReduceLROnPlateau — reduce lr when metric stops improving
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
optimizer,
mode="min", # "min" for loss, "max" for accuracy
factor=0.5, # new_lr = lr * factor
patience=5, # epochs to wait
verbose=True,
)
# Called with the monitored metric:
# scheduler.step(val_loss)
# Step the scheduler (after each epoch for most, after each batch for OneCycle)
for epoch in range(epochs):
train(...)
scheduler.step()
# Check current learning rate
current_lr = optimizer.param_groups[0]['lr']
print(f"Current LR: {current_lr}")
The Training Loop
The canonical PyTorch training loop has a fixed structure. Understanding each step and why it exists is essential — this is what every framework (Lightning, HuggingFace Trainer) abstracts but never hides from you completely.
Canonical loop
import torch
import torch.nn as nn
import torch.optim as optim
def train_one_epoch(model, loader, optimizer, criterion, device):
    """Run one training pass over `loader`; return (avg_loss, accuracy_percent)."""
    model.train()  # Dropout active, BatchNorm uses batch statistics
    running_loss, n_correct, n_seen = 0.0, 0, 0
    for xb, yb in loader:
        # Batch must live on the same device as the model
        xb, yb = xb.to(device), yb.to(device)
        # Gradients accumulate by default — clear before each backward pass
        optimizer.zero_grad()
        logits = model(xb)
        batch_loss = criterion(logits, yb)
        batch_loss.backward()
        # Clip to unit grad-norm — stabilizes RNN / Transformer training
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        # Weight the per-batch mean loss by batch size for a correct epoch mean
        running_loss += batch_loss.item() * xb.size(0)
        n_correct += (logits.argmax(dim=1) == yb).sum().item()
        n_seen += yb.size(0)
    return running_loss / n_seen, 100.0 * n_correct / n_seen
@torch.no_grad()  # no graph is built — cheaper and cannot corrupt training grads
def evaluate(model, loader, criterion, device):
    """Compute (avg_loss, accuracy_percent) over `loader` without gradient tracking."""
    model.eval()  # Dropout off; BatchNorm switches to running statistics
    loss_sum, hits, seen = 0.0, 0, 0
    for xb, yb in loader:
        xb, yb = xb.to(device), yb.to(device)
        logits = model(xb)
        # Weight per-batch mean loss by batch size so the epoch mean is exact
        loss_sum += criterion(logits, yb).item() * xb.size(0)
        hits += (logits.argmax(dim=1) == yb).sum().item()
        seen += yb.size(0)
    return loss_sum / seen, 100.0 * hits / seen
# Full training script
def train(model, train_loader, val_loader, epochs=30):
    """Full training driver: AdamW + cosine schedule + best-checkpoint saving.

    Trains `model` for `epochs` epochs, validating after each epoch and
    writing the best-on-validation weights to "best_model.pth".
    Relies on train_one_epoch() and evaluate() defined above.
    """
    # Prefer CUDA, then Apple-Silicon MPS, then CPU — same device-pick idiom
    # as the setup section (the original version skipped CUDA entirely).
    device = torch.device(
        "cuda" if torch.cuda.is_available() else
        "mps" if torch.backends.mps.is_available() else
        "cpu"
    )
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=1e-3, weight_decay=0.01)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)
    best_val_acc = 0.0
    for epoch in range(1, epochs + 1):
        train_loss, train_acc = train_one_epoch(model, train_loader, optimizer, criterion, device)
        val_loss, val_acc = evaluate(model, val_loader, criterion, device)
        scheduler.step()  # cosine schedule advances once per epoch
        print(f"Epoch {epoch:3d} | "
              f"Train Loss: {train_loss:.4f} Acc: {train_acc:.1f}% | "
              f"Val Loss: {val_loss:.4f} Acc: {val_acc:.1f}%")
        # Checkpoint only when validation accuracy improves
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), "best_model.pth")
Mixed precision training
Mixed precision (FP16/BF16 for forward pass, FP32 for gradients) is ~2x faster on modern GPUs and uses half the memory, with no loss in accuracy.
from torch.cuda.amp import GradScaler, autocast
scaler = GradScaler() # manages loss scaling to prevent FP16 underflow
for inputs, targets in loader:
inputs = inputs.to(device)
targets = targets.to(device)
optimizer.zero_grad()
# autocast: runs forward pass in FP16 where safe
with autocast(device_type="cuda"): # or "mps" on Apple Silicon
outputs = model(inputs)
loss = criterion(outputs, targets)
# Scale loss, backward in FP32, then unscale and step
scaler.scale(loss).backward()
scaler.unscale_(optimizer) # unscale before clipping
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
scaler.step(optimizer)
scaler.update()
# On Apple Silicon, use torch.autocast("mps") directly — no GradScaler needed
with torch.autocast("mps", dtype=torch.float16):
outputs = model(inputs)
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()
Common pitfalls:
- Forgetting optimizer.zero_grad() — gradients accumulate across batches, producing wildly incorrect updates.
- Forgetting model.eval() during validation — Dropout stays active and BatchNorm uses batch stats, so your val metrics will be wrong.
- Calling loss.item() inside the loop is fine; but avoid .numpy() on GPU tensors inside the loop (it forces a device sync every step).
- Not moving targets to the device — easy to miss when you're focused on inputs.
Data Loading
Dataset and DataLoader
from torch.utils.data import Dataset, DataLoader, random_split
import torch
class TabularDataset(Dataset):
    """Minimal custom Dataset wrapping pre-loaded (feature, label) tensors."""

    def __init__(self, features: torch.Tensor, labels: torch.Tensor):
        # Every feature row needs a matching label
        assert len(features) == len(labels)
        self.features = features
        self.labels = labels

    def __len__(self) -> int:
        """Number of samples available."""
        return self.features.shape[0]

    def __getitem__(self, idx: int):
        """Return the (feature_row, label) pair at position `idx`."""
        return self.features[idx], self.labels[idx]
# Create dataset and split into train/val
features = torch.randn(1000, 20)
labels = torch.randint(0, 5, (1000,))
dataset = TabularDataset(features, labels)
train_set, val_set = random_split(dataset, [800, 200])
# DataLoader: handles batching, shuffling, parallel loading
train_loader = DataLoader(
train_set,
batch_size=64,
shuffle=True, # shuffle on every epoch
num_workers=4, # parallel data loading (0 = main process only)
pin_memory=True, # faster CPU→GPU transfer (use with CUDA)
drop_last=True, # drop final incomplete batch
)
val_loader = DataLoader(
val_set,
batch_size=128,
shuffle=False,
num_workers=4,
pin_memory=True,
)
# Iterate
for batch_features, batch_labels in train_loader:
print(batch_features.shape, batch_labels.shape)
break # (64, 20), (64,)
Transforms with torchvision
import torchvision
import torchvision.transforms as transforms
from torchvision import datasets
# Standard image augmentation pipeline for training
train_transforms = transforms.Compose([
transforms.RandomCrop(32, padding=4), # random crop with padding
transforms.RandomHorizontalFlip(p=0.5), # flip left/right
transforms.ColorJitter(brightness=0.2, contrast=0.2),
transforms.ToTensor(), # PIL Image → [0,1] float tensor
transforms.Normalize( # CIFAR-10 per-channel mean/std (not ImageNet)
mean=(0.4914, 0.4822, 0.4465),
std =(0.2023, 0.1994, 0.2010)
),
])
# Validation: only resize + normalize, no augmentation
val_transforms = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize(
mean=(0.4914, 0.4822, 0.4465),
std =(0.2023, 0.1994, 0.2010)
),
])
# Download CIFAR-10 and MNIST
cifar_train = datasets.CIFAR10(root="./data", train=True, download=True, transform=train_transforms)
cifar_val = datasets.CIFAR10(root="./data", train=False, download=True, transform=val_transforms)
mnist_train = datasets.MNIST(root="./data", train=True, download=True, transform=transforms.ToTensor())
train_loader = DataLoader(cifar_train, batch_size=128, shuffle=True, num_workers=4, pin_memory=True)
val_loader = DataLoader(cifar_val, batch_size=256, shuffle=False, num_workers=4, pin_memory=True)
Custom collate function
from torch.nn.utils.rnn import pad_sequence
def collate_variable_length(batch):
    """Collate (sequence, label) samples whose sequences differ in length.

    Shorter sequences are right-padded with zeros so the whole batch stacks
    into one (batch, max_len) tensor — the usual setup for NLP mini-batches.
    """
    seqs, labs = zip(*batch)
    # pad_sequence stacks the tensors, zero-filling past each sequence's end
    batch_seqs = pad_sequence(seqs, batch_first=True, padding_value=0)
    batch_labs = torch.stack(labs)
    return batch_seqs, batch_labs
loader = DataLoader(
dataset,
batch_size=32,
collate_fn=collate_variable_length,
)
Tip: start with num_workers=4. On macOS with MPS, use num_workers=0 inside Jupyter (forking + MPS causes hangs). For production training, num_workers = os.cpu_count() // 2 is a reasonable heuristic. Enable pin_memory=True whenever using CUDA.
CNN Architectures
Building blocks
import torch.nn as nn
# Convolution output size formula:
# H_out = floor((H_in + 2*padding - kernel_size) / stride) + 1
conv = nn.Conv2d(
in_channels=3,
out_channels=64,
kernel_size=3,
stride=1,
padding=1, # padding=1 with kernel=3 and stride=1 keeps H,W unchanged
bias=False, # set False when followed by BatchNorm (bias is redundant)
)
# A typical ConvBlock
class ConvBlock(nn.Module):
    """Conv → BN → ReLU (common building block in modern CNNs).

    The convolution is bias-free because the BatchNorm that follows has its
    own learned shift, which makes a conv bias redundant.
    """

    def __init__(self, in_ch: int, out_ch: int, stride: int = 1):
        super().__init__()
        self.net = nn.Sequential(
            # 3x3 conv; padding=1 keeps H,W at stride=1, halves them at stride=2
            nn.Conv2d(in_ch, out_ch, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(inplace=True),  # in-place saves memory; safe right after BN
        )

    def forward(self, x):
        # x: (B, in_ch, H, W) → (B, out_ch, H', W')
        return self.net(x)
Simple CNN (LeNet-style for CIFAR-10)
class SimpleCNN(nn.Module):
    """Lightweight CNN for 32x32 image classification (e.g. CIFAR-10).

    Three Conv→BN→ReLU→MaxPool stages halve the spatial size each time
    (32 → 16 → 8 → 4); global average pooling then collapses the 4x4 maps
    to 1x1 before a dropout + linear head produces class logits.
    """

    def __init__(self, num_classes: int = 10):
        super().__init__()
        # Feature extractor
        self.features = nn.Sequential(
            # Block 1: 3x32x32 → 32x16x16
            nn.Conv2d(3, 32, kernel_size=3, padding=1, bias=False),  # bias folded into BN
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2),
            # Block 2: 32x16x16 → 64x8x8
            nn.Conv2d(32, 64, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2),
            # Block 3: 64x8x8 → 128x4x4
            nn.Conv2d(64, 128, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2),
        )
        # Global average pooling instead of flatten — fewer parameters, less overfitting
        self.pool = nn.AdaptiveAvgPool2d((1, 1))
        # Classifier
        self.classifier = nn.Sequential(
            nn.Dropout(0.3),
            nn.Linear(128, num_classes),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.features(x)       # (B, 128, 4, 4)
        x = self.pool(x)           # (B, 128, 1, 1)
        x = x.flatten(1)           # (B, 128)
        return self.classifier(x)  # (B, num_classes)
model = SimpleCNN(num_classes=10)
x = torch.randn(8, 3, 32, 32)
print(model(x).shape) # torch.Size([8, 10])
Transfer learning
import torchvision.models as models
# Load a pretrained ResNet-50 (downloads weights automatically)
backbone = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V2)
# Strategy 1: Feature extraction — freeze all except the classifier
for param in backbone.parameters():
param.requires_grad = False
# Replace the final FC layer to match our number of classes
num_features = backbone.fc.in_features # 2048 for ResNet-50
backbone.fc = nn.Linear(num_features, 10) # only this layer trains
# Strategy 2: Fine-tuning — unfreeze the last block + classifier
for param in backbone.layer4.parameters():
param.requires_grad = True
for param in backbone.fc.parameters():
param.requires_grad = True
# Use different LRs for frozen backbone vs new head
optimizer = optim.AdamW([
{"params": backbone.layer4.parameters(), "lr": 1e-5},
{"params": backbone.fc.parameters(), "lr": 1e-3},
])
# Common pretrained models
resnet18 = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
efficientb0 = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.DEFAULT)
vit_b16 = models.vit_b_16(weights=models.ViT_B_16_Weights.DEFAULT)
# Forward pass — 224x224 input for ImageNet-pretrained models
x = torch.randn(4, 3, 224, 224)
out = backbone(x)
print(out.shape) # (4, 10)
RNNs & Sequence Models
RNNs process sequences step-by-step, maintaining a hidden state. While Transformers have largely superseded RNNs for NLP, LSTM/GRU remain relevant for time-series, on-device inference, and streaming scenarios.
LSTM and GRU
import torch
import torch.nn as nn
# nn.LSTM: input_size, hidden_size, num_layers, batch_first
lstm = nn.LSTM(
input_size=128,
hidden_size=256,
num_layers=2,
batch_first=True, # input shape: (batch, seq_len, input_size)
dropout=0.2, # dropout between layers (not after last)
bidirectional=False,
)
# GRU — simpler than LSTM, fewer parameters, often similar performance
gru = nn.GRU(
input_size=128,
hidden_size=256,
num_layers=2,
batch_first=True,
bidirectional=True, # output size = 2 * hidden_size
)
# Forward pass
x = torch.randn(32, 50, 128) # (batch, seq_len, input_size)
# LSTM returns (output, (h_n, c_n))
output, (h_n, c_n) = lstm(x)
print(output.shape) # (32, 50, 256) — all hidden states
print(h_n.shape) # (2, 32, 256) — final hidden state per layer
# GRU returns (output, h_n)
output, h_n = gru(x)
print(output.shape) # (32, 50, 512) — bidirectional: 2 * 256
Sequence classifier
class TextClassifier(nn.Module):
    """LSTM-based text classifier.

    Pipeline: Embedding → bidirectional LSTM → mean pooling over time → Linear.
    Takes integer token ids of shape (batch, seq_len); returns class logits.
    """

    def __init__(
        self,
        vocab_size: int,
        embed_dim: int,
        hidden_dim: int,
        num_layers: int,
        num_classes: int,
        pad_idx: int = 0,
    ):
        super().__init__()
        # padding_idx pins the pad embedding to zero and keeps it out of gradients
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=pad_idx)
        # inter-layer dropout only applies when there is more than one LSTM layer
        lstm_dropout = 0.3 if num_layers > 1 else 0.0
        self.lstm = nn.LSTM(
            embed_dim, hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
            dropout=lstm_dropout,
        )
        # bidirectional → forward and backward states are concatenated (2 * hidden)
        self.classifier = nn.Linear(hidden_dim * 2, num_classes)
        self.dropout = nn.Dropout(0.3)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """x: (batch, seq_len) int64 token ids → (batch, num_classes) logits."""
        emb = self.dropout(self.embedding(x))  # (B, T, E)
        states, _ = self.lstm(emb)             # (B, T, 2*H)
        pooled = states.mean(dim=1)            # average over the time dimension
        return self.classifier(self.dropout(pooled))
model = TextClassifier(
vocab_size=30000, embed_dim=128, hidden_dim=256,
num_layers=2, num_classes=5,
)
x = torch.randint(0, 30000, (16, 100)) # (batch=16, seq_len=100)
print(model(x).shape) # (16, 5)
Packed sequences (variable lengths)
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
# Packing avoids computing over padding tokens — important for efficiency
# Sequences must be sorted by length (longest first) before packing
def forward_with_packing(self, x, lengths):
    """LSTM forward pass that skips computation over padding positions.

    x: (batch, max_len) integer token ids.
    lengths: per-sample true sequence lengths; `.cpu()` is called on it below,
        so this expects a tensor (pack_padded_sequence requires CPU lengths).
    Returns the re-padded LSTM outputs, shape (batch, max_len_in_batch, hidden).
    NOTE(review): with enforce_sorted=True the batch must already be sorted
    longest-first — confirm callers do this, or pass enforce_sorted=False.
    """
    embedded = self.embedding(x)  # (B, T, E)
    # Pack: tells LSTM to skip padding positions
    packed = pack_padded_sequence(
        embedded, lengths.cpu(),
        batch_first=True,
        enforce_sorted=True  # set False to skip sorting
    )
    packed_output, (h_n, c_n) = self.lstm(packed)
    # Unpack back to (B, T, H) with padding
    output, _ = pad_packed_sequence(packed_output, batch_first=True)
    return output
For new NLP projects, prefer Transformer models (e.g. via the Hugging Face transformers library) instead of LSTM/GRU. They parallelize over the full sequence during training, scale better, and achieve state-of-the-art results. RNNs remain useful for: streaming inference (processes one token at a time, constant memory), on-device / edge deployment (smaller models), and time-series with strong temporal locality.
Saving & Loading Models
State dict (recommended approach)
import torch
import torch.nn as nn
model = nn.Linear(10, 1)
# Save only the parameters (weights + biases) — smallest, most portable format
torch.save(model.state_dict(), "model.pth")
# Load: must recreate the same architecture first, then fill in the weights
loaded_model = nn.Linear(10, 1)
loaded_model.load_state_dict(torch.load("model.pth", map_location="cpu"))
loaded_model.eval() # switch Dropout/BatchNorm to inference behavior
# Inspect state dict: an ordered mapping of parameter name -> tensor
sd = model.state_dict()
for key, tensor in sd.items():
print(key, tensor.shape)
# weight torch.Size([1, 10])
# bias torch.Size([1])
Saving the full model
# Save entire model (architecture + weights) via pickle
# Convenient but brittle: loading fails if you rename, move, or refactor the class
torch.save(model, "full_model.pth")
loaded = torch.load("full_model.pth", map_location="cpu")
# Recommended for research: save the state dict plus enough metadata
# to reconstruct the architecture yourself
torch.save({
"architecture": "SimpleCNN",
"model_state_dict": model.state_dict(),
}, "model_with_meta.pth")
Full training checkpoint
def save_checkpoint(model, optimizer, scheduler, epoch, val_loss, path):
    """Serialize everything needed to resume training at `epoch + 1`."""
    state = {
        "epoch": epoch,
        "model_state_dict": model.state_dict(),
        "optimizer_state_dict": optimizer.state_dict(),
        "scheduler_state_dict": scheduler.state_dict(),
        "val_loss": val_loss,
    }
    torch.save(state, path)
def load_checkpoint(model, optimizer, scheduler, path, device):
    """Restore model, optimizer, and scheduler state to resume training.

    Returns the epoch index at which training should resume (saved epoch + 1).
    """
    checkpoint = torch.load(path, map_location=device)
    # Restore each stateful object from its matching checkpoint entry.
    for obj, key in (
        (model, "model_state_dict"),
        (optimizer, "optimizer_state_dict"),
        (scheduler, "scheduler_state_dict"),
    ):
        obj.load_state_dict(checkpoint[key])
    val_loss = checkpoint["val_loss"]
    print(f"Resumed from epoch {checkpoint['epoch']}, val_loss={val_loss:.4f}")
    return checkpoint["epoch"] + 1
# In the training loop — write one resumable checkpoint per epoch:
save_checkpoint(model, optimizer, scheduler, epoch, val_loss, f"ckpt_epoch_{epoch}.pth")
ONNX export
import torch.onnx
model.eval() # export with inference-mode behavior (Dropout off, BatchNorm frozen)
dummy_input = torch.randn(1, 3, 224, 224) # must match model's expected input
torch.onnx.export(
model,
dummy_input,
"model.onnx",
export_params=True, # include learned weights
opset_version=17, # ONNX opset version
input_names=["input"],
output_names=["output"],
dynamic_axes={ # mark axis 0 as dynamic so any batch size is accepted
"input": {0: "batch_size"},
"output": {0: "batch_size"},
},
)
# Verify the exported graph is structurally valid
import onnx
onnx_model = onnx.load("model.onnx")
onnx.checker.check_model(onnx_model)
print("ONNX model is valid")
# Run with ONNX Runtime (no PyTorch dependency at inference time)
import onnxruntime as ort
import numpy as np
sess = ort.InferenceSession("model.onnx", providers=["CPUExecutionProvider"])
inputs = {"input": dummy_input.numpy()}
outputs = sess.run(None, inputs)
print(outputs[0].shape) # (1, num_classes)
Debugging & Profiling
Common pitfalls
| Symptom | Likely Cause | Fix |
|---|---|---|
| Loss stays NaN from step 1 | NaN in input data, or log(0) in loss | torch.isnan(x).any() to locate; check data pipeline |
| Loss explodes after a few steps | Learning rate too high; gradient explosion | Lower LR; add gradient clipping (clip_grad_norm_) |
| Gradients are all zero | Activation saturation; dead ReLUs; disconnected graph | Check for saturated sigmoids; use LeakyReLU; verify graph |
| RuntimeError: expected device cpu got mps | Tensor device mismatch | Move all tensors to the same device before operations |
| CUDA out of memory | Batch too large; retaining computation graph | Reduce batch size; accumulate loss.item(), not the loss tensor; use torch.no_grad() |
| Validation loss lower than train loss | Dropout active during eval; data leakage | Call model.eval(); audit your data splits |
Shape debugging
# The most useful debugging tool: print shapes at every step
class ShapeDebugModel(nn.Module):
# NOTE(review): illustrative sketch only — assumes conv1/pool/fc are defined
# in an __init__ that is omitted here.
def forward(self, x):
print(f"input: {x.shape}")
x = self.conv1(x)
print(f"after conv1: {x.shape}")
x = self.pool(x)
print(f"after pool: {x.shape}")
x = x.flatten(1) # flatten everything after the batch dimension
print(f"after flatten: {x.shape}")
return self.fc(x)
# torchinfo (pip install torchinfo) gives a clean model summary
from torchinfo import summary
summary(model, input_size=(8, 3, 32, 32)) # input_size includes the batch dimension
# Columns: Layer, Output Shape, Param #, Trainable
# Check for NaN/Inf in a tensor — first step when loss goes NaN
x = torch.randn(3, 4)
print(torch.isnan(x).any()) # False
print(torch.isinf(x).any()) # False
# Gradient inspection — run after loss.backward(); near-zero norms hint at
# vanishing gradients, very large norms at explosion
for name, param in model.named_parameters():
if param.grad is not None:
print(f"{name}: grad norm = {param.grad.norm():.4f}")
Profiling with torch.profiler
from torch.profiler import profile, record_function, ProfilerActivity
with profile(
activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA], # CUDA entries empty on CPU-only machines
record_shapes=True, # record input shapes per op
with_stack=True, # capture Python stack traces (adds overhead)
) as prof:
with record_function("forward"): # labels this region in the trace
output = model(inputs)
with record_function("backward"):
output.sum().backward()
# Print a sorted table of ops by total CPU time
print(prof.key_averages().table(sort_by="cpu_time_total", row_limit=15))
# Export to Chrome trace viewer
prof.export_chrome_trace("trace.json")
# Open in chrome://tracing or ui.perfetto.dev
Quick gradient flow check
def plot_grad_flow(named_parameters):
    """Bar-plot the mean absolute gradient of each weight layer.

    Useful for diagnosing vanishing/exploding gradients. Call after
    loss.backward() and before optimizer.step().
    """
    import matplotlib.pyplot as plt
    # Collect (name, mean |grad|) for trainable weight tensors only;
    # biases are skipped to keep the plot readable.
    stats = [
        (name, p.grad.abs().mean().item())
        for name, p in named_parameters
        if p.requires_grad and p.grad is not None and "bias" not in name
    ]
    layers = [name for name, _ in stats]
    avg_grads = [grad for _, grad in stats]
    plt.bar(range(len(avg_grads)), avg_grads)
    plt.xticks(range(len(layers)), layers, rotation=90, fontsize=8)
    plt.xlabel("Layers")
    plt.ylabel("Mean |gradient|")
    plt.title("Gradient flow")
    plt.tight_layout()
    plt.show()
# Usage inside training loop:
# plot_grad_flow(model.named_parameters())
Production Deployment
TorchScript
TorchScript compiles your model into a graph representation that can run without Python — enabling deployment to C++, mobile, and edge devices.
import torch
model.eval() # freeze eval-mode behavior into the trace
# Method 1: torch.jit.trace — records the ops executed for one example input
# Pro: works on almost any model Con: only captures the traced code path
# (data-dependent control flow is baked in as whichever branch the dummy took)
dummy = torch.randn(1, 3, 224, 224)
traced = torch.jit.trace(model, dummy)
traced.save("model_traced.pt")
# Method 2: torch.jit.script — statically analyzes the Python source
# Pro: preserves control flow Con: requires type annotations, limited Python subset
@torch.jit.script
def scaled_dot_product(q, k, v, scale: float):
    """Scaled dot-product attention: softmax(q·kᵀ / scale) · v."""
    attn = torch.softmax(torch.matmul(q, k.transpose(-2, -1)) / scale, dim=-1)
    return torch.matmul(attn, v)
# Script an entire module (recurses into submodules)
scripted = torch.jit.script(model)
scripted.save("model_scripted.pt")
# Load and run in Python (also loadable from C++ via torch::jit::load)
loaded = torch.jit.load("model_scripted.pt")
output = loaded(dummy)
# Inspect the generated intermediate representation (TorchScript graph)
print(scripted.graph)
Quantization
import torch.quantization as quant
# Dynamic quantization — quantizes weights at save time, activations at runtime
# Best for: LSTM / Transformer inference, CPU deployment
quantized_model = quant.quantize_dynamic(
model,
qconfig_spec={nn.Linear, nn.LSTM}, # which layer types to quantize
dtype=torch.qint8, # 8-bit signed integer weights
)
# NOTE: get_size_mb is defined later in this section
print(f"Original: {get_size_mb(model):.1f} MB")
print(f"Quantized: {get_size_mb(quantized_model):.1f} MB") # ~4x smaller
# Post-training static quantization — more accurate, requires calibration data
model.eval()
model.qconfig = quant.get_default_qconfig("fbgemm") # x86 CPU backend
quant.prepare(model, inplace=True) # insert observers to record activation ranges
# Calibration: run a few representative batches through the model
with torch.no_grad():
for inputs, _ in calibration_loader:
model(inputs)
quant.convert(model, inplace=True) # replace modules with quantized versions
# Helper to check model size in MB
def get_size_mb(model):
    """Return the serialized size of `model`'s state dict in megabytes."""
    import io
    buffer = io.BytesIO()
    # Serialize into memory instead of disk, then measure the byte count.
    torch.save(model.state_dict(), buffer)
    return buffer.getbuffer().nbytes / 1e6
TorchServe overview
# Install
pip install torchserve torch-model-archiver
# 1. Create a model archive (.mar file)
# --handler image_classifier selects TorchServe's built-in image handler
# (an inline comment after a trailing backslash would break the line continuation)
torch-model-archiver \
  --model-name my_model \
  --version 1.0 \
  --serialized-file model_traced.pt \
  --handler image_classifier \
  --export-path model_store/
# 2. Start TorchServe (inference API on :8080, management API on :8081 by default)
torchserve \
--start \
--model-store model_store/ \
--models my_model=my_model.mar \
--ts-config config.properties
# 3. Query the REST API with a raw image body
curl -X POST \
http://localhost:8080/predictions/my_model \
-H "Content-Type: application/octet-stream" \
--data-binary @image.jpg
{
"inference_address": "http://0.0.0.0:8080",
"management_address": "http://0.0.0.0:8081",
"number_of_netty_threads": 4,
"job_queue_size": 100,
"default_workers_per_model": 2,
"default_response_timeout": 120
}
Inference patterns
class Predictor:
    """Production inference wrapper around a TorchScript model.

    Handles device placement, input preprocessing, and output postprocessing.
    """

    def __init__(self, model_path: str, device: str = "cpu"):
        self.device = torch.device(device)
        # TorchScript archive — loadable without the original class definition.
        self.model = torch.jit.load(model_path, map_location=self.device)
        self.model.eval()
        # CIFAR-10 label names, index-aligned with the model's output logits.
        self.classes = ["airplane", "automobile", "bird", "cat", "deer",
                        "dog", "frog", "horse", "ship", "truck"]
        # NOTE(review): mean/std below are the standard CIFAR-10 statistics —
        # they must match whatever normalization was used at training time.
        self.transform = transforms.Compose([
            transforms.Resize((32, 32)),
            transforms.ToTensor(),
            transforms.Normalize(
                mean=(0.4914, 0.4822, 0.4465),
                std=(0.2023, 0.1994, 0.2010)
            ),
        ])

    @torch.no_grad()
    def predict(self, image) -> dict:
        """Return the top-1 class label and its confidence for one image."""
        batch = self.transform(image).unsqueeze(0).to(self.device)  # (1, C, H, W)
        probs = torch.softmax(self.model(batch), dim=-1)[0]  # (10,)
        confidence, index = probs.max(dim=0)
        return {
            "class": self.classes[index.item()],
            "confidence": round(confidence.item(), 4),
        }

    @torch.no_grad()
    def predict_batch(self, images) -> list[dict]:
        """Batch inference — amortizes per-call overhead vs. looping predict()."""
        stacked = torch.stack([self.transform(img) for img in images]).to(self.device)
        probs = torch.softmax(self.model(stacked), dim=-1)
        confidences, indices = probs.max(dim=-1)
        return [
            {"class": self.classes[i.item()], "confidence": round(c.item(), 4)}
            for i, c in zip(indices, confidences)
        ]
- Always call model.eval() before inference — switches BatchNorm and Dropout to eval-mode behavior.
- Wrap inference in torch.no_grad() — prevents graph construction, saving memory and time.
- Use torch.jit.script or torch.jit.trace to remove Python overhead and enable C++ deployment.
- Quantize to INT8 for CPU-bound deployments — typically 3-4x latency improvement with minimal accuracy loss.
- Batch requests together — GPU utilization drops sharply at batch size 1. Even batch size 8-16 is much more efficient.
- Pin model weights in memory on startup; avoid reloading per request.