Skip to content

深度学习

目录


1. 前言

1.1 深度学习简介

深度学习是机器学习的一个分支,使用多层神经网络模拟人脑的学习过程。它能够自动从原始数据中学习特征表示,在计算机视觉、自然语言处理、语音识别等领域取得了突破性进展。

1.2 核心价值

  • 深层特征提取:自动学习数据的层次化特征表示
  • 复杂模式识别:处理高维、非线性关系的数据
  • 端到端学习:从原始输入直接映射到输出,无需手动设计特征

1.3 应用场景

领域应用示例
计算机视觉图像分类、目标检测、图像分割、图像生成
自然语言处理文本分类、情感分析、机器翻译、文本生成
语音识别语音转文字、语音合成、声纹识别
推荐系统个性化推荐、内容推荐
自动驾驶环境感知、路径规划
医疗影像疾病诊断、医学图像分析
生成式AI图像生成、文本生成、视频生成

1.4 学习前提

  • Python基础:掌握Python语法、数据结构、函数
  • 机器学习基础:了解监督学习、无监督学习、评估指标
  • 数学基础:线性代数、概率论、微积分
  • 编程工具:熟悉Jupyter Notebook、PyCharm等IDE

1.5 核心工具安装

bash
# 使用pip安装(国内推荐使用阿里云镜像)
pip install torch torchvision torchaudio -i https://mirrors.aliyun.com/pypi/simple/
pip install tensorflow -i https://mirrors.aliyun.com/pypi/simple/  # TensorFlow 2.x 已内置 GPU 支持,tensorflow-gpu 包已废弃,无需单独安装
pip install numpy pandas matplotlib opencv-python -i https://mirrors.aliyun.com/pypi/simple/
pip install scikit-learn transformers diffusers -i https://mirrors.aliyun.com/pypi/simple/

# 使用conda安装
conda install pytorch torchvision torchaudio cpuonly -c pytorch
conda install tensorflow
conda install numpy pandas matplotlib opencv

1.6 环境验证

python
# PyTorch验证
import torch
print(f"PyTorch版本: {torch.__version__}")
print(f"CUDA可用: {torch.cuda.is_available()}")

# TensorFlow验证
import tensorflow as tf
print(f"TensorFlow版本: {tf.__version__}")
print(f"GPU可用: {tf.config.list_physical_devices('GPU')}")

# 基础库验证
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2
print("所有库导入成功!")

2. 入门基础

2.1 核心概念

2.1.1 神经网络与神经元

python
import numpy as np

# 单个神经元模拟
class Neuron:
    """A single artificial neuron: weighted sum of the inputs plus a bias,
    passed through a sigmoid activation.

    Weights and bias are drawn from a standard normal distribution at
    construction, so outputs differ between instances.
    """

    def __init__(self, input_size):
        self.weights = np.random.randn(input_size)  # one weight per input
        self.bias = np.random.randn()               # scalar bias term

    def forward(self, x):
        """Return sigmoid(w . x + b) for input vector ``x``."""
        z = np.dot(self.weights, x) + self.bias  # weighted sum (pre-activation)
        return self.sigmoid(z)

    def sigmoid(self, z):
        """Numerically stable logistic function.

        The argument is clipped to [-500, 500] before exponentiation to
        avoid overflow for extreme inputs — consistent with the
        SimpleNN.sigmoid implementation later in this file.
        """
        return 1 / (1 + np.exp(-np.clip(z, -500, 500)))

# 测试神经元
neuron = Neuron(3)
output = neuron.forward(np.array([1.0, 2.0, 3.0]))
print(f"神经元输出: {output:.4f}")

2.1.2 正向传播与反向传播

正向传播:数据从输入层经过隐藏层流向输出层的过程

反向传播:根据损失函数计算梯度,从输出层反向更新权重的过程

2.1.3 关键超参数

  • epochs:训练轮数,整个数据集遍历的次数
  • batch size:每批次训练的数据量
  • 学习率(lr):权重更新的步长,影响收敛速度和稳定性

2.1.4 过拟合与欠拟合

  • 过拟合:模型在训练集上表现很好,但在测试集上表现差
  • 欠拟合:模型在训练集和测试集上表现都很差

2.2 基础流程

python
# 完整的深度学习基础流程示例
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.datasets import make_classification
from sklearn.preprocessing import StandardScaler

# 1. 数据收集
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)

# 2. 数据预处理
scaler = StandardScaler()
X = scaler.fit_transform(X)

# 3. 数据集划分
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 4. 模型搭建(简单神经网络)
class SimpleNN:
    """Two-layer fully connected network: Linear -> ReLU -> Linear -> Sigmoid.

    Intermediate values (z1, a1, z2, a2) are cached on the instance because
    the training loop below reads them to compute gradients.
    """

    def __init__(self, input_size, hidden_size, output_size):
        # Small random weights break symmetry; biases start at zero.
        self.W1 = 0.01 * np.random.randn(input_size, hidden_size)
        self.b1 = np.zeros(hidden_size)
        self.W2 = 0.01 * np.random.randn(hidden_size, output_size)
        self.b2 = np.zeros(output_size)

    def forward(self, X):
        """Run a forward pass, caching intermediates for backprop."""
        self.z1 = X @ self.W1 + self.b1        # hidden pre-activation
        self.a1 = self.relu(self.z1)           # hidden activation
        self.z2 = self.a1 @ self.W2 + self.b2  # output pre-activation
        self.a2 = self.sigmoid(self.z2)        # predicted probabilities
        return self.a2

    def relu(self, x):
        """Rectified linear unit."""
        return np.maximum(0, x)

    def sigmoid(self, x):
        """Logistic function; clipping avoids overflow in exp."""
        return 1 / (1 + np.exp(-np.clip(x, -500, 500)))

# 5. 模型训练
model = SimpleNN(20, 32, 1)

def train(model, X, y, epochs=100, lr=0.01):
    """Train ``model`` with full-batch gradient descent on binary labels.

    Args:
        model: SimpleNN-like object exposing W1/b1/W2/b2 and a forward()
            that caches z1/a1 on the instance.
        X: input matrix of shape (m, n_features).
        y: binary label vector of shape (m,) or (m, 1).
        epochs: number of full passes over the data.
        lr: gradient-descent step size.
    """
    # Work with a column vector throughout.  The original code mixed a
    # flat (m,) ``y`` with the (m, 1) predictions, which NumPy broadcast
    # to an (m, m) matrix inside the loss — the printed loss was wrong.
    y_col = y.reshape(-1, 1)
    m = len(y_col)
    for epoch in range(epochs):
        # Forward pass
        y_pred = model.forward(X)

        # Binary cross-entropy loss (epsilon guards against log(0))
        loss = -np.mean(
            y_col * np.log(y_pred + 1e-10)
            + (1 - y_col) * np.log(1 - y_pred + 1e-10)
        )

        # Backward pass (simplified: sigmoid + BCE yields dz2 directly)
        dz2 = y_pred - y_col
        dW2 = (model.a1.T @ dz2) / m
        db2 = np.mean(dz2, axis=0)

        da1 = dz2 @ model.W2.T
        dz1 = da1 * (model.z1 > 0)  # ReLU gradient mask
        dW1 = (X.T @ dz1) / m
        db1 = np.mean(dz1, axis=0)

        # Gradient-descent update
        model.W1 -= lr * dW1
        model.b1 -= lr * db1
        model.W2 -= lr * dW2
        model.b2 -= lr * db2

        if epoch % 20 == 0:
            print(f"Epoch {epoch}, Loss: {loss:.4f}")

train(model, X_train, y_train)

# 6. 模型评估
y_pred = model.forward(X_test)
accuracy = np.mean((y_pred > 0.5).flatten() == y_test)
print(f"测试准确率: {accuracy:.4f}")

2.3 基础工具使用

2.3.1 NumPy数据处理

python
import numpy as np

# 创建数组
arr = np.array([[1, 2, 3], [4, 5, 6]])
print(f"数组形状: {arr.shape}")
print(f"数组均值: {np.mean(arr)}")
print(f"数组求和: {np.sum(arr, axis=0)}")

# 矩阵运算
A = np.random.randn(3, 4)
B = np.random.randn(4, 5)
C = A @ B  # 矩阵乘法
print(f"矩阵乘法结果形状: {C.shape}")

2.3.2 Pandas数据读取与清洗

python
import pandas as pd

# 读取CSV文件
df = pd.read_csv('data.csv')

# 查看数据
print(df.head())
print(df.describe())

# 数据清洗
df = df.dropna()  # 删除缺失值
df = df.drop_duplicates()  # 删除重复值
df['age'] = df['age'].astype(int)  # 类型转换

2.3.3 Matplotlib结果可视化

python
import matplotlib.pyplot as plt
import numpy as np

# 绘制损失曲线
epochs = np.arange(100)
loss = np.exp(-epochs / 20) + 0.05 * np.random.randn(100)

plt.figure(figsize=(10, 6))
plt.plot(epochs, loss, label='Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss Curve')
plt.legend()
plt.grid(True)
plt.show()

2.3.4 OpenCV图像预处理

python
import cv2
import matplotlib.pyplot as plt

# 读取图像
img = cv2.imread('image.jpg')
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # 转换颜色空间

# 图像预处理
resized = cv2.resize(img, (224, 224))  # 调整大小
normalized = resized / 255.0  # 归一化

# 数据增强
flipped = cv2.flip(resized, 1)  # 水平翻转
rotated = cv2.rotate(resized, cv2.ROTATE_90_CLOCKWISE)  # 旋转

# 显示图像
plt.figure(figsize=(12, 4))
plt.subplot(131); plt.imshow(resized); plt.title('Original')
plt.subplot(132); plt.imshow(flipped); plt.title('Flipped')
plt.subplot(133); plt.imshow(rotated); plt.title('Rotated')
plt.show()

2.4 新手常见误区

误区原因解决方案
数据预处理不充分未进行归一化/标准化对输入数据进行标准化处理
学习率设置不当过大导致发散,过小导致收敛慢使用学习率调度器
模型过拟合模型复杂度太高使用正则化、Dropout
梯度消失/爆炸深层网络梯度传播问题使用ResNet、梯度裁剪
数据泄露测试数据参与训练严格划分数据集

3. 核心功能与基础模型

3.1 基础神经网络

3.1.1 感知机

python
import numpy as np

class Perceptron:
    """Binary perceptron classifier using the classic mistake-driven rule."""

    def __init__(self, input_size):
        self.weights = np.zeros(input_size)
        self.bias = 0

    def predict(self, x):
        """Return 1 when the activation w . x + b is non-negative, else 0."""
        return 1 if np.dot(self.weights, x) + self.bias >= 0 else 0

    def train(self, X, y, lr=0.1, epochs=100):
        """Apply the perceptron learning rule over the data ``epochs`` times."""
        for _ in range(epochs):
            for sample, target in zip(X, y):
                # error is 0 on a correct prediction, +/-1 on a mistake
                error = target - self.predict(sample)
                self.weights += lr * error * sample
                self.bias += lr * error

# 测试感知机(AND门)
X = np.array([[0, 0], [0, 1], [1, 0], [1, 1]])
y = np.array([0, 0, 0, 1])

perceptron = Perceptron(2)
perceptron.train(X, y)

print("AND门测试结果:")
for x in X:
    print(f"{x} -> {perceptron.predict(x)}")

3.1.2 多层感知机(MLP)

python
import torch
import torch.nn as nn

class MLP(nn.Module):
    """Multi-layer perceptron: a stack of Linear+ReLU blocks followed by a
    final Linear projection (no activation on the output layer)."""

    def __init__(self, input_size, hidden_sizes, output_size):
        super(MLP, self).__init__()
        dims = [input_size] + list(hidden_sizes)
        blocks = []
        for in_dim, out_dim in zip(dims[:-1], dims[1:]):
            blocks += [nn.Linear(in_dim, out_dim), nn.ReLU()]
        blocks.append(nn.Linear(dims[-1], output_size))
        self.model = nn.Sequential(*blocks)

    def forward(self, x):
        return self.model(x)

# 创建MLP模型
model = MLP(input_size=784, hidden_sizes=[256, 128, 64], output_size=10)
print(model)

3.2 激活函数

python
import numpy as np
import matplotlib.pyplot as plt

x = np.linspace(-5, 5, 100)

def sigmoid(x):
    """Logistic function: maps any real input into (0, 1)."""
    return 1.0 / (1.0 + np.exp(-x))

def relu(x):
    """Rectified linear unit: zero for negative inputs, identity otherwise."""
    return np.maximum(0, x)

def leaky_relu(x, alpha=0.01):
    """ReLU variant with a small slope ``alpha`` on the negative side."""
    return np.maximum(alpha * x, x)

def tanh(x):
    """Hyperbolic tangent: maps any real input into (-1, 1)."""
    return np.tanh(x)

def softmax(x, axis=-1):
    """Numerically stable softmax along ``axis`` (default: last axis).

    Subtracting the running maximum before exponentiating prevents
    overflow.  Generalized over the original version, which reduced over
    the whole array and therefore only handled 1-D input correctly;
    behavior for 1-D input is unchanged.
    """
    x = np.asarray(x)
    shifted = x - np.max(x, axis=axis, keepdims=True)
    exp_x = np.exp(shifted)
    return exp_x / np.sum(exp_x, axis=axis, keepdims=True)

# 可视化
plt.figure(figsize=(12, 8))
plt.subplot(221); plt.plot(x, sigmoid(x)); plt.title('Sigmoid')
plt.subplot(222); plt.plot(x, relu(x)); plt.title('ReLU')
plt.subplot(223); plt.plot(x, leaky_relu(x)); plt.title('Leaky ReLU')
plt.subplot(224); plt.plot(x, tanh(x)); plt.title('Tanh')
plt.show()

激活函数对比

激活函数优点缺点适用场景
Sigmoid输出在(0,1),适合概率输出梯度消失问题输出层(二分类)
ReLU计算简单,缓解梯度消失神经元死亡问题隐藏层
Leaky ReLU解决神经元死亡增加超参数隐藏层
Tanh输出在(-1,1),零均值梯度消失隐藏层
Softmax输出概率分布计算开销大多分类输出层

3.3 损失函数

python
import numpy as np

def mse_loss(y_true, y_pred):
    """Mean squared error — the standard loss for regression tasks."""
    diff = y_true - y_pred
    return np.mean(diff * diff)

def cross_entropy_loss(y_true, y_pred):
    """Categorical cross-entropy for classification tasks.

    Predictions are clipped away from 0 and 1 so the log never overflows.
    """
    eps = 1e-15
    clipped = np.clip(y_pred, eps, 1 - eps)
    return -np.mean(y_true * np.log(clipped))

def bce_loss(y_true, y_pred):
    """Binary cross-entropy for two-class problems."""
    eps = 1e-15
    clipped = np.clip(y_pred, eps, 1 - eps)
    return -np.mean(
        y_true * np.log(clipped) + (1 - y_true) * np.log(1 - clipped)
    )

# 示例
y_true = np.array([1, 0, 1, 1, 0])
y_pred = np.array([0.9, 0.1, 0.8, 0.95, 0.2])

print(f"MSE: {mse_loss(y_true, y_pred):.4f}")
print(f"BCE: {bce_loss(y_true, y_pred):.4f}")

3.4 优化器

python
import torch
import torch.nn as nn

# 定义模型
model = nn.Linear(10, 1)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# 训练循环
for epoch in range(100):
    optimizer.zero_grad()  # 清零梯度

    # 前向传播
    x = torch.randn(32, 10)
    y = torch.randn(32, 1)
    pred = model(x)

    # 计算损失
    loss = nn.MSELoss()(pred, y)

    # 反向传播
    loss.backward()

    # 更新权重
    optimizer.step()

优化器对比

优化器特点学习率设置适用场景
SGD基础优化器较小(0.01-0.1)通用
SGD+Momentum加速收敛0.01-0.1通用
RMSprop自适应学习率0.001非凸优化
Adam自适应学习率,结合动量0.001大多数场景
AdamWAdam+权重衰减0.001正则化场景

3.5 数据预处理

3.5.1 图像预处理

python
from torchvision import transforms

# 训练集变换(含数据增强)
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# 测试集变换(不含数据增强)
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

3.5.2 文本预处理

python
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# 文本数据
texts = ["I love deep learning", "Machine learning is fun", "Neural networks are powerful"]

# 构建词汇表
tokenizer = Tokenizer(num_words=1000)
tokenizer.fit_on_texts(texts)

# 文本转序列
sequences = tokenizer.texts_to_sequences(texts)
print("序列:", sequences)

# 序列填充(统一长度)
padded = pad_sequences(sequences, maxlen=10, padding='post')
print("填充后:\n", padded)

# 获取词汇表
word_index = tokenizer.word_index
print("词汇表:", word_index)

3.6 目标检测

目标检测任务不仅要识别图像中有哪些目标(分类),还要定位目标的位置(边界框回归)。

3.6.1 核心概念

  • Grid Cell:YOLO 将图像划分为 S×S 的网格,每个格子负责预测中心落在该格子内的目标
  • Anchor Box:预定义的边界框形状,用于预测不同宽高比的目标,每个 Grid Cell 预测多个 Anchor
  • 置信度(Confidence):预测框包含目标的概率 × 预测框与真实框的 IoU
  • IoU(Intersection over Union):预测框与真实框的交集面积 / 并集面积,衡量定位精度

3.6.2 YOLO vs Faster R-CNN

维度YOLOFaster R-CNN
检测方式单阶段(one-stage)两阶段(two-stage)
速度快(可实时)慢(非实时)
精度略低较高
核心组件Grid + AnchorRPN + ROI Pooling
适用场景实时检测、边缘部署高精度检测

Faster R-CNN 两阶段流程

  1. RPN(Region Proposal Network):生成候选区域(Region Proposals)
  2. ROI Pooling:将候选区域特征统一到固定大小,再进行分类和边框回归

3.6.3 YOLOv5 实战代码

python
# 安装:pip install ultralytics
import torch

# 加载 YOLOv5 预训练模型(自动下载权重)
# 'yolov5s' 为小模型,速度快;'yolov5l' 精度更高
model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)
model.eval()

# 单张图像检测(支持路径、URL、PIL Image、numpy array)
img_path = 'test.jpg'
results = model(img_path)

# 打印检测结果摘要
results.print()

# 获取结构化检测结果 [x1, y1, x2, y2, confidence, class_id]
predictions = results.xyxy[0]
for pred in predictions:
    x1, y1, x2, y2, conf, cls = pred
    class_name = model.names[int(cls)]
    print(f"类别: {class_name}, 置信度: {conf:.2f}, 位置: ({x1:.0f},{y1:.0f})-({x2:.0f},{y2:.0f})")

# 保存标注结果图像
results.save(save_dir='./results')
print("检测结果已保存到 ./results 目录")

注意:YOLOv5 需要联网下载模型权重(约 14MB),首次运行需要网络连接。国内可使用镜像或手动下载权重文件。

3.6.4 使用 torchvision 的 Faster R-CNN

python
import torch
import torchvision
from torchvision.models.detection import fasterrcnn_resnet50_fpn, FasterRCNN_ResNet50_FPN_Weights
from torchvision import transforms
from PIL import Image

# 加载预训练 Faster R-CNN(COCO 数据集,80类)
weights = FasterRCNN_ResNet50_FPN_Weights.DEFAULT
model = fasterrcnn_resnet50_fpn(weights=weights)
model.eval()

# COCO 类别名称
COCO_CLASSES = weights.meta['categories']

# 图像预处理
transform = transforms.Compose([transforms.ToTensor()])

# 使用随机张量模拟输入(实际使用时替换为真实图像)
dummy_image = torch.rand(3, 480, 640)  # C x H x W

# 推理(输入为列表)
with torch.no_grad():
    predictions = model([dummy_image])

# 解析结果
boxes = predictions[0]['boxes']    # 边界框 [N, 4]
scores = predictions[0]['scores']  # 置信度 [N]
labels = predictions[0]['labels']  # 类别 ID [N]

# 过滤低置信度结果(阈值 0.5)
threshold = 0.5
keep = scores > threshold
print(f"检测到 {keep.sum().item()} 个目标(置信度 > {threshold})")
for box, score, label in zip(boxes[keep], scores[keep], labels[keep]):
    print(f"类别: {COCO_CLASSES[label]}, 置信度: {score:.2f}")

避坑:Faster R-CNN 输入要求图像张量值在 [0, 1] 范围内,且不需要额外归一化。

3.7 图像分割(U-Net)

图像分割是对图像中每个像素进行分类的任务,比目标检测更精细。

3.7.1 语义分割 vs 实例分割

类型说明典型模型
语义分割为每个像素分配类别,同类目标不区分个体FCN、DeepLab、U-Net
实例分割在语义分割基础上区分同类的不同个体Mask R-CNN

3.7.2 U-Net 架构

U-Net 由编码器(下采样)和解码器(上采样)组成,通过跳跃连接将编码器的特征图直接拼接到解码器对应层,保留细节信息。

text
编码器(下采样路径):
  输入 → [Conv×2 → MaxPool] × 4 → Bottleneck

解码器(上采样路径):
  Bottleneck → [ConvTranspose + 跳跃连接拼接 + Conv×2] × 4 → 输出

跳跃连接:将编码器第i层特征图拼接到解码器第(4-i)层,补充空间细节

3.7.3 U-Net PyTorch 实现

python
import torch
import torch.nn as nn

class DoubleConv(nn.Module):
    """U-Net building block: (Conv3x3 -> BatchNorm -> ReLU) applied twice."""

    def __init__(self, in_channels, out_channels):
        super(DoubleConv, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, 3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, 3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        return self.conv(x)


class UNet(nn.Module):
    """U-Net for semantic segmentation.

    Encoder: four DoubleConv stages with 2x max-pooling between them.
    Decoder: four transposed-conv upsampling stages, each concatenated
    with the matching encoder feature map (skip connection) before a
    DoubleConv.

    The original version required input height/width divisible by 16
    (four pooling stages); this version aligns the upsampled feature map
    to its skip connection with nearest-neighbor interpolation when the
    sizes differ, so arbitrary input sizes work.  Behavior for
    divisible-by-16 inputs is unchanged.
    """

    def __init__(self, in_channels=3, num_classes=2):
        super(UNet, self).__init__()
        # Encoder (downsampling path)
        self.enc1 = DoubleConv(in_channels, 64)
        self.enc2 = DoubleConv(64, 128)
        self.enc3 = DoubleConv(128, 256)
        self.enc4 = DoubleConv(256, 512)
        self.pool = nn.MaxPool2d(2)

        # Bottleneck
        self.bottleneck = DoubleConv(512, 1024)

        # Decoder (upsampling + skip connections); each DoubleConv input
        # channel count is doubled by the concatenation.
        self.up4 = nn.ConvTranspose2d(1024, 512, 2, stride=2)
        self.dec4 = DoubleConv(1024, 512)  # 512 + 512 (skip)

        self.up3 = nn.ConvTranspose2d(512, 256, 2, stride=2)
        self.dec3 = DoubleConv(512, 256)   # 256 + 256

        self.up2 = nn.ConvTranspose2d(256, 128, 2, stride=2)
        self.dec2 = DoubleConv(256, 128)   # 128 + 128

        self.up1 = nn.ConvTranspose2d(128, 64, 2, stride=2)
        self.dec1 = DoubleConv(128, 64)    # 64 + 64

        # Output head: 1x1 conv mapping 64 channels to class logits
        self.out_conv = nn.Conv2d(64, num_classes, 1)

    @staticmethod
    def _match(up, skip):
        """Resize ``up`` to the spatial size of ``skip`` if they differ."""
        if up.shape[-2:] != skip.shape[-2:]:
            up = nn.functional.interpolate(up, size=skip.shape[-2:])
        return up

    def forward(self, x):
        # Encoder
        e1 = self.enc1(x)
        e2 = self.enc2(self.pool(e1))
        e3 = self.enc3(self.pool(e2))
        e4 = self.enc4(self.pool(e3))

        # Bottleneck
        b = self.bottleneck(self.pool(e4))

        # Decoder (upsample, align to skip, concatenate, convolve)
        d4 = self.dec4(torch.cat([self._match(self.up4(b), e4), e4], dim=1))
        d3 = self.dec3(torch.cat([self._match(self.up3(d4), e3), e3], dim=1))
        d2 = self.dec2(torch.cat([self._match(self.up2(d3), e2), e2], dim=1))
        d1 = self.dec1(torch.cat([self._match(self.up1(d2), e1), e1], dim=1))

        return self.out_conv(d1)


# 测试 U-Net
model = UNet(in_channels=3, num_classes=2)
x = torch.randn(2, 3, 256, 256)  # batch=2, RGB, 256×256
output = model(x)
print(f"输入形状: {x.shape}")
print(f"输出形状: {output.shape}")  # [2, 2, 256, 256]

注意:U-Net 要求输入尺寸能被 16 整除(4次下采样),否则跳跃连接时特征图尺寸不匹配。可在 forward 中使用 F.interpolate 对齐尺寸。

3.8 词嵌入(Word2Vec + GloVe)

词嵌入将离散的词语映射为连续的稠密向量,使语义相近的词在向量空间中距离更近,是 NLP 任务的基础特征表示。

3.8.1 Word2Vec

Word2Vec 通过预测任务学习词向量,有两种训练方式:

方式原理特点
CBOW(Continuous Bag of Words)用上下文词预测中心词训练快,适合大语料
Skip-gram用中心词预测上下文词对低频词效果更好
python
# 安装:pip install gensim
from gensim.models import Word2Vec

# 训练语料(每个句子是词列表)
sentences = [
    ["deep", "learning", "is", "powerful"],
    ["neural", "networks", "learn", "features"],
    ["machine", "learning", "uses", "data"],
    ["deep", "neural", "networks", "are", "deep"],
]

# 训练 Word2Vec 模型
# vector_size: 词向量维度(常用 100/200/300)
# window: 上下文窗口大小(常用 5)
# min_count: 最小词频,低于此值的词被忽略
# workers: 并行训练线程数
# sg: 0=CBOW, 1=Skip-gram
model = Word2Vec(
    sentences,
    vector_size=100,  # 词向量维度
    window=5,         # 上下文窗口
    min_count=1,      # 最小词频
    workers=4,        # 并行线程
    sg=0,             # 0=CBOW, 1=Skip-gram
    epochs=10         # 训练轮数
)

# 获取词向量
vector = model.wv['deep']
print(f"'deep' 词向量形状: {vector.shape}")  # (100,)

# 查找最相似的词
similar_words = model.wv.most_similar('deep', topn=5)
print("与 'deep' 最相似的词:", similar_words)

# 保存和加载模型
model.save('word2vec.model')
loaded_model = Word2Vec.load('word2vec.model')

3.8.2 GloVe 预训练词向量

GloVe(Global Vectors for Word Representation)基于全局词共现矩阵训练,通常直接使用预训练好的词向量。

python
import numpy as np

def load_glove_vectors(glove_file_path):
    """Load GloVe pre-trained word vectors from a text file.

    Each line has the form ``word v1 v2 ... vN``.  Download from
    https://nlp.stanford.edu/projects/glove/ — a common choice is
    glove.6B.100d.txt (6B tokens, 100 dimensions).

    Returns:
        dict mapping each word to a float32 numpy vector.
    """
    word_vectors = {}
    with open(glove_file_path, 'r', encoding='utf-8') as f:
        for line in f:
            values = line.split()
            word_vectors[values[0]] = np.asarray(values[1:], dtype=np.float32)
    return word_vectors

# 使用示例(需要先下载 GloVe 文件)
# glove_vectors = load_glove_vectors('glove.6B.100d.txt')
# print(f"词汇表大小: {len(glove_vectors)}")
# print(f"'king' 词向量形状: {glove_vectors['king'].shape}")

# 模拟 GloVe 词向量(演示用)
vocab = ['king', 'queen', 'man', 'woman', 'deep', 'learning']
embedding_dim = 100
glove_vectors = {word: np.random.randn(embedding_dim).astype(np.float32) for word in vocab}

# 构建 PyTorch Embedding 层(用于模型输入)
import torch
import torch.nn as nn

vocab_size = len(glove_vectors)
embedding_matrix = np.stack(list(glove_vectors.values()))  # [vocab_size, embedding_dim]

# 将预训练词向量加载到 Embedding 层
embedding_layer = nn.Embedding(vocab_size, embedding_dim)
embedding_layer.weight = nn.Parameter(torch.tensor(embedding_matrix))
embedding_layer.weight.requires_grad = False  # 冻结词向量(可选)

print(f"Embedding 层形状: {embedding_layer.weight.shape}")

3.8.3 Word2Vec vs GloVe 对比

维度Word2VecGloVe
训练方式局部上下文窗口预测全局词共现矩阵分解
训练速度较慢
语义捕获局部语义全局统计信息
常用维度100/200/30050/100/200/300
推荐使用场景自定义语料训练直接使用预训练向量

避坑:使用预训练词向量时,注意词汇表中未登录词(OOV)的处理,通常用随机初始化或零向量替代。


4. 进阶模型与技巧

4.1 卷积神经网络(CNN)

4.1.1 CNN基础结构

python
import torch
import torch.nn as nn

class SimpleCNN(nn.Module):
    """Two-block CNN for 28x28 single-channel images (e.g. MNIST).

    Each conv block halves the spatial size via max-pooling
    (28 -> 14 -> 7), leaving a 64 x 7 x 7 feature map for the classifier.
    """

    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv_layers = nn.Sequential(
            # block 1: 1 -> 32 channels
            nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            # block 2: 32 -> 64 channels
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.fc_layers = nn.Sequential(
            nn.Linear(64 * 7 * 7, 128),
            nn.ReLU(),
            nn.Linear(128, 10),
        )

    def forward(self, x):
        features = self.conv_layers(x)
        flat = features.view(-1, 64 * 7 * 7)  # flatten for the linear head
        return self.fc_layers(flat)

# 测试模型
model = SimpleCNN()
input_tensor = torch.randn(32, 1, 28, 28)  # batch x channels x height x width
output = model(input_tensor)
print(f"输入形状: {input_tensor.shape}")
print(f"输出形状: {output.shape}")

4.1.2 CNN变体对比

模型特点参数数量适用场景
LeNet最早的CNN约60k简单图像识别
AlexNet深层CNN,ReLU约60M大规模图像识别
VGG统一3x3卷积约138M特征提取
ResNet残差连接,解决梯度消失约25M深层网络训练
MobileNet深度可分离卷积约4M移动端部署

4.1.3 ResNet实现

python
import torch
import torch.nn as nn

class ResidualBlock(nn.Module):
    """Basic ResNet block: two 3x3 conv+BN layers with an identity (or
    1x1-projection) shortcut added before the final ReLU."""

    def __init__(self, in_channels, out_channels, stride=1):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3,
                               stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                               stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Projection shortcut when the shape changes (stride or channels);
        # otherwise an empty Sequential acts as the identity.
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channels),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out = out + self.shortcut(x)  # residual addition
        return self.relu(out)

# ResNet18
class ResNet18(nn.Module):
    """ResNet-18 built from ResidualBlock: a 7x7 stem, four stages of two
    residual blocks each (64/128/256/512 channels), global average pooling,
    and a linear classifier head."""

    def __init__(self, num_classes=10):
        super(ResNet18, self).__init__()
        # Running channel count; make_layer uses and updates it to wire
        # consecutive stages together.
        self.in_channels = 64
        # Stem: 7x7 stride-2 conv + BN + ReLU + 3x3 stride-2 max-pool
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Four residual stages; stages 2-4 halve the spatial size (stride 2).
        self.layer1 = self.make_layer(64, 2, stride=1)
        self.layer2 = self.make_layer(128, 2, stride=2)
        self.layer3 = self.make_layer(256, 2, stride=2)
        self.layer4 = self.make_layer(512, 2, stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

    def make_layer(self, out_channels, blocks, stride):
        """Build one stage of ``blocks`` residual blocks; only the first
        block may downsample (via ``stride``)."""
        layers = []
        layers.append(ResidualBlock(self.in_channels, out_channels, stride))
        self.in_channels = out_channels
        for _ in range(1, blocks):
            layers.append(ResidualBlock(out_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        # Stem
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        # Residual stages
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        # Global average pool -> flatten -> classifier logits
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

4.2 循环神经网络(RNN)

4.2.1 LSTM网络

python
import torch
import torch.nn as nn

class LSTMModel(nn.Module):
    """Sequence classifier: stacked LSTM followed by a linear head applied
    to the hidden output of the final time step."""

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # Zero-initialized hidden and cell states, created on the same
        # device (and dtype) as the input.
        state_shape = (self.num_layers, x.size(0), self.hidden_size)
        h0 = x.new_zeros(state_shape)
        c0 = x.new_zeros(state_shape)

        # Run the LSTM over the full sequence, then classify from the
        # hidden output at the last time step.
        seq_out, _ = self.lstm(x, (h0, c0))
        return self.fc(seq_out[:, -1, :])

# 测试模型
model = LSTMModel(input_size=10, hidden_size=64, num_layers=2, output_size=2)
input_tensor = torch.randn(32, 15, 10)  # batch x seq_len x input_size
output = model(input_tensor)
print(f"输入形状: {input_tensor.shape}")
print(f"输出形状: {output.shape}")

4.2.2 GRU网络

python
import tensorflow as tf
from tensorflow.keras.layers import GRU, Dense

model = tf.keras.Sequential([
    GRU(64, return_sequences=True, input_shape=(None, 10)),
    GRU(32, return_sequences=False),
    Dense(10, activation='softmax')
])

model.summary()

4.3 Transformer架构

4.3.1 自注意力机制

python
import torch
import torch.nn as nn

class SelfAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    Input:  x of shape (batch, seq_len, embed_dim).
    Output: tuple of (attended values, same shape as x; attention weights
    of shape (batch, num_heads, seq_len, seq_len)).
    """

    def __init__(self, embed_dim, num_heads):
        super(SelfAttention, self).__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads

        assert self.head_dim * num_heads == embed_dim, "embed_dim must be divisible by num_heads"

        self.query = nn.Linear(embed_dim, embed_dim)
        self.key = nn.Linear(embed_dim, embed_dim)
        self.value = nn.Linear(embed_dim, embed_dim)
        self.out = nn.Linear(embed_dim, embed_dim)

    def forward(self, x, mask=None):
        batch_size, seq_len, embed_dim = x.shape

        # Linear projections, then split into heads:
        # (batch, seq, embed) -> (batch, heads, seq, head_dim)
        Q = self.query(x).view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        K = self.key(x).view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        V = self.value(x).view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)

        # Scaled dot-product attention scores: Q K^T / sqrt(head_dim)
        scores = torch.matmul(Q, K.transpose(-2, -1)) / torch.sqrt(torch.tensor(self.head_dim, dtype=torch.float32))

        # Optional mask: positions where mask == 0 are set to -1e9 so the
        # softmax assigns them near-zero weight.
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)

        # Attention weights (softmax over the key dimension)
        attn_weights = torch.softmax(scores, dim=-1)

        # Weighted sum of values, then merge heads back to embed_dim
        out = torch.matmul(attn_weights, V)
        out = out.transpose(1, 2).contiguous().view(batch_size, seq_len, embed_dim)
        out = self.out(out)

        return out, attn_weights

# 测试
attention = SelfAttention(embed_dim=64, num_heads=4)
x = torch.randn(3, 10, 64)  # batch x seq_len x embed_dim
output, weights = attention(x)
print(f"输出形状: {output.shape}")
print(f"注意力权重形状: {weights.shape}")

4.3.2 Transformer编码器

python
import torch
import torch.nn as nn

class TransformerEncoderLayer(nn.Module):
    """Single post-norm Transformer encoder layer: multi-head self-attention
    and a position-wise feed-forward network, each wrapped in a residual
    connection followed by LayerNorm."""

    def __init__(self, embed_dim, num_heads, hidden_dim, dropout=0.1):
        super(TransformerEncoderLayer, self).__init__()
        self.self_attn = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)
        self.feed_forward = nn.Sequential(
            nn.Linear(embed_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, embed_dim),
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # Self-attention sub-layer: residual add + LayerNorm
        attended, _ = self.self_attn(x, x, x)
        x = self.norm1(x + self.dropout(attended))

        # Feed-forward sub-layer: residual add + LayerNorm
        transformed = self.feed_forward(x)
        return self.norm2(x + self.dropout(transformed))

# 创建编码器
encoder = TransformerEncoderLayer(embed_dim=64, num_heads=4, hidden_dim=128)
x = torch.randn(3, 10, 64)
output = encoder(x)
print(f"输出形状: {output.shape}")

4.3.3 BERT实战

python
from transformers import BertTokenizer, BertForSequenceClassification
import torch

# 加载预训练模型
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2)

# 输入文本
texts = ["I love this movie!", "This is the worst product ever."]

# 预测
for text in texts:
    inputs = tokenizer(text, return_tensors='pt', padding=True, truncation=True)
    outputs = model(**inputs)
    predictions = torch.argmax(outputs.logits, dim=1)
    sentiment = "positive" if predictions.item() == 1 else "negative"
    print(f"Text: {text} -> Sentiment: {sentiment}")

4.4 模型优化技巧

4.4.1 学习率调度

python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import CosineAnnealingLR, StepLR

# 定义模型(示例)
model = nn.Sequential(nn.Linear(784, 256), nn.ReLU(), nn.Linear(256, 10))

# 创建优化器
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 学习率调度器
scheduler = CosineAnnealingLR(optimizer, T_max=10)
# 或使用StepLR
# scheduler = StepLR(optimizer, step_size=5, gamma=0.1)

# 训练循环
for epoch in range(20):
    # 训练代码...

    # 更新学习率
    scheduler.step()
    current_lr = optimizer.param_groups[0]['lr']
    print(f"Epoch {epoch}, Learning Rate: {current_lr:.6f}")

4.4.2 正则化

python
import torch
import torch.nn as nn
import torch.optim as optim

class RegularizedModel(nn.Module):
    """Fully connected classifier demonstrating regularization techniques:
    BatchNorm after the first layer, Dropout after the first two
    activations, and (configured externally) L2 weight decay.

    Input: flattened 784-dim vectors; output: 10 class logits.
    """

    def __init__(self):
        super(RegularizedModel, self).__init__()
        self.fc1 = nn.Linear(784, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 10)
        self.dropout = nn.Dropout(0.5)  # Dropout regularization
        self.bn1 = nn.BatchNorm1d(256)  # BatchNorm on the first hidden layer
        # Create the activation once; the original called ``nn.ReLU()(x)``
        # in forward, allocating a fresh module object on every call.
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.relu(self.bn1(self.fc1(x)))  # Linear -> BN -> ReLU
        x = self.dropout(x)
        x = self.relu(self.fc2(x))
        x = self.dropout(x)
        return self.fc3(x)

# L2正则化(权重衰减)
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=0.001)

4.4.3 迁移学习

python
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models

# 加载预训练模型(torchvision >= 0.13 使用 weights 参数)
model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)

# 冻结特征提取层
for param in model.parameters():
    param.requires_grad = False

# 替换分类头
num_classes = 10
model.fc = nn.Linear(model.fc.in_features, num_classes)

# 只训练分类头
optimizer = optim.Adam(model.fc.parameters(), lr=0.001)

4.5 生成式模型

4.5.1 GAN基础

python
import torch
import torch.nn as nn
import torch.optim as optim

# 生成器
class Generator(nn.Module):
    """GAN generator: maps a latent vector to a 28x28 single-channel image.

    The Tanh output keeps pixel values in (-1, 1), matching data that has
    been normalized to that range.
    """

    def __init__(self, latent_dim, img_channels=1):
        super(Generator, self).__init__()
        widths = [128, 256, 512]
        # First block has no BatchNorm; later blocks are Linear+BN+ReLU.
        layers = [nn.Linear(latent_dim, widths[0]), nn.ReLU()]
        for prev, cur in zip(widths[:-1], widths[1:]):
            layers += [nn.Linear(prev, cur), nn.BatchNorm1d(cur), nn.ReLU()]
        layers += [nn.Linear(widths[-1], 784), nn.Tanh()]
        self.model = nn.Sequential(*layers)

    def forward(self, z):
        flat_img = self.model(z)
        return flat_img.view(-1, 1, 28, 28)

# 判别器
class Discriminator(nn.Module):
    """GAN discriminator: flattens a 28x28 image and outputs the sigmoid
    probability that it is a real sample."""

    def __init__(self, img_channels=1):
        super(Discriminator, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(784, 512),
            nn.LeakyReLU(0.2),
            nn.Linear(512, 256),
            nn.LeakyReLU(0.2),
            nn.Linear(256, 1),
            nn.Sigmoid(),
        )

    def forward(self, img):
        # Flatten (N, C, H, W) images to (N, 784) before the MLP.
        return self.model(img.view(-1, 784))

# 初始化模型
latent_dim = 100
generator = Generator(latent_dim)
discriminator = Discriminator()

# 损失函数和优化器
criterion = nn.BCELoss()
optimizer_G = optim.Adam(generator.parameters(), lr=0.0002, betas=(0.5, 0.999))
optimizer_D = optim.Adam(discriminator.parameters(), lr=0.0002, betas=(0.5, 0.999))

4.5.2 扩散模型入门

python
from diffusers import StableDiffusionPipeline

# 加载Stable Diffusion模型
pipe = StableDiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5")
pipe = pipe.to("cuda")  # 使用GPU加速

# 生成图像
prompt = "a beautiful sunset over the ocean with golden clouds"
image = pipe(prompt, num_inference_steps=50).images[0]

# 保存图像
image.save("sunset.png")
print("图像已保存为 sunset.png")

4.6 变分自编码器(VAE)

VAE(Variational Autoencoder) 是一种生成式模型,通过学习数据的潜在分布来生成新样本。

4.6.1 VAE vs 普通自编码器

维度普通自编码器(AE)变分自编码器(VAE)
隐向量确定性向量 z从分布 N(μ, σ²) 中采样
生成能力弱(插值不连续)强(隐空间连续可插值)
损失函数重建损失重建损失 + KL 散度
应用场景降维、去噪图像生成、数据增强

4.6.2 重参数化技巧

VAE 的核心挑战:采样操作不可微,无法反向传播。

重参数化将随机性从参数中分离:

text
z = μ + σ × ε,其中 ε ~ N(0, 1)

这样梯度可以通过 μ 和 σ 反向传播,而随机性由独立的 ε 提供。

4.6.3 VAE 损失函数

text
L = 重建损失 + KL 散度
重建损失 = BCE(x_reconstructed, x_original)  # 或 MSE
KL 散度 = -0.5 × Σ(1 + log(σ²) - μ² - σ²)

KL 散度约束隐空间接近标准正态分布,使隐空间连续且可采样。

4.6.4 VAE PyTorch 实现

python
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

class VAE(nn.Module):
    """Variational autoencoder for flat inputs (e.g. 28x28 MNIST -> 784).

    The encoder maps x to a Gaussian posterior (mu, log_var); a latent z is
    drawn via the reparameterization trick and decoded back to input space.
    """

    def __init__(self, input_dim=784, hidden_dim=400, latent_dim=20):
        super(VAE, self).__init__()
        # Encoder: input -> hidden -> (mu, log_var).
        self.encoder_fc = nn.Linear(input_dim, hidden_dim)
        self.fc_mu = nn.Linear(hidden_dim, latent_dim)       # posterior mean
        self.fc_log_var = nn.Linear(hidden_dim, latent_dim)  # posterior log-variance

        # Decoder: latent z -> hidden -> reconstruction.
        self.decoder_fc1 = nn.Linear(latent_dim, hidden_dim)
        self.decoder_fc2 = nn.Linear(hidden_dim, input_dim)

    def encode(self, x):
        """Return the posterior parameters (mu, log_var) for input x."""
        hidden = F.relu(self.encoder_fc(x))
        return self.fc_mu(hidden), self.fc_log_var(hidden)

    def reparameterize(self, mu, log_var):
        """Sample z = mu + sigma * eps with eps ~ N(0, 1) during training.

        At inference time the mean is returned directly, which makes the
        forward pass deterministic.
        """
        if not self.training:
            return mu
        sigma = torch.exp(0.5 * log_var)  # log_var = log(sigma^2)
        noise = torch.randn_like(sigma)
        return mu + sigma * noise

    def decode(self, z):
        """Decode latent z into a reconstruction with values in [0, 1]."""
        hidden = F.relu(self.decoder_fc1(z))
        return torch.sigmoid(self.decoder_fc2(hidden))

    def forward(self, x):
        """Full pass: encode, sample, decode; returns (x_hat, mu, log_var)."""
        mu, log_var = self.encode(x)
        return self.decode(self.reparameterize(mu, log_var)), mu, log_var


def vae_loss(x_reconstructed, x_original, mu, log_var):
    """Total VAE loss = summed reconstruction BCE + analytic KL divergence.

    BCE assumes pixel values in [0, 1].  The KL term
    -0.5 * sum(1 + log_var - mu^2 - exp(log_var)) pulls the posterior
    towards the standard normal prior N(0, I).
    """
    bce = F.binary_cross_entropy(x_reconstructed, x_original, reduction='sum')
    # Algebraically identical to -0.5 * sum(1 + log_var - mu^2 - exp(log_var)).
    kl = 0.5 * torch.sum(mu.pow(2) + log_var.exp() - log_var - 1)
    return bce + kl


# Training example (synthetic data).
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = VAE(input_dim=784, hidden_dim=400, latent_dim=20).to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# Random batches standing in for MNIST (replace with a real dataset in practice).
for epoch in range(5):
    # Random batch shaped like flattened 28x28 MNIST images (784-dim, values in [0,1]).
    x = torch.rand(64, 784).to(device)

    optimizer.zero_grad()
    x_reconstructed, mu, log_var = model(x)
    loss = vae_loss(x_reconstructed, x, mu, log_var)
    loss.backward()
    optimizer.step()

    print(f"Epoch {epoch+1}, Loss: {loss.item():.2f}")

# Sample new data: decode latent vectors drawn from the standard normal prior.
model.eval()
with torch.no_grad():
    z = torch.randn(16, 20).to(device)  # 16 latent vectors from N(0, 1)
    generated = model.decode(z)
    print(f"生成样本形状: {generated.shape}")  # [16, 784]

注意:VAE 的 KL 散度权重(β-VAE 中的 β 参数)会影响生成质量与隐空间结构的平衡。β > 1 时隐空间更规整,但重建质量可能下降。

4.7 超参数调优

超参数(如学习率、网络层数、batch size)无法通过梯度下降学习,需要通过搜索策略找到最优组合。

4.7.1 三种调优方法对比

| 方法 | 搜索策略 | 适用场景 | 效率 |
| --- | --- | --- | --- |
| 网格搜索 | 穷举所有参数组合 | 参数空间小(≤3个参数) | 低 |
| 随机搜索 | 随机采样参数组合 | 参数空间较大 | 中 |
| 贝叶斯优化(Optuna) | 基于历史结果智能采样 | 任意场景 | 高 |

4.7.2 网格搜索与随机搜索(sklearn)

python
# Install: pip install scikit-learn
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
from sklearn.svm import SVC
from sklearn.datasets import make_classification
import numpy as np

# Synthetic classification data.
X, y = make_classification(n_samples=500, n_features=20, random_state=42)

# Grid search: exhaustively evaluate every parameter combination.
param_grid = {
    'C': [0.1, 1, 10],          # regularization strength
    'kernel': ['rbf', 'linear'], # kernel type
    'gamma': ['scale', 'auto']   # kernel coefficient
}

grid_search = GridSearchCV(
    SVC(),
    param_grid,
    cv=5,           # 5-fold cross-validation
    scoring='accuracy',
    n_jobs=-1       # use all CPU cores
)
grid_search.fit(X, y)
print(f"网格搜索最优参数: {grid_search.best_params_}")
print(f"最优交叉验证准确率: {grid_search.best_score_:.4f}")

# Random search: sample combinations at random (usually far more efficient).
param_dist = {
    'C': np.logspace(-2, 2, 100),    # log-spaced candidate values
    'kernel': ['rbf', 'linear', 'poly'],
    'gamma': np.logspace(-4, 0, 100)
}

random_search = RandomizedSearchCV(
    SVC(),
    param_dist,
    n_iter=20,      # sample 20 combinations (vs. 300 for the full grid)
    cv=5,
    scoring='accuracy',
    random_state=42,
    n_jobs=-1
)
random_search.fit(X, y)
print(f"随机搜索最优参数: {random_search.best_params_}")
print(f"最优交叉验证准确率: {random_search.best_score_:.4f}")

4.7.3 贝叶斯优化(Optuna)

python
# 安装:pip install optuna
import optuna
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np

# Build a synthetic dataset and a standardized train/validation split.
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
scaler = StandardScaler()
X = scaler.fit_transform(X)
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

# Tensors consumed by the Optuna objective below.
X_train_t = torch.tensor(X_train, dtype=torch.float32)
y_train_t = torch.tensor(y_train, dtype=torch.long)
X_val_t = torch.tensor(X_val, dtype=torch.float32)
y_val_t = torch.tensor(y_val, dtype=torch.long)

def objective(trial):
    """Optuna objective: train a small MLP and return validation accuracy.

    Reads the module-level tensors X_train_t / y_train_t / X_val_t / y_val_t.
    Higher is better (pair with direction='maximize').
    """
    # Hyperparameter search space.
    lr = trial.suggest_float('lr', 1e-5, 1e-1, log=True)  # log-uniform
    n_layers = trial.suggest_int('n_layers', 1, 4)
    hidden_size = trial.suggest_categorical('hidden_size', [64, 128, 256, 512])
    dropout = trial.suggest_float('dropout', 0.1, 0.5)

    # Assemble an MLP with n_layers hidden blocks.
    blocks = []
    width = 20
    for _ in range(n_layers):
        blocks.append(nn.Linear(width, hidden_size))
        blocks.append(nn.ReLU())
        blocks.append(nn.Dropout(dropout))
        width = hidden_size
    blocks.append(nn.Linear(width, 2))
    model = nn.Sequential(*blocks)

    # Short full-batch training run.
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    model.train()
    for _ in range(20):
        optimizer.zero_grad()
        criterion(model(X_train_t), y_train_t).backward()
        optimizer.step()

    # Evaluate on the held-out validation split.
    model.eval()
    with torch.no_grad():
        preds = model(X_val_t).argmax(1)
        return (preds == y_val_t).float().mean().item()

# Create the study and optimize (direction='maximize' -> maximize accuracy).
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=30, show_progress_bar=True)

print(f"最优超参数: {study.best_params}")
print(f"最优验证准确率: {study.best_value:.4f}")

避坑:超参数调优时务必使用验证集(而非测试集)评估,避免数据泄露。Optuna 支持剪枝(Pruning)提前终止无效试验,可大幅加速搜索。

4.8 半监督学习与小样本学习

4.8.1 半监督学习

半监督学习利用少量有标签数据和大量无标签数据进行训练,适用于标注成本高的场景(如医疗影像、法律文本)。

伪标签(Pseudo Label)方法

  1. 用少量有标签数据训练初始模型
  2. 对无标签数据进行预测,置信度高于阈值的预测作为伪标签
  3. 将伪标签数据加入训练集,重新训练(可迭代多轮)
python
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# Synthetic labelled and unlabelled data pools.
torch.manual_seed(42)
X_labeled = torch.randn(100, 20)    # 100 labelled samples
y_labeled = torch.randint(0, 2, (100,))
X_unlabeled = torch.randn(900, 20)  # 900 unlabelled samples

# Small MLP classifier.
model = nn.Sequential(
    nn.Linear(20, 64), nn.ReLU(),
    nn.Linear(64, 32), nn.ReLU(),
    nn.Linear(32, 2)
)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

# Stage 1: fit an initial model on the labelled data only.
model.train()
for epoch in range(50):
    optimizer.zero_grad()
    output = model(X_labeled)
    loss = criterion(output, y_labeled)
    loss.backward()
    optimizer.step()

# Stage 2: predict pseudo-labels for the unlabelled pool.
model.eval()
with torch.no_grad():
    unlabeled_output = model(X_unlabeled)
    probs = F.softmax(unlabeled_output, dim=1)
    confidence, pseudo_labels = probs.max(dim=1)

# Keep only confident predictions (probability > 0.9).
threshold = 0.9
high_conf_mask = confidence > threshold
X_pseudo = X_unlabeled[high_conf_mask]
y_pseudo = pseudo_labels[high_conf_mask]
print(f"高置信度伪标签样本数: {high_conf_mask.sum().item()}")

# Stage 3: retrain on labelled + pseudo-labelled data combined.
X_combined = torch.cat([X_labeled, X_pseudo], dim=0)
y_combined = torch.cat([y_labeled, y_pseudo], dim=0)

model.train()
for epoch in range(50):
    optimizer.zero_grad()
    output = model(X_combined)
    loss = criterion(output, y_combined)
    loss.backward()
    optimizer.step()

print("半监督训练完成")

4.8.2 小样本学习(Few-Shot Learning)

小样本学习在极少量样本(每类 1-5 个)的情况下完成分类,常用于新类别快速识别。

核心概念

  • N-way K-shot:N 个类别,每类 K 个支持样本
  • 支持集(Support Set):用于定义类别的少量样本
  • 查询集(Query Set):需要分类的样本

Prototypical Network(原型网络)

python
import torch
import torch.nn as nn
import torch.nn.functional as F

class ProtoNet(nn.Module):
    """Prototypical network: classify queries by distance to class prototypes."""

    def __init__(self, input_dim=20, embed_dim=64):
        super(ProtoNet, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 128), nn.ReLU(),
            nn.Linear(128, embed_dim)
        )

    def forward(self, support, query, n_way, k_shot):
        """Return log class probabilities for each query sample.

        Args:
            support: [n_way * k_shot, input_dim] support set, grouped by class.
            query:   [n_query, input_dim] query set.
            n_way:   number of classes in the episode.
            k_shot:  support samples per class.

        Returns:
            [n_query, n_way] log-probabilities (log-softmax over negated
            Euclidean distances to each class prototype).
        """
        embedded_support = self.encoder(support).view(n_way, k_shot, -1)
        embedded_query = self.encoder(query)

        # Prototype = mean embedding of each class's support samples.
        prototypes = embedded_support.mean(dim=1)

        # Smaller distance -> larger probability.
        distances = torch.cdist(embedded_query, prototypes)
        return F.log_softmax(-distances, dim=1)


# Try the prototype network on a random 5-way 3-shot episode.
n_way, k_shot = 5, 3
model = ProtoNet(input_dim=20, embed_dim=64)

support = torch.randn(n_way * k_shot, 20)  # support set: 15 samples
query = torch.randn(10, 20)                 # query set: 10 samples
query_labels = torch.randint(0, n_way, (10,))

log_probs = model(support, query, n_way, k_shot)
predictions = log_probs.argmax(dim=1)
accuracy = (predictions == query_labels).float().mean()
print(f"5-way 3-shot 准确率: {accuracy.item():.4f}")

小样本学习应用场景

  • 医疗影像:罕见病诊断(每类样本极少)
  • 工业缺陷检测:新型缺陷快速识别
  • 新类别识别:电商新品类快速分类

避坑:小样本学习的评估需要使用 episode 方式(每次随机采样 N-way K-shot 任务),而非传统的 train/test 划分,否则评估结果不可靠。

4.9 多任务学习

多任务学习(Multi-Task Learning) 同时训练多个相关任务,共享底层特征表示,相互促进提升泛化能力。

优势

  • 共享表示减少过拟合(相当于正则化)
  • 辅助任务提供额外监督信号
  • 减少推理时的计算开销(一次前向传播完成多个任务)

4.9.1 共享编码器架构

python
import torch
import torch.nn as nn
import torch.optim as optim

class MultiTaskModel(nn.Module):
    """Shared-encoder multi-task network.

    One backbone feeds two heads: head A produces classification logits
    (num_classes) and head B produces a regression output (reg_output values).
    """

    def __init__(self, input_dim=20, shared_dim=64, num_classes=5, reg_output=1):
        super(MultiTaskModel, self).__init__()
        # Shared backbone used by both tasks.
        self.shared_encoder = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, shared_dim),
            nn.ReLU()
        )
        # Task A: classification head.
        self.task_a_head = nn.Sequential(
            nn.Linear(shared_dim, 32),
            nn.ReLU(),
            nn.Linear(32, num_classes)
        )
        # Task B: regression head.
        self.task_b_head = nn.Sequential(
            nn.Linear(shared_dim, 32),
            nn.ReLU(),
            nn.Linear(32, reg_output)
        )

    def forward(self, x):
        """Return (classification logits, regression output) for input x."""
        features = self.shared_encoder(x)
        return self.task_a_head(features), self.task_b_head(features)


# Multi-task training example.
model = MultiTaskModel(input_dim=20, shared_dim=64, num_classes=5)
optimizer = optim.Adam(model.parameters(), lr=1e-3)

criterion_cls = nn.CrossEntropyLoss()  # classification loss
criterion_reg = nn.MSELoss()           # regression loss

# Fixed task weights for the combined loss.
w_a, w_b = 1.0, 0.5  # tune according to task importance

for epoch in range(10):
    x = torch.randn(32, 20)
    y_cls = torch.randint(0, 5, (32,))    # classification targets
    y_reg = torch.randn(32, 1)             # regression targets

    optimizer.zero_grad()
    out_a, out_b = model(x)

    loss_a = criterion_cls(out_a, y_cls)
    loss_b = criterion_reg(out_b, y_reg)
    total_loss = w_a * loss_a + w_b * loss_b  # weighted multi-task loss

    total_loss.backward()
    optimizer.step()

    if epoch % 5 == 0:
        print(f"Epoch {epoch}: Total={total_loss.item():.4f}, "
              f"Cls={loss_a.item():.4f}, Reg={loss_b.item():.4f}")

4.9.2 可学习任务权重(Uncertainty Weighting)

python
import torch
import torch.nn as nn

class UncertaintyWeighting(nn.Module):
    """
    Homoscedastic-uncertainty task weighting (Kendall et al., CVPR 2018).

    Each task i has a learnable parameter s_i = self.log_sigma[i], treated as
    a log-variance.  The loss implemented below is

        Loss = sum_i( exp(-s_i) * L_i + s_i )

    which is the widely used simplified form of the paper's
    sum_i( 1/(2*sigma_i^2) * L_i + log(sigma_i) ) with s_i = log(sigma_i^2)
    and constant factors folded into the learnable parameter.
    (Note: the original comment "1/(2*sigma^2) = exp(-log_sigma^2)" was
    mathematically incorrect; the code computes exp(-s_i).)
    """
    def __init__(self, n_tasks=2):
        super(UncertaintyWeighting, self).__init__()
        # One learnable log-variance per task; zeros => initial weight exp(0) = 1.
        self.log_sigma = nn.Parameter(torch.zeros(n_tasks))

    def forward(self, losses):
        """
        Combine per-task losses into a single scalar.

        Args:
            losses: sequence of scalar task losses [L_1, L_2, ...]; must have
                    the same length as n_tasks.

        Returns:
            Weighted total loss; the additive s_i term regularizes the
            weights exp(-s_i) so they cannot collapse to zero.
        """
        total_loss = 0
        for i, loss in enumerate(losses):
            # Task weight exp(-s_i): larger learned variance => smaller weight.
            precision = torch.exp(-self.log_sigma[i])
            total_loss += precision * loss + self.log_sigma[i]
        return total_loss

# Usage example: combine two fixed task losses.
uw = UncertaintyWeighting(n_tasks=2)
loss_a = torch.tensor(0.5)
loss_b = torch.tensor(2.0)
total = uw([loss_a, loss_b])
print(f"不确定性加权总损失: {total.item():.4f}")
print(f"任务权重 log_sigma: {uw.log_sigma.data}")

注意:多任务学习中,任务之间的梯度可能相互干扰(负迁移)。当任务相关性低时,建议使用梯度裁剪或 GradNorm 等方法平衡梯度。

4.10 高维数据处理与降维

高维数据(特征维度 > 100)面临维度灾难,降维可以去除冗余特征、加速训练、便于可视化。

4.10.1 三种降维方法对比

| 方法 | 速度 | 保留结构 | 适用场景 | 是否可逆 |
| --- | --- | --- | --- | --- |
| PCA | 快 | 全局线性结构 | 预处理、特征压缩 | 是(近似) |
| t-SNE | 慢 | 局部非线性结构 | 2D/3D 可视化 | 否 |
| UMAP | 较快 | 局部+全局结构 | 可视化 + 下游任务 | 是(可用于下游任务) |

4.10.2 PCA 降维

python
import numpy as np
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.datasets import make_classification

# High-dimensional synthetic data (100 features, 10 informative).
X, y = make_classification(n_samples=500, n_features=100, n_informative=10, random_state=42)

# Project onto the top 2 principal components.
# n_components: number of components to keep (a float like 0.95 keeps 95% of variance).
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X)

print(f"原始维度: {X.shape[1]}")
print(f"降维后维度: {X_pca.shape[1]}")
print(f"保留方差比例: {pca.explained_variance_ratio_.sum():.4f}")

# Scatter plot of the 2-D projection, colored by class.
plt.figure(figsize=(8, 6))
plt.scatter(X_pca[:, 0], X_pca[:, 1], c=y, cmap='viridis', alpha=0.6)
plt.title('PCA 降维可视化')
plt.xlabel('PC1')
plt.ylabel('PC2')
plt.colorbar()
plt.show()

4.10.3 t-SNE 可视化

python
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
from sklearn.datasets import load_digits

# Handwritten digits dataset (64-dimensional features).
digits = load_digits()
X, y = digits.data, digits.target

# Embed into 2-D with t-SNE.
# perplexity: effective neighbor count (5-50), controls local structure.
# n_iter: optimization iterations (>= 1000 recommended).
# NOTE(review): recent scikit-learn versions renamed n_iter to max_iter —
# confirm against the installed sklearn version.
tsne = TSNE(n_components=2, perplexity=30, n_iter=1000, random_state=42)
X_tsne = tsne.fit_transform(X)

# Visualize the embedding.
plt.figure(figsize=(10, 8))
scatter = plt.scatter(X_tsne[:, 0], X_tsne[:, 1], c=y, cmap='tab10', alpha=0.7)
plt.colorbar(scatter)
plt.title('t-SNE 手写数字可视化')
plt.show()

4.10.4 UMAP 降维

python
# Install: pip install umap-learn
import umap
import matplotlib.pyplot as plt
from sklearn.datasets import load_digits

digits = load_digits()
X, y = digits.data, digits.target

# UMAP embedding.
# n_components: target dimensionality.
# n_neighbors: neighbor count balancing local vs. global structure (default 15).
# min_dist: minimum spacing of embedded points (0.0-1.0; smaller => tighter clusters).
reducer = umap.UMAP(n_components=2, n_neighbors=15, min_dist=0.1, random_state=42)
X_umap = reducer.fit_transform(X)

plt.figure(figsize=(10, 8))
scatter = plt.scatter(X_umap[:, 0], X_umap[:, 1], c=y, cmap='tab10', alpha=0.7)
plt.colorbar(scatter)
plt.title('UMAP 手写数字可视化')
plt.show()

避坑:t-SNE 和 UMAP 的结果受随机种子影响,不同运行结果可能不同。生产环境中务必固定 random_state。t-SNE 不适合用于下游任务的特征提取(不可逆),UMAP 可以。


5. 实战场景

5.1 图像分类实战

python
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

# Preprocessing: resize, convert to tensor, scale to [-1, 1].
transform = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# CIFAR-10 train/test splits (downloaded to ./data on first run).
train_dataset = datasets.CIFAR10('./data', train=True, download=True, transform=transform)
test_dataset = datasets.CIFAR10('./data', train=False, download=True, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# CNN model for 32x32 RGB images.
class CIFARCNN(nn.Module):
    """Three conv/pool stages (32 -> 4 spatial) followed by an MLP classifier."""

    def __init__(self, num_classes=10):
        super(CIFARCNN, self).__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv2d(3, 32, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),            # 32x32 -> 16x16
            nn.Conv2d(32, 64, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),            # 16x16 -> 8x8
            nn.Conv2d(64, 128, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2)             # 8x8 -> 4x4
        )
        self.fc_layers = nn.Sequential(
            nn.Linear(128 * 4 * 4, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        """Return class logits of shape (batch, num_classes)."""
        features = self.conv_layers(x)
        flat = features.view(-1, 128 * 4 * 4)
        return self.fc_layers(flat)

# Training configuration.
model = CIFARCNN()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Training loop.
def train(model, train_loader, criterion, optimizer, device, epochs=10):
    """Train `model` in place for `epochs` passes over `train_loader`.

    Prints the average batch loss after each epoch.
    """
    model.train()
    for epoch in range(epochs):
        total_loss = 0.0
        for data, target in train_loader:
            data, target = data.to(device), target.to(device)

            optimizer.zero_grad()
            loss = criterion(model(data), target)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / len(train_loader)
        print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")

# Run training with the configuration defined above.
train(model, train_loader, criterion, optimizer, device)

# Evaluation.
def evaluate(model, test_loader, device):
    """Compute and print top-1 accuracy of `model` over `test_loader`."""
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            predicted = torch.max(model(data).data, 1)[1]
            total += target.size(0)
            correct += (predicted == target).sum().item()

    print(f"测试准确率: {100 * correct / total:.2f}%")

# Report accuracy on the held-out test set.
evaluate(model, test_loader, device)

5.2 文本情感分析

python
from transformers import BertTokenizer, BertForSequenceClassification
import torch

# Pretrained BERT with a 2-class sequence-classification head.
# NOTE(review): the classification head on bert-base-uncased is randomly
# initialized — fine for a demo, but real use needs a fine-tuned checkpoint.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2)
model.eval()

# Sentiment analysis helper.
def analyze_sentiment(text):
    """Classify `text` as "positive" or "negative" using the global model."""
    encoded = tokenizer(text, return_tensors='pt', padding=True, truncation=True)
    with torch.no_grad():
        outputs = model(**encoded)
    label = torch.argmax(outputs.logits, dim=1).item()
    return "positive" if label == 1 else "negative"

# Try a few example sentences.
texts = [
    "This movie is absolutely amazing! I loved every minute of it.",
    "Terrible experience, will not recommend to anyone.",
    "The product is okay, not great but not bad either."
]

for text in texts:
    sentiment = analyze_sentiment(text)
    print(f"Text: {text}")
    print(f"Sentiment: {sentiment}\n")

5.3 时间序列预测

python
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt

# Synthetic signal: two sine waves plus Gaussian noise.
def generate_data(n_samples=1000):
    """Return a 1-D numpy array of length `n_samples` over t in [0, 100]."""
    t = np.linspace(0, 100, n_samples)
    return np.sin(0.5 * t) + np.sin(1.0 * t) + np.random.normal(0, 0.1, n_samples)

# Full series used for training/evaluation below.
data = generate_data()

# Turn the series into (window, next-value) supervised pairs.
def create_sequences(data, seq_len=50):
    """Slide a window of length `seq_len` over `data`.

    Returns:
        X: array of shape (len(data) - seq_len, seq_len), the input windows.
        y: array of shape (len(data) - seq_len,), the value after each window.
    """
    windows = [data[i:i + seq_len] for i in range(len(data) - seq_len)]
    targets = [data[i + seq_len] for i in range(len(data) - seq_len)]
    return np.array(windows), np.array(targets)

seq_len = 50
X, y = create_sequences(data, seq_len)

# Chronological 80/20 train/test split (no shuffling for time series).
split = int(0.8 * len(X))
X_train, X_test = X[:split], X[split:]
y_train, y_test = y[:split], y[split:]

# To tensors; unsqueeze adds the feature dimension expected by the LSTM.
X_train = torch.tensor(X_train, dtype=torch.float32).unsqueeze(-1)
y_train = torch.tensor(y_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32).unsqueeze(-1)
y_test = torch.tensor(y_test, dtype=torch.float32)

# LSTM regressor for one-step-ahead forecasting.
class TimeSeriesLSTM(nn.Module):
    """Stacked LSTM whose final hidden state feeds a linear output layer."""

    def __init__(self, input_size=1, hidden_size=50, num_layers=2):
        super(TimeSeriesLSTM, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """x: (batch, seq_len, input_size) -> prediction of shape (batch, 1)."""
        sequence_out, _ = self.lstm(x)
        last_step = sequence_out[:, -1, :]  # hidden state at the final time step
        return self.fc(last_step)

# Train with full-batch gradient descent.
model = TimeSeriesLSTM()
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

for epoch in range(100):
    optimizer.zero_grad()
    output = model(X_train)
    loss = criterion(output.squeeze(), y_train)
    loss.backward()
    optimizer.step()

    if epoch % 20 == 0:
        print(f"Epoch {epoch}, Loss: {loss.item():.4f}")

# Predict on the held-out tail of the series.
model.eval()
with torch.no_grad():
    predictions = model(X_test).squeeze().numpy()

# Plot actual vs. predicted values.
plt.figure(figsize=(12, 6))
plt.plot(y_test.numpy(), label='Actual')
plt.plot(predictions, label='Predicted')
plt.legend()
plt.title('Time Series Prediction')
plt.show()

5.4 模型部署基础

python
import torch
import torch.nn as nn
import onnx

# Minimal model used for the deployment walkthrough.
class SimpleModel(nn.Module):
    """Single linear layer: 10 input features -> 2 output logits."""

    def __init__(self):
        super(SimpleModel, self).__init__()
        self.fc = nn.Linear(10, 2)

    def forward(self, x):
        """Return logits of shape (batch, 2)."""
        return self.fc(x)

# Save the model weights (state_dict, the recommended format).
model = SimpleModel()
torch.save(model.state_dict(), 'model.pth')

# Reload the weights and switch to inference mode.
model.load_state_dict(torch.load('model.pth'))
model.eval()

# Export to ONNX; the dummy input fixes the traced input shape.
dummy_input = torch.randn(1, 10)
torch.onnx.export(model, dummy_input, 'model.onnx', verbose=True)

# Structural validation of the exported graph.
onnx_model = onnx.load('model.onnx')
onnx.checker.check_model(onnx_model)
print("ONNX模型验证通过!")

# Minimal HTTP serving example with FastAPI.
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

# Request schema: a flat list of input features.
class InputData(BaseModel):
    data: list[float]

@app.post("/predict")
async def predict(input_data: InputData):
    # Shape (1, n_features); uses the globally loaded `model` above.
    tensor = torch.tensor(input_data.data, dtype=torch.float32).unsqueeze(0)
    with torch.no_grad():
        output = model(tensor)
    prediction = torch.argmax(output, dim=1).item()
    return {"prediction": prediction}

6. 高级进阶

6.1 自定义组件

6.1.1 自定义网络层

python
import torch
import torch.nn as nn

class CustomConv2d(nn.Module):
    """Hand-rolled Conv2d: explicit weight/bias Parameters + functional conv."""

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0):
        super(CustomConv2d, self).__init__()
        # Small random init (scaled by 0.01) for the conv kernel.
        kernel_shape = (out_channels, in_channels, kernel_size, kernel_size)
        self.weight = nn.Parameter(torch.randn(*kernel_shape) * 0.01)
        self.bias = nn.Parameter(torch.zeros(out_channels))
        self.stride = stride
        self.padding = padding

    def forward(self, x):
        """Apply the convolution; x is (batch, in_channels, H, W)."""
        return nn.functional.conv2d(
            x, self.weight, self.bias,
            stride=self.stride, padding=self.padding,
        )

# Exercise the custom layer.
layer = CustomConv2d(3, 16, 3, padding=1)
input_tensor = torch.randn(1, 3, 28, 28)
output = layer(input_tensor)
print(f"输出形状: {output.shape}")

6.1.2 自定义损失函数

python
import torch
import torch.nn as nn

class FocalLoss(nn.Module):
    """Focal loss for binary classification with logits.

    Down-weights easy examples by (1 - p_t)^gamma so training focuses on
    hard ones; with gamma=0 and alpha=1 it reduces to plain BCE-with-logits.
    """

    def __init__(self, alpha=1, gamma=2, reduction='mean'):
        super(FocalLoss, self).__init__()
        self.alpha = alpha          # global scaling factor
        self.gamma = gamma          # focusing exponent
        self.reduction = reduction  # 'mean' | 'sum' | anything else -> per-element

    def forward(self, inputs, targets):
        """inputs: raw logits; targets: 0/1 floats of the same shape."""
        bce = nn.functional.binary_cross_entropy_with_logits(
            inputs, targets, reduction='none'
        )
        pt = torch.exp(-bce)  # model's probability for the true class
        focal = self.alpha * (1 - pt) ** self.gamma * bce

        if self.reduction == 'mean':
            return focal.mean()
        if self.reduction == 'sum':
            return focal.sum()
        return focal

# Exercise the focal loss on random logits/targets.
loss_fn = FocalLoss(alpha=1, gamma=2)
inputs = torch.randn(32, 1)
targets = torch.randint(0, 2, (32, 1)).float()
loss = loss_fn(inputs, targets)
print(f"Focal Loss: {loss.item():.4f}")

6.1.3 自定义数据集

python
import torch
from torch.utils.data import Dataset, DataLoader

class CustomDataset(Dataset):
    """Map-style dataset over parallel `data`/`labels` with optional transform."""

    def __init__(self, data, labels, transform=None):
        self.data = data            # indexable collection of samples
        self.labels = labels        # labels aligned with `data`
        self.transform = transform  # optional callable applied per sample

    def __len__(self):
        """Number of samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return (sample, label) at `idx`, applying the transform if set."""
        sample, label = self.data[idx], self.labels[idx]
        if self.transform:
            sample = self.transform(sample)
        return sample, label

# Build a dataset of random "images" and labels.
data = torch.randn(100, 3, 28, 28)
labels = torch.randint(0, 10, (100,))

dataset = CustomDataset(data, labels)
dataloader = DataLoader(dataset, batch_size=10, shuffle=True)

# Inspect the first batch only.
for batch_data, batch_labels in dataloader:
    print(f"批次数据形状: {batch_data.shape}")
    print(f"批次标签形状: {batch_labels.shape}")
    break

6.2 复杂场景处理

6.2.1 不平衡数据集处理

python
import torch
import torch.nn as nn
from torch.utils.data import WeightedRandomSampler

# Imbalanced toy labels: 100 samples of class 0, 10 of class 1.
labels = torch.tensor([0]*100 + [1]*10)  # 100 of class 0, 10 of class 1

# Per-class weights = inverse class frequency.
class_counts = torch.bincount(labels)
weights = 1.0 / class_counts.float()
sample_weights = weights[labels]

# Sampler that draws minority-class samples more often (with replacement).
sampler = WeightedRandomSampler(
    weights=sample_weights,
    num_samples=len(sample_weights),
    replacement=True
)

# NOTE(review): `dataset` and `DataLoader` come from earlier snippets; this
# fragment is not self-contained.
dataloader = DataLoader(dataset, batch_size=10, sampler=sampler)

6.2.2 模型压缩

python
import os
import torch
import torch.nn as nn
from torchvision import models

# Load a pretrained ResNet-18 (torchvision >= 0.13 uses the `weights` argument).
model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)

# Dynamic int8 quantization of all Linear layers: weights stored as qint8,
# activations quantized on the fly at inference time.
quantized_model = torch.quantization.quantize_dynamic(
    model, {nn.Linear}, dtype=torch.qint8
)

# Compare on-disk sizes of the two state_dicts.
# (Fix: removed a redundant duplicate `import os` — already imported above.)
torch.save(model.state_dict(), 'resnet18_full.pth')
torch.save(quantized_model.state_dict(), 'resnet18_quantized.pth')

print(f"完整模型大小: {os.path.getsize('resnet18_full.pth') / 1024 / 1024:.2f} MB")
print(f"量化模型大小: {os.path.getsize('resnet18_quantized.pth') / 1024 / 1024:.2f} MB")

6.3 深度学习工程化

6.3.1 批量处理

python
import os
import numpy as np
import torch
from torch.utils.data import DataLoader, Dataset

class LargeDataset(Dataset):
    """Lazy dataset over a directory of .npy files, loading one per access.

    Only the file listing is held in memory; arrays are read on demand in
    __getitem__, keeping the memory footprint small for large corpora.
    """

    def __init__(self, data_path):
        self.data_path = data_path
        # Snapshot of the directory contents; the data itself is NOT loaded here.
        self.file_list = os.listdir(data_path)

    def __len__(self):
        """Number of files found in the directory."""
        return len(self.file_list)

    def __getitem__(self, idx):
        """Load the idx-th file from disk and return it as a float32 tensor."""
        full_path = os.path.join(self.data_path, self.file_list[idx])
        array = np.load(full_path)
        return torch.tensor(array, dtype=torch.float32)

# Multi-process loading.
dataloader = DataLoader(
    LargeDataset('data/'),
    batch_size=32,
    num_workers=4,  # worker subprocesses for parallel loading
    pin_memory=True  # page-locked host memory speeds up GPU transfer
)

6.3.2 GPU加速优化

python
import torch

# Pick GPU when available.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Move model and tensors to the chosen device.
# NOTE(review): `model`, `data`, `target`, `optimizer`, `criterion`, `epochs`
# come from earlier snippets; this fragment is not self-contained.
model = model.to(device)
data = data.to(device)
target = target.to(device)

# Mixed-precision training with gradient scaling.
# NOTE(review): newer PyTorch prefers torch.amp.GradScaler('cuda') /
# torch.amp.autocast('cuda'); torch.cuda.amp.* is the legacy spelling —
# confirm against the installed torch version.
scaler = torch.cuda.amp.GradScaler()

for epoch in range(epochs):
    optimizer.zero_grad()

    with torch.cuda.amp.autocast():
        output = model(data)
        loss = criterion(output, target)

    scaler.scale(loss).backward()  # scale loss to avoid fp16 gradient underflow
    scaler.step(optimizer)         # unscales grads, then steps the optimizer
    scaler.update()                # adjusts the scale factor for next step

6.4 TorchScript 模型部署

TorchScript 将 PyTorch 模型转换为可在无 Python 环境中运行的静态图,适用于 C++ 部署、移动端和生产环境。

6.4.1 script vs trace 对比

| 特性 | torch.jit.script | torch.jit.trace |
| --- | --- | --- |
| 原理 | 静态分析 Python 代码 | 记录张量操作轨迹 |
| 支持控制流 | 是(if/for/while) | 否(固化为静态图) |
| 适用场景 | 含条件分支的模型 | 纯前向传播模型 |
| 使用方式 | @torch.jit.script 装饰器 | torch.jit.trace(model, input) |

6.4.2 torch.jit.script 示例

python
import torch
import torch.nn as nn

# @torch.jit.script compiles the function, including its control flow.
@torch.jit.script
def scripted_function(x: torch.Tensor, threshold: float) -> torch.Tensor:
    """Scale x by 2.0 when its mean exceeds threshold, else by 0.5."""
    if x.mean() > threshold:
        scale = 2.0
    else:
        scale = 0.5
    return x * scale

# Quick check of the scripted function.
x = torch.randn(3, 4)
result = scripted_function(x, 0.0)
print(f"输出形状: {result.shape}")

# Scripting a whole module (the data-dependent branch needs script, not trace).
class ScriptableModel(nn.Module):
    """Linear layer whose output is ReLU'd only when its mean exceeds a threshold."""

    def __init__(self, input_dim: int, output_dim: int):
        super(ScriptableModel, self).__init__()
        self.fc = nn.Linear(input_dim, output_dim)
        self.threshold = 0.5

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        projected = self.fc(x)
        # Data-dependent branch: TorchScript keeps it, trace would freeze it.
        if projected.mean() > self.threshold:
            return torch.relu(projected)
        return projected

model = ScriptableModel(10, 5)
# Compile the module to TorchScript (handles the if-branch correctly).
scripted_model = torch.jit.script(model)

# Serialize; loadable without Python source via torch.jit.load.
scripted_model.save('scripted_model.pt')
print("TorchScript 模型已保存")

6.4.3 torch.jit.trace 示例

python
import torch
import torch.nn as nn

# Branch-free model — a good candidate for torch.jit.trace.
class SimpleNet(nn.Module):
    """Two-layer MLP: 10 -> 64 -> 2 with a ReLU in between."""

    def __init__(self):
        super(SimpleNet, self).__init__()
        self.fc1 = nn.Linear(10, 64)
        self.fc2 = nn.Linear(64, 2)

    def forward(self, x):
        """Return logits of shape (batch, 2)."""
        hidden = torch.relu(self.fc1(x))
        return self.fc2(hidden)

model = SimpleNet()
model.eval()

# trace records the ops executed for this example input to build a static graph.
example_input = torch.randn(1, 10)
traced_model = torch.jit.trace(model, example_input)

# Confirm the traced graph matches the eager model.
with torch.no_grad():
    original_output = model(example_input)
    traced_output = traced_model(example_input)
    print(f"原始输出: {original_output}")
    print(f"Traced输出: {traced_output}")
    print(f"输出一致: {torch.allclose(original_output, traced_output)}")

# Round-trip save/load of the TorchScript artifact.
traced_model.save('traced_model.pt')
loaded_model = torch.jit.load('traced_model.pt')
print("TorchScript 模型加载成功")

避坑torch.jit.trace 不能处理含 if/for 的动态控制流,这些分支会被固化为 trace 时的执行路径。含动态逻辑的模型必须使用 torch.jit.script


6.5 自定义优化器

通过继承 torch.optim.Optimizer 实现自定义优化算法,满足特殊训练需求。

6.5.1 自定义优化器实现

python
import torch
from torch.optim import Optimizer

class SGDWithMomentumCustom(Optimizer):
    """SGD with classical momentum, as a minimal Optimizer subclass.

    Per-parameter update (g = grad + weight_decay * p):
        v <- momentum * v + g
        p <- p - lr * v

    Args:
        params: iterable of parameters (or param groups).
        lr: learning rate (default 0.01).
        momentum: momentum coefficient (default 0.9), accelerates convergence.
        weight_decay: L2 regularization coefficient (default 0).
    """

    def __init__(self, params, lr=0.01, momentum=0.9, weight_decay=0.0):
        defaults = dict(lr=lr, momentum=momentum, weight_decay=weight_decay)
        super(SGDWithMomentumCustom, self).__init__(params, defaults)

    def step(self, closure=None):
        """Apply one update step; optionally re-evaluate loss via `closure`."""
        loss = closure() if closure is not None else None

        for group in self.param_groups:
            lr = group['lr']
            momentum = group['momentum']
            weight_decay = group['weight_decay']

            for param in group['params']:
                if param.grad is None:
                    continue

                grad = param.grad.data
                # Fold the L2 penalty into the gradient.
                if weight_decay != 0:
                    grad = grad.add(param.data, alpha=weight_decay)

                # Velocity buffer lives in the optimizer state dict.
                state = self.state[param]
                if 'velocity' not in state:
                    state['velocity'] = torch.zeros_like(param.data)
                buf = state['velocity']
                buf.mul_(momentum).add_(grad)  # v = momentum * v + g

                # Update under no_grad so the step is not tracked by autograd.
                with torch.no_grad():
                    param.data.add_(buf, alpha=-lr)  # p = p - lr * v

        return loss


# Exercise the custom optimizer on a small MLP.
import torch.nn as nn

model = nn.Sequential(nn.Linear(10, 64), nn.ReLU(), nn.Linear(64, 2))
optimizer = SGDWithMomentumCustom(
    model.parameters(),
    lr=0.01,        # learning rate
    momentum=0.9,   # momentum coefficient (0.9 is a common choice)
    weight_decay=1e-4  # L2 regularization
)

# Training loop on random batches.
for step in range(5):
    x = torch.randn(32, 10)
    y = torch.randint(0, 2, (32,))

    optimizer.zero_grad()
    output = model(x)
    loss = nn.CrossEntropyLoss()(output, y)
    loss.backward()
    optimizer.step()

    print(f"Step {step+1}, Loss: {loss.item():.4f}")

注意:自定义优化器中,参数更新必须在 torch.no_grad() 上下文中进行,否则会被 autograd 追踪,导致内存泄漏。


6.6 模型监控(TensorBoard + wandb)

训练过程监控是工程化的重要环节,帮助及时发现训练异常、对比实验结果。

6.6.1 TensorBoard 使用

python
# Install: pip install tensorboard
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter

# SummaryWriter logs to ./runs (one subdirectory per experiment).
writer = SummaryWriter(log_dir='./runs/experiment_1')

# Small MLP to monitor.
model = nn.Sequential(nn.Linear(10, 64), nn.ReLU(), nn.Linear(64, 2))
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

# Log the model graph.
dummy_input = torch.randn(1, 10)
writer.add_graph(model, dummy_input)

# Training loop with metric logging.
for epoch in range(20):
    x = torch.randn(64, 10)
    y = torch.randint(0, 2, (64,))

    optimizer.zero_grad()
    output = model(x)
    loss = criterion(output, y)
    loss.backward()
    optimizer.step()

    # Scalars: loss and accuracy per epoch.
    acc = (output.argmax(1) == y).float().mean()
    writer.add_scalar('Loss/train', loss.item(), epoch)
    writer.add_scalar('Accuracy/train', acc.item(), epoch)

    # Weight histograms every 5 epochs.
    if epoch % 5 == 0:
        for name, param in model.named_parameters():
            writer.add_histogram(f'weights/{name}', param, epoch)

writer.close()
# View with: tensorboard --logdir=./runs
print("TensorBoard 日志已保存,运行: tensorboard --logdir=./runs")

6.6.2 wandb 使用

python
# Install: pip install wandb
# First use requires: wandb login
import wandb
import torch
import torch.nn as nn
import torch.optim as optim

# Start a wandb run.
wandb.init(
    project="deep-learning-demo",  # project name
    name="experiment_1",           # run name
    config={                       # hyperparameters (recorded automatically)
        "learning_rate": 1e-3,
        "epochs": 20,
        "batch_size": 64,
        "architecture": "MLP"
    }
)

model = nn.Sequential(nn.Linear(10, 64), nn.ReLU(), nn.Linear(64, 2))
optimizer = optim.Adam(model.parameters(), lr=wandb.config.learning_rate)
criterion = nn.CrossEntropyLoss()

# Optionally track gradients and weights.
wandb.watch(model, log='all', log_freq=10)

for epoch in range(wandb.config.epochs):
    x = torch.randn(wandb.config.batch_size, 10)
    y = torch.randint(0, 2, (wandb.config.batch_size,))

    optimizer.zero_grad()
    output = model(x)
    loss = criterion(output, y)
    loss.backward()
    optimizer.step()

    acc = (output.argmax(1) == y).float().mean()

    # Metrics are uploaded to the wandb backend.
    wandb.log({
        "epoch": epoch,
        "train/loss": loss.item(),
        "train/accuracy": acc.item()
    })

# Attach the trained weights to the run.
torch.save(model.state_dict(), 'model.pth')
wandb.save('model.pth')
wandb.finish()
print("实验已记录到 wandb")

6.6.3 TensorBoard vs wandb 对比

| 特性 | TensorBoard | wandb |
| --- | --- | --- |
| 部署方式 | 本地 | 云端 |
| 实验对比 | 有限 | 强大(多实验) |
| 超参数追踪 | 手动 | 自动 |
| 协作功能 | 弱 | 强 |
| 免费使用 | 完全免费 | 有免费额度 |
| 离线使用 | 支持 | 需要网络 |

6.7 数据 Pipeline 工程化

高效的数据 Pipeline 是训练速度的关键,合理配置可将 GPU 利用率从 30% 提升到 90%+。

6.7.1 完整数据变换链

python
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
import torch

# ImageNet channel statistics, shared by the train and eval pipelines.
_IMAGENET_MEAN = [0.485, 0.456, 0.406]
_IMAGENET_STD = [0.229, 0.224, 0.225]

# Training pipeline: geometric + photometric augmentation, then
# tensor conversion and normalization.
train_transform = transforms.Compose([
    transforms.Resize((256, 256)),           # upscale first
    transforms.RandomCrop(224),              # random crop to the target size
    transforms.RandomHorizontalFlip(p=0.5),  # flip half the time
    transforms.RandomRotation(15),           # rotate within ±15°
    transforms.ColorJitter(                  # photometric jitter
        brightness=0.3,
        contrast=0.3,
        saturation=0.3,
        hue=0.1
    ),
    transforms.ToTensor(),                   # to float tensor in [0, 1]
    transforms.Normalize(mean=_IMAGENET_MEAN, std=_IMAGENET_STD)
])

# Validation/test pipeline: deterministic resize only, no random augmentation.
val_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=_IMAGENET_MEAN, std=_IMAGENET_STD)
])

6.7.2 DataLoader 性能优化配置

python
from torch.utils.data import DataLoader
import multiprocessing

# Number of available CPU cores.
num_cpus = multiprocessing.cpu_count()

# Heuristic: roughly half the cores, capped at 4 worker processes.
# On a 1-core machine this evaluates to 0 (main-process loading).
worker_count = min(4, num_cpus // 2)

# High-throughput DataLoader configuration.
# BUG FIX: `prefetch_factor` and `persistent_workers=True` are only valid
# when num_workers > 0 — the original passed them unconditionally, which
# raises a ValueError on machines where num_cpus // 2 == 0.
loader_kwargs = dict(
    dataset=train_dataset,
    batch_size=64,
    shuffle=True,
    num_workers=worker_count,  # parallel loader processes (CPU cores / 2 is a good default)
    pin_memory=True,           # page-locked memory speeds up CPU→GPU transfers
    drop_last=True             # drop the final partial batch (keeps batch size constant)
)
if worker_count > 0:
    loader_kwargs.update(
        prefetch_factor=2,         # batches prefetched per worker (default 2)
        persistent_workers=True    # keep workers alive between epochs
    )
train_loader = DataLoader(**loader_kwargs)

# Parameter notes:
# num_workers=0: load in the main process (handy for debugging)
# num_workers=4: four subprocesses in parallel (typical production setting)
# pin_memory=True: only helps when training on GPU; unnecessary for CPU-only runs
# persistent_workers=True: requires num_workers > 0

6.7.3 自定义 Dataset 最佳实践

python
import os
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from torchvision import transforms

class ImageFolderDataset(Dataset):
    """
    Custom image dataset over a class-per-subdirectory layout:
        root/
            class_a/img1.jpg, img2.jpg, ...
            class_b/img1.jpg, img2.jpg, ...

    Images are lazy-loaded in __getitem__, so __init__ only scans paths.
    """
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.samples = []  # [(image_path, label), ...]
        # BUG FIX: only directories count as classes.  The original used raw
        # os.listdir(), so stray files (e.g. .DS_Store, a README) leaked into
        # `classes` and shifted every label index in `class_to_idx`.
        self.classes = sorted(
            entry for entry in os.listdir(root_dir)
            if os.path.isdir(os.path.join(root_dir, entry))
        )
        self.class_to_idx = {cls: i for i, cls in enumerate(self.classes)}

        # Build the sample list (store paths only; images are loaded on demand).
        # sorted() makes sample order deterministic across filesystems.
        for cls in self.classes:
            cls_dir = os.path.join(root_dir, cls)
            for img_name in sorted(os.listdir(cls_dir)):
                if img_name.lower().endswith(('.jpg', '.jpeg', '.png')):
                    self.samples.append(
                        (os.path.join(cls_dir, img_name), self.class_to_idx[cls])
                    )

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        """Load one image on demand (lazy loading keeps memory flat)."""
        img_path, label = self.samples[idx]
        # convert('RGB') normalizes grayscale/RGBA inputs to 3 channels.
        image = Image.open(img_path).convert('RGB')

        if self.transform:
            image = self.transform(image)

        return image, label


# Usage example (demonstrated with mock data below).
# Replace root_dir with a real dataset path in actual use.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])

# dataset = ImageFolderDataset(root_dir='./data/train', transform=transform)
# loader = DataLoader(dataset, batch_size=32, shuffle=True, num_workers=4, pin_memory=True)

# Demo: simulate an image dataset with random tensors.
class MockDataset(Dataset):
    """In-memory stand-in dataset of random image-like tensors (demo only)."""

    def __init__(self, size=100):
        # `size` random "images" of shape (3, 224, 224), labels in [0, 10).
        self.data = torch.randn(size, 3, 224, 224)
        self.labels = torch.randint(0, 10, (size,))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

mock_dataset = MockDataset(100)
mock_loader = DataLoader(mock_dataset, batch_size=16, shuffle=True)

# Inspect a single batch, then stop.
batch_x, batch_y = next(iter(mock_loader))
print(f"批次数据形状: {batch_x.shape}, 标签形状: {batch_y.shape}")

避坑:Windows 系统使用 num_workers > 0 时,DataLoader 代码必须放在 if __name__ == '__main__': 块中,否则会触发多进程错误。Linux/Mac 无此限制。


7. 核心工具与资源

7.1 常用工具手册

PyTorch常用操作

| 操作 | 代码示例 | 说明 |
| --- | --- | --- |
| 创建张量 | `torch.tensor([1, 2, 3])` | 创建PyTorch张量 |
| 张量运算 | `a + b`, `a @ b` | 元素级运算、矩阵乘法 |
| 自动求导 | `x.requires_grad = True` | 启用梯度追踪 |
| 模型保存 | `torch.save(model.state_dict(), 'model.pth')` | 保存模型权重 |
| 模型加载 | `model.load_state_dict(torch.load('model.pth'))` | 加载模型权重 |

TensorFlow常用操作

| 操作 | 代码示例 | 说明 |
| --- | --- | --- |
| 创建张量 | `tf.constant([1, 2, 3])` | 创建TensorFlow张量 |
| 构建模型 | `tf.keras.Sequential()` | 序列模型 |
| 训练模型 | `model.fit(x, y, epochs=10)` | 训练模型 |
| 保存模型 | `model.save('model.h5')` | 保存完整模型 |
| 加载模型 | `tf.keras.models.load_model('model.h5')` | 加载模型 |

7.2 优质实战模板

通用训练模板

python
def train_model(model, train_loader, val_loader, criterion, optimizer, epochs=10, device='cpu'):
    """Generic train/validate loop for a classification model.

    Args:
        model: the nn.Module to optimize (moved to `device` in place).
        train_loader / val_loader: DataLoaders yielding (data, target) batches.
        criterion: loss function, e.g. nn.CrossEntropyLoss().
        optimizer: optimizer already bound to model.parameters().
        epochs: number of full passes over the training set.
        device: 'cpu' or 'cuda'.

    Returns:
        dict of per-epoch lists: 'train_loss', 'val_loss', 'val_acc'.
        (The function previously returned None; callers that ignored the
        return value are unaffected.)
    """
    model.to(device)
    history = {'train_loss': [], 'val_loss': [], 'val_acc': []}

    for epoch in range(epochs):
        # --- training phase ---
        model.train()
        train_loss = 0.0

        for data, target in train_loader:
            data, target = data.to(device), target.to(device)

            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

            # Weight by batch size so the epoch average stays exact even
            # when the final batch is smaller.
            train_loss += loss.item() * data.size(0)

        train_loss /= len(train_loader.dataset)

        # --- validation phase (no gradients) ---
        model.eval()
        val_loss = 0.0
        correct = 0

        with torch.no_grad():
            for data, target in val_loader:
                data, target = data.to(device), target.to(device)
                output = model(data)
                val_loss += criterion(output, target).item() * data.size(0)
                _, predicted = torch.max(output, 1)
                correct += (predicted == target).sum().item()

        val_loss /= len(val_loader.dataset)
        val_acc = correct / len(val_loader.dataset)

        history['train_loss'].append(train_loss)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)

        print(f"Epoch {epoch+1}/{epochs}")
        print(f"Train Loss: {train_loss:.4f}")
        print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}\n")

    return history

7.3 学习资源

经典书籍

  • 《深度学习》- Ian Goodfellow
  • 《动手学深度学习》- 李沐
  • 《Python深度学习》- Francois Chollet

在线教程

数据集推荐

  • MNIST: 手写数字数据集
  • CIFAR-10/100: 图像分类数据集
  • IMDB: 电影评论情感分析数据集
  • COCO: 目标检测/分割数据集
  • GLUE: NLP基准数据集

开源项目


8. 常见问题与避坑指南

8.1 高频问题

问题1: 模型训练不收敛

错误表现:损失函数始终很高,不下降

原因分析

  • 学习率设置不当
  • 数据预处理不充分
  • 模型结构设计不合理

解决方案

python
# Fix 1: lower the learning rate — a too-large LR is the most common cause
# of a non-decreasing loss.  (`model` comes from the surrounding tutorial.)
optimizer = optim.Adam(model.parameters(), lr=1e-4)  # try a smaller learning rate

# Fix 2: add a learning-rate scheduler (cosine decay over T_max steps).
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=10)

# Fix 3: make sure the inputs are standardized (zero mean, unit variance).
# StandardScaler is sklearn's; X_train is the raw training matrix.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)

问题2: 梯度消失/爆炸

错误表现:梯度变为0或无穷大

解决方案

python
# Fix 1: clip the global gradient norm to 1.0
# (call this between loss.backward() and optimizer.step()).
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

# Fix 2: residual (skip) connections let gradients bypass deep stacks.
# NOTE: this is illustrative pseudo-code — `out` is produced by the
# omitted layers.
class ResidualBlock(nn.Module):
    def forward(self, x):
        residual = x
        # ... network layers ...
        out += residual  # residual (skip) connection
        return out

# Fix 3: prefer ReLU over Sigmoid/Tanh — its gradient does not saturate
# for positive inputs, which mitigates vanishing gradients.
activation = nn.ReLU()

问题3: GPU内存不足

错误表现:`RuntimeError: CUDA out of memory`

解决方案

python
# Fix 1: reduce the batch size (activation memory scales with it).
dataloader = DataLoader(dataset, batch_size=16)  # reduced from 32 to 16

# Fix 2: mixed-precision training — float16 activations roughly halve
# activation memory; GradScaler guards against fp16 gradient underflow.
scaler = torch.cuda.amp.GradScaler()

with torch.cuda.amp.autocast():
    output = model(data)
    loss = criterion(output, target)

# Fix 3: drop references to tensors you no longer need, then return
# cached blocks to the CUDA allocator.  NOTE: empty_cache() cannot free
# tensors that are still referenced elsewhere.
del intermediate_tensor
torch.cuda.empty_cache()

问题4: 过拟合

错误表现:训练准确率高,测试准确率低

解决方案

python
# Fix 1: Dropout randomly zeroes activations during training,
# preventing co-adaptation of units.
model = nn.Sequential(
    nn.Linear(784, 256),
    nn.ReLU(),
    nn.Dropout(0.5),  # add Dropout (drop probability p=0.5)
    nn.Linear(256, 10)
)

# Fix 2: L2 regularization via the optimizer's weight decay.
optimizer = optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)

# Fix 3: enlarge the effective training set with data augmentation.
transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ToTensor()
])

问题5: 预训练模型微调无效

错误表现:使用预训练模型微调后,验证集准确率没有提升,甚至比从头训练更差

原因分析

  • 学习率过大,覆盖了预训练权重中的有效特征
  • 未解冻合适的层,只训练了分类头而特征提取层未适配新任务
  • 数据分布与预训练数据差异过大

解决方案

python
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models

# Load an ImageNet-pretrained backbone.
model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)

# Strategy 1: layer-wise learning rates (small LR low in the network,
# larger LR for the head).
# Freeze the first two stages; fine-tune the last two stages + classifier.
for name, param in model.named_parameters():
    if 'layer1' in name or 'layer2' in name:
        param.requires_grad = False  # freeze the lower layers

# Replace the classifier head for the 10-class target task.
model.fc = nn.Linear(model.fc.in_features, 10)

# Per-group learning rates: pretrained layers adapt slowly, head adapts fast.
optimizer = optim.Adam([
    {'params': model.layer3.parameters(), 'lr': 1e-4},  # mid layers: small LR
    {'params': model.layer4.parameters(), 'lr': 1e-4},
    {'params': model.fc.parameters(), 'lr': 1e-3}        # classifier head: large LR
])

# Strategy 2: gradual unfreezing (train the head first, then unfreeze more).
# Phase 1: train only the classifier head (e.g. 5 epochs).
for param in model.parameters():
    param.requires_grad = False
model.fc.weight.requires_grad = True
model.fc.bias.requires_grad = True

# Phase 2: unfreeze layer4 and train for another ~5 epochs.
for param in model.layer4.parameters():
    param.requires_grad = True

问题6: 数据量不足

错误表现:训练集样本数量少(每类 < 100 张),模型严重过拟合,测试集准确率极低

原因分析

  • 小数据集无法支撑深层网络的参数量
  • 模型记住了训练样本而非学习通用特征

解决方案

python
import torch
from torchvision import transforms, models
import torch.nn as nn

# Option 1: aggressive data augmentation (multiplies the effective
# number of distinct training samples seen by the model).
aggressive_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomCrop(224),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.2),
    transforms.RandomRotation(30),
    transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4, hue=0.1),
    transforms.RandomGrayscale(p=0.1),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Option 2: transfer learning — reuse pretrained features so far less
# labeled data is needed.  (`num_classes` comes from the surrounding text.)
model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
for param in model.parameters():
    param.requires_grad = False  # freeze every layer
model.fc = nn.Linear(model.fc.in_features, num_classes)  # train only the head

# Option 3: shrink the model — fewer parameters, lower overfitting risk.
# (`input_dim` is task-specific.)
small_model = nn.Sequential(
    nn.Linear(input_dim, 32),  # fewer hidden units
    nn.ReLU(),
    nn.Dropout(0.5),           # strong Dropout
    nn.Linear(32, num_classes)
)

问题7: 部署兼容性问题

错误表现:模型在训练环境运行正常,但部署到生产环境后报错或结果不一致

原因分析

  • PyTorch/CUDA 版本不一致(训练环境 vs 生产环境)
  • Python 版本差异导致序列化/反序列化问题
  • 模型依赖特定硬件(如 GPU),但生产环境为 CPU

解决方案

python
import torch
import torch.nn as nn

# Fix 1: pin dependency versions (generate requirements.txt):
# pip freeze > requirements.txt
# Key pinned dependencies, for example:
# torch==2.1.0
# torchvision==0.16.0
# numpy==1.24.3

# Fix 2: export to ONNX for cross-platform / cross-framework deployment.
class SimpleModel(nn.Module):
    """Minimal model used to demonstrate the export workflow."""
    def __init__(self):
        super().__init__()
        self.fc = nn.Linear(10, 2)
    def forward(self, x):
        return self.fc(x)

model = SimpleModel()
model.eval()  # export in inference mode (freezes dropout/batchnorm behavior)

dummy_input = torch.randn(1, 10)
torch.onnx.export(
    model,
    dummy_input,
    'model.onnx',
    opset_version=11,           # ONNX opset version (11+ recommended)
    input_names=['input'],
    output_names=['output'],
    dynamic_axes={'input': {0: 'batch_size'}}  # allow a dynamic batch size
)
print("ONNX 模型导出成功,可在任意支持 ONNX 的环境中运行")

# Fix 3: TorchScript export (runs without a Python model definition).
scripted = torch.jit.script(model)
scripted.save('model_scripted.pt')
# Loading does not require the original class definition.
loaded = torch.jit.load('model_scripted.pt')

8.2 高级避坑技巧

数据泄露防范

python
# WRONG: fitting the scaler on the full dataset lets test-set statistics
# leak into training.
scaler = StandardScaler()
X = scaler.fit_transform(X)  # ❌ test data participated in the fit
X_train, X_test = train_test_split(X)

# RIGHT: split first, then fit only on the training portion.
X_train, X_test = train_test_split(data)
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)  # ✅ fit on training data only
X_test = scaler.transform(X_test)        # ✅ transform test data with the training statistics

模型偏见处理

python
# Inspect the label distribution per group to spot imbalance.
import pandas as pd
df = pd.DataFrame({'label': labels, 'group': groups})
print(df.groupby('group')['label'].mean())

# Use a fairness-aware metric (balanced accuracy averages per-class recall).
from sklearn.metrics import balanced_accuracy_score
balanced_acc = balanced_accuracy_score(y_true, y_pred)

# Reweight samples per group.  NOTE(review): compute_fairness_weights is a
# project helper not shown here — presumably returns one weight per sample;
# verify against its definition.
sample_weights = compute_fairness_weights(groups)
loss = criterion(output, target) * sample_weights

9. 实战案例汇总

案例1: MNIST手写数字识别

python
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

# Preprocessing: convert to tensor and normalize with the MNIST
# per-channel mean (0.1307) and std (0.3081).
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

# Download (if needed) and load the train/test splits.
train_data = datasets.MNIST('./data', train=True, download=True, transform=transform)
test_data = datasets.MNIST('./data', train=False, download=True, transform=transform)

train_loader = DataLoader(train_data, batch_size=64, shuffle=True)
test_loader = DataLoader(test_data, batch_size=64, shuffle=False)

# 定义模型
class MNISTNet(nn.Module):
    """Small CNN for 28x28 grayscale digits: conv features + MLP head."""

    def __init__(self):
        super(MNISTNet, self).__init__()
        # Two conv/pool stages: 28x28 -> 14x14 -> 7x7, channels 1 -> 32 -> 64.
        self.conv_layers = nn.Sequential(
            nn.Conv2d(1, 32, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )
        # Classifier head over the flattened 64*7*7 feature map.
        self.fc_layers = nn.Sequential(
            nn.Linear(64 * 7 * 7, 128),
            nn.ReLU(),
            nn.Linear(128, 10),
        )

    def forward(self, x):
        features = self.conv_layers(x)
        flat = features.view(-1, 64 * 7 * 7)
        return self.fc_layers(flat)

# Training setup.
model = MNISTNet()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Train for 5 epochs; evaluate on the test split after each epoch.
for epoch in range(5):
    model.train()
    for data, target in train_loader:
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

    # Evaluation: no gradients needed; prediction = argmax over class logits.
    model.eval()
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            _, predicted = torch.max(output, 1)
            correct += (predicted == target).sum().item()

    print(f"Epoch {epoch+1}, Test Accuracy: {100 * correct / len(test_data):.2f}%")

预期结果与分析

  • 训练 5 个 epoch 后,测试准确率通常可达 98%~99%
  • MNIST 是深度学习入门基准数据集,CNN 在此任务上表现优异
  • 若准确率低于 95%,检查数据归一化是否正确(均值 0.1307,标准差 0.3081)

案例2: 文本分类(BERT)

python
from transformers import BertTokenizer, BertForSequenceClassification, Trainer, TrainingArguments
from datasets import load_dataset

# Load the IMDB sentiment dataset (25k train / 25k test reviews).
dataset = load_dataset('imdb')

# Load the pretrained model and its matching tokenizer.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2)

# Tokenize, padding/truncating every review to 128 tokens.
def preprocess_function(examples):
    return tokenizer(examples['text'], padding='max_length', truncation=True, max_length=128)

# Apply preprocessing in batches.
encoded_dataset = dataset.map(preprocess_function, batched=True)

# Training configuration.
# NOTE(review): `evaluation_strategy` was renamed `eval_strategy` in newer
# transformers releases — confirm against the installed version.
training_args = TrainingArguments(
    output_dir='./results',
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=3,
    evaluation_strategy='epoch',
    logging_dir='./logs',
)

# Build the Trainer.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=encoded_dataset['train'],
    eval_dataset=encoded_dataset['test'],
)

# Fine-tune.
trainer.train()

# Evaluate on the held-out test split.
results = trainer.evaluate()
print(f"评估结果: {results}")

预期结果与分析

  • BERT 在 IMDB 情感分类任务上,微调 3 个 epoch 后准确率通常可达 92%~94%
  • 相比传统 LSTM 方法(约 85%),BERT 提升显著
  • 若准确率偏低,尝试降低学习率至 2e-5 或增加 warmup steps

案例3: 图像生成(DCGAN)

python
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt

# Data: resize/center-crop CelebA faces to 64x64 and scale to [-1, 1]
# (matching the generator's Tanh output range).
transform = transforms.Compose([
    transforms.Resize(64),
    transforms.CenterCrop(64),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# NOTE: the CelebA download is large and is sometimes rate-limited.
dataset = datasets.CelebA(root='./data', split='train', download=True, transform=transform)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True)

# 生成器
class Generator(nn.Module):
    """DCGAN generator: maps (N, latent_dim, 1, 1) noise to a
    (N, 3, 64, 64) image in [-1, 1] (Tanh output)."""

    def __init__(self, latent_dim=100):
        super(Generator, self).__init__()
        layers = []
        # Progressive upsampling: 1x1 -> 4x4 -> 8x8 -> 16x16 -> 32x32.
        stage_plan = [
            (latent_dim, 512, 4, 1, 0),
            (512, 256, 4, 2, 1),
            (256, 128, 4, 2, 1),
            (128, 64, 4, 2, 1),
        ]
        for in_ch, out_ch, k, s, p in stage_plan:
            layers += [
                nn.ConvTranspose2d(in_ch, out_ch, k, s, p, bias=False),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(True),
            ]
        # Final 32x32 -> 64x64 projection to 3 RGB channels; Tanh matches
        # the (-1, 1) normalization applied to the real images.
        layers += [nn.ConvTranspose2d(64, 3, 4, 2, 1, bias=False), nn.Tanh()]
        self.main = nn.Sequential(*layers)

    def forward(self, x):
        return self.main(x)

# 判别器
class Discriminator(nn.Module):
    """DCGAN discriminator: (N, 3, 64, 64) image -> (N,) real-probability."""

    def __init__(self):
        super(Discriminator, self).__init__()
        blocks = [
            # First stage has no BatchNorm (standard DCGAN practice).
            nn.Conv2d(3, 64, 4, 2, 1, bias=False),
            nn.LeakyReLU(0.2, inplace=True),
        ]
        # Downsampling stages: 32x32 -> 16x16 -> 8x8 -> 4x4.
        for in_ch, out_ch in ((64, 128), (128, 256), (256, 512)):
            blocks += [
                nn.Conv2d(in_ch, out_ch, 4, 2, 1, bias=False),
                nn.BatchNorm2d(out_ch),
                nn.LeakyReLU(0.2, inplace=True),
            ]
        # 4x4 feature map -> 1x1 score, squashed to a probability.
        blocks += [nn.Conv2d(512, 1, 4, 1, 0, bias=False), nn.Sigmoid()]
        self.main = nn.Sequential(*blocks)

    def forward(self, x):
        score_map = self.main(x)
        return score_map.view(-1, 1).squeeze(1)

# Instantiate both networks on the available device.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
generator = Generator().to(device)
discriminator = Discriminator().to(device)

# Binary cross-entropy over the discriminator's probabilities;
# betas=(0.5, 0.999) is the standard DCGAN Adam setting.
criterion = nn.BCELoss()
optimizer_G = optim.Adam(generator.parameters(), lr=0.0002, betas=(0.5, 0.999))
optimizer_D = optim.Adam(discriminator.parameters(), lr=0.0002, betas=(0.5, 0.999))

# Fixed noise so samples are comparable across epochs.
fixed_noise = torch.randn(64, 100, 1, 1, device=device)

for epoch in range(50):
    for i, (real_imgs, _) in enumerate(dataloader):
        batch_size = real_imgs.size(0)
        real_imgs = real_imgs.to(device)

        # ---- Discriminator step ----
        optimizer_D.zero_grad()

        # Real images should be classified as 1.
        real_labels = torch.ones(batch_size, device=device)
        output = discriminator(real_imgs)
        loss_D_real = criterion(output, real_labels)
        loss_D_real.backward()

        # Fake images should be classified as 0.  detach() keeps the
        # gradient from flowing into the generator during the D step.
        noise = torch.randn(batch_size, 100, 1, 1, device=device)
        fake_imgs = generator(noise)
        fake_labels = torch.zeros(batch_size, device=device)
        output = discriminator(fake_imgs.detach())
        loss_D_fake = criterion(output, fake_labels)
        loss_D_fake.backward()

        optimizer_D.step()

        # ---- Generator step: fool D into labeling fakes as real ----
        optimizer_G.zero_grad()
        output = discriminator(fake_imgs)
        loss_G = criterion(output, real_labels)
        loss_G.backward()
        optimizer_G.step()

        if i % 100 == 0:
            print(f"Epoch [{epoch}/{50}] Batch [{i}/{len(dataloader)}]")
            print(f"Loss D: {loss_D_real.item() + loss_D_fake.item():.4f}, Loss G: {loss_G.item():.4f}")

# Save an 8x8 grid of samples generated from the fixed noise.
with torch.no_grad():
    fake_imgs = generator(fixed_noise)
    plt.figure(figsize=(8, 8))
    for j in range(fake_imgs.size(0)):
        plt.subplot(8, 8, j+1)
        img = fake_imgs[j].cpu().detach().numpy()
        img = (img + 1) / 2  # de-normalize from [-1, 1] back to [0, 1]
        plt.imshow(img.transpose(1, 2, 0))
        plt.axis('off')
    plt.savefig('dcgan_output.png')
    plt.show()

预期结果与分析

  • DCGAN 训练 50 个 epoch 后,生成的人脸图像应具有基本的人脸结构(眼睛、鼻子、嘴巴可辨认)
  • 判别器损失(Loss D)应稳定在 0.5~1.0 之间,生成器损失(Loss G)在 1.0~3.0 之间
  • 若生成图像出现模式崩溃(所有图像相似),尝试降低生成器学习率或使用 Wasserstein GAN

案例4: 目标检测(YOLOv5)

python
import torch

# Load a pretrained YOLOv5s from Torch Hub (downloads the repo and weights
# on first use; requires network access).
model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)

# Run detection on a single image file.
img_path = 'test.jpg'
results = model(img_path)

# Print a summary and show the annotated image.
results.print()
results.show()

# Raw detections: one row per detected box.
predictions = results.xyxy[0]  # [x1, y1, x2, y2, confidence, class]
for pred in predictions:
    x1, y1, x2, y2, conf, cls = pred
    print(f"类别: {model.names[int(cls)]}, 置信度: {conf:.2f}")

预期结果与分析

  • YOLOv5s 在 COCO 数据集上的预训练 mAP@0.5 约为 56.8%
  • 推理速度:GPU(V100)约 6ms/张,CPU 约 100ms/张
  • 置信度阈值建议设为 0.25~0.5,过高会漏检,过低会误检

案例5: 命名实体识别(BERT)

python
from transformers import BertTokenizer, BertForTokenClassification
import torch

# Load a CoNLL-2003 fine-tuned NER model.  BUG FIX: the tokenizer must come
# from the SAME checkpoint as the model — the original loaded
# 'bert-base-uncased', which lowercases input and uses a different
# vocabulary than this cased model, silently degrading predictions.
tokenizer = BertTokenizer.from_pretrained('dbmdz/bert-large-cased-finetuned-conll03-english')
model = BertForTokenClassification.from_pretrained('dbmdz/bert-large-cased-finetuned-conll03-english')

# BIO label set used by the CoNLL-2003 checkpoint.
label_list = ['O', 'B-PER', 'I-PER', 'B-ORG', 'I-ORG', 'B-LOC', 'I-LOC', 'B-MISC', 'I-MISC']

# Input sentence.
text = "Apple Inc. is located in Cupertino, California."

# Tokenize into model inputs ([CLS]/[SEP] added, attention mask built).
inputs = tokenizer(text, return_tensors='pt')

# Predict: argmax over the per-token label logits, no gradients needed.
with torch.no_grad():
    outputs = model(**inputs)
    predictions = torch.argmax(outputs.logits, dim=2)

# Map ids back to token strings and print each token's predicted tag.
tokens = tokenizer.convert_ids_to_tokens(inputs['input_ids'][0])
for token, pred in zip(tokens, predictions[0].numpy()):
    print(f"{token}: {label_list[pred]}")

预期结果与分析

  • dbmdz/bert-large-cased-finetuned-conll03-english 在 CoNLL-2003 NER 任务上 F1 约为 92.4%
  • 对于 "Apple Inc. is located in Cupertino, California." 这句话,预期识别出:
    • Apple Inc. → ORG(组织)
    • Cupertino → LOC(地点)
    • California → LOC(地点)

实战案例对比总览

| 案例 | 任务类型 | 使用模型 | 数据集 | 预期指标 |
| --- | --- | --- | --- | --- |
| 案例1 | 图像分类 | CNN(自定义) | MNIST | 准确率 98%~99% |
| 案例2 | 文本分类 | BERT | IMDB | 准确率 92%~94% |
| 案例3 | 图像生成 | DCGAN | CelebA | Loss D: 0.5~1.0 |
| 案例4 | 目标检测 | YOLOv5s | COCO | mAP@0.5: 56.8% |
| 案例5 | 命名实体识别 | BERT-large | CoNLL-2003 | F1: 92.4% |