Algorithm Visualization and Interactive Learning Platform

LoRA Domain Assistant: Fine-Tune a Local 1.5B-3B Model

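Stage 3 moves from TinyGPT toward a genuinely usable local domain assistant: pick a 1.5B-3B instruct model that is already downloaded on the user's machine, train only a small number of adapter parameters with LoRA or QLoRA, drive GPU-first training through a local HTTP runner, save checkpoints, load the current adapter for inference, and use domain Q&A evaluation to move from loss to usability.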

Tags: LLM, Advanced, Free | Kernel: GPU | Module 9

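Stage 3: from small-model experiments to a domain assistant

The first 8 modules have already worked through three threads:

next-token principles -> training TinyGPT from scratch -> task-to-code TinyCodeGPT -> tool-calling TinyGPT

Module 9 enters stage 3: instead of training a toy model from scratch, it runs **LoRA / QLoRA fine-tuning** on an open-source instruct model that is already downloaded on the user's machine, with the goal of producing a domain assistant that is actually useful.

The default domain for this module is the **AlgoLab machine learning course teaching assistant**: it has to understand the TinyGPT, Transformer, VAE, Diffusion, Tool Calling, local GPU training, checkpoint, loss, and evaluation questions from the earlier modules. This domain is narrow enough to train locally and real enough to answer questions users actually ask.

The full loop keeps the engineering shape of No.7 and No.8:

local model path -> JSONL SFT data -> local HTTP runner -> GPU-first LoRA adapter training -> inference with the current adapter -> base vs LoRA comparison -> domain evaluation
Domain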

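Which domains the assistant should learn first

Do not aim for general chat in the first version. LoRA's strength is aligning an existing instruct model to a stable, narrow, high-frequency task distribution. For AlgoLab, the best-fit domain is not encyclopedic knowledge but in-course explanation, diagnosis, and training advice.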

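| Domain | What the model should learn | Typical user question |
| --- | --- | --- |
| Machine learning course explanations | Turn Transformer, VAE, Diffusion, LoRA, and Tool Calling into learnable steps | Why can next-token prediction produce tool-calling ability? |
| Local training diagnostics | Explain loss, overfitting, batch, grad accumulation, and GPU fallback | CUDA is available but VRAM runs out; what should I adjust? |
| Experiment workflow assistant | Guide users through preparing JSONL, choosing a checkpoint, and comparing base and adapter | Why do a train/val split? |
| Code and runner usage | Explain the local HTTP runner, Python environment, model path, and adapter directory | The runner is connected but the model fails to load; what now? |
| Evaluation and usability | Distinguish low loss, correct format, useful answers, and factual reliability | Why does low loss not mean the domain assistant is good to use? |

First-version domain boundary: the AlgoLab course assistant learns stable terminology, explanation style, diagnostic procedures, and answer format. Do not cram large amounts of volatile facts into LoRA; leave those to RAG later.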
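Model

Model selection: the model must already be downloaded locally

This module does not download models automatically inside the runner. Training only reads a Hugging Face-format model directory on the user's local disk, for example one containing config.json, tokenizer files, and *.safetensors weight files.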

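| Suggested model | Suitable scenario | First-version advice |
| --- | --- | --- |
| Qwen2.5-1.5B-Instruct | Chinese course assistant, general explanation, training diagnostics | Preferred. Lightweight and stable; a good way to get the LoRA loop working first. |
| Qwen3-1.7B | Later expansion toward agents, reasoning, and tool use | A reasonable second choice. Note that thinking mode makes the prompt template more complex. |
| Qwen2.5-Coder-1.5B-Instruct | Code explanation, runner debugging, data-processing scripts | Prefer it if the domain leans toward code. |
| Llama-3.2-3B-Instruct | English material, 3B-level conversation quality | Higher VRAM pressure; the license also needs to be checked separately. |

In practice, the most conservative order is: first validate the pipeline with a 1.5B model and 20 to 200 samples, then increase data volume, epochs, and model size.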

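Formula

The core of LoRA: freeze the large model, train only a low-rank update

The core of LoRA is not retraining the whole large model but attaching a very small low-rank "patch" beside certain linear layers. The original weight $W$ stays frozen and only $A$ and $B$ are trained, so the model learns the domain direction within a small number of trainable parameters.

$$W' = W + \Delta W = W + \frac{\alpha}{r} B A, \qquad B \in \mathbb{R}^{d_{\text{out}} \times r}, \quad A \in \mathbb{R}^{r \times d_{\text{in}}}$$

$W$: frozen pretrained linear weight
$\Delta W$: domain-specific weight update learned during fine-tuning
$A, B$: two trainable low-rank matrices whose product approximates the full update
$r$: LoRA rank, controlling adapter capacity
$\alpha$: LoRA scaling factor
$d_{\text{in}}, d_{\text{out}}$: input and output dimensions of the linear layer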

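Why not full fine-tuning

Full fine-tuning updates a large number of the model's weights and must also hold gradients, optimizer states, and training activations. Even when the weights of a 1.5B to 3B model can be loaded, training memory grows well beyond that. LoRA's idea is that the pretrained model already has general language ability, so domain fine-tuning only needs to give it a small directional correction.

Full fine-tuning:
  update all pretrained weights
  store gradients + optimizer states + activations
LoRA:
  freeze base model
  train only small adapter matrices
  save adapter instead of full model copy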
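The memory gap can be sketched with rough arithmetic. The byte counts below are assumptions rather than measurements from the runner: full fine-tuning is taken to hold fp32 weights, gradients, and two AdamW moment buffers (16 bytes per parameter), LoRA is taken to hold frozen bf16 base weights plus the same 16 bytes for each adapter parameter, and activations are ignored entirely. The helper names and the 10M adapter size are hypothetical.

```python
def full_finetune_bytes(num_params: int) -> int:
    """Weights + gradients + AdamW first/second moments, all assumed fp32 (4 bytes each)."""
    bytes_per_param = 4 + 4 + 4 + 4
    return num_params * bytes_per_param


def lora_bytes(num_base_params: int, num_adapter_params: int) -> int:
    """Frozen bf16 base weights (2 bytes) plus fp32 weights, grads, and moments for the adapter."""
    return num_base_params * 2 + num_adapter_params * 16


def to_gib(num_bytes: int) -> float:
    return num_bytes / 1024 ** 3


base = 1_500_000_000   # a 1.5B-parameter base model
adapter = 10_000_000   # hypothetical adapter size, for illustration only

print(f"full fine-tuning: {to_gib(full_finetune_bytes(base)):.1f} GiB before activations")
print(f"LoRA:             {to_gib(lora_bytes(base, adapter)):.1f} GiB before activations")
```

Real numbers depend on precision, optimizer, sequence length, and gradient checkpointing, so treat this only as an order-of-magnitude comparison.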

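Parameter count for a single linear layer

Suppose a Transformer linear layer is $W \in \mathbb{R}^{4096 \times 4096}$. Learning the full $\Delta W$ directly takes about 16.77 million parameters ($4096 \times 4096 = 16{,}777{,}216$). With LoRA rank $r = 8$, only two thin matrices are needed, about 65 thousand parameters ($2 \times 4096 \times 8 = 65{,}536$), roughly 256 times fewer for that layer.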
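The arithmetic can be checked with a few lines of Python. This is a minimal sketch assuming the 4096 x 4096 layer and rank 8 implied by the counts quoted above; the function names are illustrative and not part of the runner.

```python
def full_update_params(d_out: int, d_in: int) -> int:
    """Parameters in a dense update matrix of shape d_out x d_in."""
    return d_out * d_in


def lora_params(d_out: int, d_in: int, r: int) -> int:
    """Parameters in B (d_out x r) plus A (r x d_in)."""
    return d_out * r + r * d_in


d_in = d_out = 4096
r = 8
full = full_update_params(d_out, d_in)
lora = lora_params(d_out, d_in, r)
print(full, lora, full // lora)  # 16777216 65536 256
```

Doubling the rank doubles the adapter size, so the per-layer saving is d_out * d_in / (r * (d_out + d_in)).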

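Why it is called low-rank

The low-rank assumption is that domain adaptation does not need to modify the model arbitrarily across the full high-dimensional space. For the AlgoLab assistant, what really has to change is answer style, course terminology, diagnostic procedure, and a small number of task preferences. These changes can be confined to relatively few directions, and the rank controls how many such learnable directions there are.

Base model: large general capability space
Domain shift: a small set of useful directions
LoRA rank r: how many directions the adapter can learn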

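What happens during training and inference

During training, the base model's $W$ is not updated; only the LoRA adapter's $A$ and $B$ are. What gets saved at the end is the adapter files. At inference time, both the base model and the adapter must be loaded, and the forward pass is equivalent to using $W + \frac{\alpha}{r} B A$.

Training:
  W.requires_grad = False
  A.requires_grad = True
  B.requires_grad = True
  save adapter_config + adapter weights
Inference:
  local base model + selected LoRA adapter
  forward pass uses W + (alpha / r) * B @ A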
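The forward-pass equivalence can be illustrated on a toy layer. The sketch below uses plain Python lists rather than PyTorch, with arbitrary 3 x 3 values and rank 1; the helper names are hypothetical. It only demonstrates that keeping the adapter separate and folding it into the weight give the same output.

```python
def matmul(a, b):
    """Multiply two matrices given as lists of rows."""
    return [[sum(x * y for x, y in zip(row, col)) for col in zip(*b)] for row in a]


def matvec(m, v):
    """Multiply a matrix (list of rows) by a vector."""
    return [sum(x * y for x, y in zip(row, v)) for row in m]


def lora_forward(W, A, B, x, alpha, r):
    """Adapter kept separate: W x + (alpha / r) * B (A x)."""
    scale = alpha / r
    base = matvec(W, x)
    update = matvec(B, matvec(A, x))
    return [b + scale * u for b, u in zip(base, update)]


def merged_weight(W, A, B, alpha, r):
    """Adapter folded into one matrix: W + (alpha / r) * B A."""
    scale = alpha / r
    BA = matmul(B, A)
    return [[w + scale * d for w, d in zip(w_row, d_row)] for w_row, d_row in zip(W, BA)]


W = [[1.0, 0.0, 2.0], [0.0, 1.0, 0.0], [3.0, 0.0, 1.0]]  # frozen base weight
A = [[1.0, 2.0, 0.0]]                                     # r x d_in
B = [[0.5], [0.0], [-1.0]]                                # d_out x r
x = [1.0, 1.0, 1.0]
alpha, r = 2, 1

separate = lora_forward(W, A, B, x, alpha, r)
merged = matvec(merged_weight(W, A, B, alpha, r), x)
print(separate, merged)  # [6.0, 1.0, -2.0] [6.0, 1.0, -2.0]
```

Keeping the adapter separate is what allows one base model to be paired with different adapters without duplicating the base weights.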

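What LoRA can and cannot do

LoRA is well suited to learning stable behavior: answer format, domain terminology, diagnostic steps, and tool-use habits. It is a poor fit for memorizing large amounts of volatile facts in the weights. Production-grade domain assistants usually let LoRA learn the behavior and let RAG or tools supply up-to-date material, logs, and external facts.

LoRA learns: style + procedure + domain preference
RAG/tools provide: fresh facts + long documents + user-specific state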
SFT

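Training objective: assistant-only next-token loss

Fine-tuning is still next-token prediction, but the loss is computed only on the assistant response tokens. The system and user messages are conditioning context; the model is not asked to reproduce them.

$$\mathcal{L}(\theta) = -\frac{1}{|\mathcal{A}|} \sum_{t \in \mathcal{A}} \log p_{\theta}\left(y_t \mid y_{<t}\right)$$

Here $y_{<t}$ denotes all tokens before position $t$, including the system and user context.

$\mathcal{A}$: positions belonging to the assistant response
$y_t$: target token at position $t$
$\theta$: trainable LoRA adapter parameters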
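A minimal sketch of the masking idea, independent of any framework: prompt positions get the label -100 (the same masking value the runner source uses) and the loss is the mean negative log-likelihood over the remaining assistant positions. The token ids and probabilities below are made up, and for simplicity `target_log_probs[t]` stands for the log-probability the model assigns to the target token at position t; the one-position shift that causal LM implementations apply internally is omitted.

```python
import math

IGNORE_INDEX = -100  # label value excluded from the loss


def build_labels(input_ids: list[int], prompt_len: int) -> list[int]:
    """Copy input_ids, then hide the system/user prompt positions from the loss."""
    labels = list(input_ids)
    mask_to = min(prompt_len, len(labels))
    labels[:mask_to] = [IGNORE_INDEX] * mask_to
    return labels


def assistant_only_loss(target_log_probs: list[float], labels: list[int]) -> float:
    """Mean negative log-likelihood over positions whose label is not masked."""
    kept = [-lp for lp, label in zip(target_log_probs, labels) if label != IGNORE_INDEX]
    if not kept:
        raise ValueError("no assistant tokens left to train on")
    return sum(kept) / len(kept)


input_ids = [11, 12, 13, 21, 22]          # 3 prompt tokens, then 2 assistant tokens
labels = build_labels(input_ids, prompt_len=3)
target_log_probs = [math.log(0.1)] * 3 + [math.log(0.5), math.log(0.25)]
loss = assistant_only_loss(target_log_probs, labels)
print(labels, round(loss, 4))
```

The poorly predicted prompt tokens (probability 0.1) do not affect the result, which is the point of masking them.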

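Relationship to module 8

Tool-Calling TinyGPT trains on CALL text; here the training target is domain answer text. In both cases the objective is to maximize the probability of the target token sequence given the conditioning context.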

JSONL

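JSONL data format: messages conversation samples

jsonl
{"messages":[{"role":"system","content":"You are the AlgoLab machine learning course teaching assistant. Answers should be accurate and step by step, explain the intuition first, then give the necessary formulas or local training advice. Do not fabricate experimental results."},{"role":"user","content":"Why is LoRA fine-tuning a good fit for an AlgoLab course assistant?"},{"role":"assistant","content":"LoRA is well suited to aligning an existing instruct model to a stable course domain. It freezes the base model and trains only a small number of adapter parameters, so local VRAM pressure is low and iteration is fast. For AlgoLab, LoRA can learn course terminology, answer structure, training-diagnosis procedures, and evaluation criteria."}]}
{"messages":[{"role":"system","content":"You are the AlgoLab machine learning course teaching assistant."},{"role":"user","content":"Training loss is falling but validation loss is rising. How should I interpret this?"},{"role":"assistant","content":"This is usually a sign of overfitting. First confirm that the train/val split is reasonable, then look at data volume, training epochs, learning rate, and LoRA rank. You can reduce epochs, lower the rank, increase data diversity, or add a stricter validation set."}]}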
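Before sending data to a trainer it can help to validate each line. The following is a small standalone sketch, not the runner's own parser: it assumes one JSON object per line with a `messages` list and requires at least one non-empty `user` and one non-empty `assistant` message. The function name and error wording are illustrative.

```python
import json

REQUIRED_ROLES = {"user", "assistant"}


def validate_jsonl(raw: str) -> list[dict]:
    """Parse JSONL text into conversation samples, raising ValueError on malformed lines."""
    samples = []
    for line_number, line in enumerate(raw.splitlines(), start=1):
        if not line.strip():
            continue  # tolerate blank lines between samples
        record = json.loads(line)
        if not isinstance(record, dict) or not isinstance(record.get("messages"), list):
            raise ValueError(f"line {line_number}: expected an object with a messages list")
        roles = {
            message.get("role")
            for message in record["messages"]
            if isinstance(message, dict) and message.get("content")
        }
        missing = REQUIRED_ROLES - roles
        if missing:
            raise ValueError(f"line {line_number}: missing roles {sorted(missing)}")
        samples.append(record)
    return samples


good = json.dumps({"messages": [
    {"role": "system", "content": "You are a course assistant."},
    {"role": "user", "content": "What is LoRA rank?"},
    {"role": "assistant", "content": "It controls adapter capacity."},
]})
print(len(validate_jsonl(good + "\n\n" + good)))  # 2
```

Failing fast with the line number makes it easier to repair a hand-written dataset than discovering the problem midway through a training job.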
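Sample

How one sample enters LoRA training

Step 1 / 4: read the JSONL conversation

sample = json.loads(line)
messages = sample['messages']

Initial variables
system: You are the AlgoLab machine learning course teaching assistant.
user: Why use an assistant-only loss?
assistant: The goal of SFT is for the model to generate the assistant answer after the system/user context.

Variables after step 1
messages: system + user + assistant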
Runner

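Local HTTP runner architecture

As in No.7 and No.8, the browser page does not train the model directly; it calls a Python runner on the same machine over HTTP. The front end is only responsible for the console and visualization, while PyTorch, Transformers, PEFT, and CUDA all run in the user's own Python environment.

Browser UI: POST /train, /generate, /evaluate
-> Local HTTP runner: PyTorch + Transformers + PEFT
-> User local model directory: config.json + tokenizer + safetensors
-> LoRA adapter output: .tmp/lora_domain_assistant/adapters/...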
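| Endpoint | Purpose |
| --- | --- |
| GET /status | Check PyTorch, CUDA, Transformers, PEFT, bitsandbytes, and the adapter list. |
| POST /models/inspect | Verify that the local model directory contains the required files. |
| POST /train | Start a LoRA or QLoRA training job. |
| GET /jobs/{id} | Poll step, epoch, loss, percent, and logs. |
| POST /generate | Generate an answer with the current adapter or a manually selected one. |
| POST /evaluate | Run a basic usability evaluation over a set of domain prompts. |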
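A client for these endpoints can be sketched with the standard library alone. The host and port follow the launch command used in this module (127.0.0.1:4890), and the request fields `model_path`, `dataset_jsonl`, `method`, and `max_steps` are the ones the runner source reads. The model path is a placeholder, the helper names are hypothetical, and the response bodies are not assumed here because they depend on the runner.

```python
import json
import urllib.request

BASE_URL = "http://127.0.0.1:4890"  # assumed runner address; adjust to your launch flags


def build_post(path: str, payload: dict) -> urllib.request.Request:
    """Build a JSON POST request for the local runner without sending it."""
    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    return urllib.request.Request(
        BASE_URL + path,
        data=body,
        headers={"Content-Type": "application/json; charset=utf-8"},
        method="POST",
    )


def call(request: urllib.request.Request, timeout: float = 30.0) -> dict:
    """Send a request and decode the JSON response (requires a running runner)."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


sample_line = json.dumps({"messages": [
    {"role": "user", "content": "What does LoRA freeze?"},
    {"role": "assistant", "content": "The base model weights."},
]})

train_request = build_post("/train", {
    "model_path": "/path/to/local/model",  # placeholder: a local Hugging Face-format directory
    "dataset_jsonl": sample_line,
    "method": "lora",
    "max_steps": 20,
})

# With the runner started, the request could be sent like this:
# response = call(train_request)
print(train_request.get_method(), train_request.full_url)
```

Separating request construction from sending keeps the payload easy to inspect before any GPU work starts.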
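Commands

Install dependencies and start the local LoRA runner

powershell
# 1. Install the LoRA training dependencies into the user's local Python environment first
& 'C:\Users\richi\TI_richiebao\LLM\.venv\Scripts\python.exe' -m pip install -U transformers peft accelerate safetensors sentencepiece

# 2. To try QLoRA, also install bitsandbytes. On Windows, confirm that a wheel supports your CUDA/Python combination.
& 'C:\Users\richi\TI_richiebao\LLM\.venv\Scripts\python.exe' -m pip install -U bitsandbytes

# 3. Start the runner from the AlgoLab project root
& 'C:\Users\richi\TI_richiebao\LLM\.venv\Scripts\python.exe' scripts\lora_domain_assistant_local_runner.py --host 127.0.0.1 --port 4890 --device auto

# If python already points at the correct venv in the user's environment, this also works
python scripts\lora_domain_assistant_local_runner.py --host 127.0.0.1 --port 4890 --device auto

# GPU policy
# --device auto  : prefer CUDA, fall back to CPU if it is unavailable
# --device cuda  : request CUDA; useful for confirming the GPU environment. Note that resolve_device in the
#                  source below treats "cuda" like "auto", so check GET /status for the device actually in use.
# --device cpu   : use CPU only, explicitly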
Source

Local runner source: lora_domain_assistant_local_runner.py

python
from __future__ import annotations

import argparse
import gc
import importlib
import json
import math
import random
import sys
import threading
import time
import traceback
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
from urllib.parse import urlparse

try:
    import torch
except Exception:  # pragma: no cover - status endpoint reports this clearly.
    torch = None

try:
    from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
except Exception:  # pragma: no cover
    AutoModelForCausalLM = None
    AutoTokenizer = None
    BitsAndBytesConfig = None

try:
    from peft import LoraConfig, PeftModel, get_peft_model, prepare_model_for_kbit_training
except Exception:  # pragma: no cover
    LoraConfig = None
    PeftModel = None
    get_peft_model = None
    prepare_model_for_kbit_training = None


ROOT_DIR = Path(__file__).resolve().parents[1]
RUN_DIR = ROOT_DIR / ".tmp" / "lora_domain_assistant"
ADAPTER_DIR = RUN_DIR / "adapters"
DATASET_DIR = RUN_DIR / "datasets"
RUN_DIR.mkdir(parents=True, exist_ok=True)
ADAPTER_DIR.mkdir(parents=True, exist_ok=True)
DATASET_DIR.mkdir(parents=True, exist_ok=True)

JOBS: dict[str, dict[str, Any]] = {}
STOP_EVENTS: dict[str, threading.Event] = {}
ACTIVE_JOB_ID: str | None = None
LOCK = threading.Lock()
DEVICE_POLICY = "auto"

DOMAIN_KEYWORDS = [
    "LoRA",
    "adapter",
    "loss",
    "TinyGPT",
    "VAE",
    "diffusion",
    "Tool-Calling",
    "checkpoint",
    "GPU",
    "assistant-only",
]


@dataclass
class SFTSample:
    system: str
    user: str
    assistant: str


def utc_now() -> str:
    return datetime.now(timezone.utc).isoformat()


def package_version(name: str) -> str | None:
    try:
        module = importlib.import_module(name)
    except Exception:
        return None
    return str(getattr(module, "__version__", "installed"))


def has_package(name: str) -> bool:
    return package_version(name) is not None


def torch_cuda_version() -> str | None:
    if torch is None:
        return None
    return getattr(getattr(torch, "version", None), "cuda", None)


def is_cuda_available() -> bool:
    return bool(torch is not None and torch.cuda.is_available())


def cuda_unavailable_reason() -> str:
    if torch is None:
        return "PyTorch is not installed."
    version = str(getattr(torch, "__version__", ""))
    if "+cpu" in version:
        return "The current PyTorch wheel is CPU-only."
    if torch_cuda_version() is None:
        return "The current PyTorch build does not report CUDA support."
    return "CUDA is not available to PyTorch in this environment."


def resolve_device(requested_policy: str | None = None) -> tuple[str, str]:
    policy = str(requested_policy or DEVICE_POLICY or "auto").strip().lower()
    if policy in {"gpu", "cuda"}:
        # "gpu"/"cuda" are handled like "auto": CUDA is preferred, but the runner still falls back to CPU.
        policy = "auto"
    if policy == "cpu":
        return "cpu", "CPU was selected explicitly."
    if is_cuda_available():
        name = torch.cuda.get_device_name(0)
        return "cuda", f"GPU priority: using CUDA device {name}."
    return "cpu", f"GPU priority: CUDA unavailable, falling back to CPU. {cuda_unavailable_reason()}"


def respond(handler: BaseHTTPRequestHandler, status: int, payload: dict[str, Any]) -> None:
    raw = json.dumps(payload, ensure_ascii=False, default=str).encode("utf-8")
    handler.send_response(status)
    handler.send_header("Content-Type", "application/json; charset=utf-8")
    handler.send_header("Content-Length", str(len(raw)))
    handler.send_header("Access-Control-Allow-Origin", "*")
    handler.send_header("Access-Control-Allow-Headers", "Content-Type")
    handler.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
    handler.end_headers()
    handler.wfile.write(raw)


def read_body(handler: BaseHTTPRequestHandler) -> dict[str, Any]:
    length = int(handler.headers.get("Content-Length") or "0")
    if length <= 0:
        return {}
    raw = handler.rfile.read(length).decode("utf-8")
    return json.loads(raw or "{}")


def update_job(job_id: str, **patch: Any) -> None:
    with LOCK:
        JOBS[job_id].update(patch)


def append_log(job_id: str, line: str) -> None:
    with LOCK:
        logs = JOBS[job_id].setdefault("logs", [])
        logs.append(line)
        if len(logs) > 240:
            del logs[: len(logs) - 240]


def append_metric(job_id: str, metric: dict[str, Any]) -> None:
    with LOCK:
        metrics = JOBS[job_id].setdefault("metrics", [])
        metrics.append(metric)
        if len(metrics) > 240:
            del metrics[: len(metrics) - 240]


def update_progress(job_id: str, step: int, max_steps: int, target_epochs: float) -> None:
    epoch = (step / max(max_steps, 1)) * max(target_epochs, 0.0)
    update_job(
        job_id,
        progress={
            "step": step,
            "max_steps": max_steps,
            "epoch": round(epoch, 4),
            "target_epochs": target_epochs,
            "percent": round(step / max(max_steps, 1), 6),
        },
    )


def clamp_int(value: Any, fallback: int, lower: int, upper: int) -> int:
    try:
        parsed = int(value)
    except (TypeError, ValueError):
        parsed = fallback
    return max(lower, min(parsed, upper))


def clamp_float(value: Any, fallback: float, lower: float, upper: float) -> float:
    try:
        parsed = float(value)
    except (TypeError, ValueError):
        parsed = fallback
    return max(lower, min(parsed, upper))


def inspect_model_path(model_path: str) -> dict[str, Any]:
    path = Path(model_path).expanduser()
    if not path.exists() or not path.is_dir():
        raise ValueError("Model path does not exist or is not a directory.")
    files = {item.name for item in path.iterdir() if item.is_file()}
    has_config = "config.json" in files
    has_tokenizer = any(name in files for name in ["tokenizer.json", "tokenizer.model", "vocab.json"])
    has_weights = any(name.endswith((".safetensors", ".bin", ".pt")) for name in files) or any(path.glob("*.safetensors"))
    config_payload = {}
    if has_config:
        try:
            config_payload = json.loads((path / "config.json").read_text(encoding="utf-8"))
        except Exception:
            config_payload = {}
    if not has_config:
        raise ValueError("config.json not found in model directory.")
    if not has_tokenizer:
        raise ValueError("Tokenizer files not found in model directory.")
    if not has_weights:
        raise ValueError("Model weight files not found in model directory.")
    return {
        "ok": True,
        "path": str(path),
        "model_type": config_payload.get("model_type"),
        "architectures": config_payload.get("architectures"),
        "vocab_size": config_payload.get("vocab_size"),
        "hidden_size": config_payload.get("hidden_size"),
        "num_hidden_layers": config_payload.get("num_hidden_layers"),
    }


def normalize_sft_sample(value: Any, line_number: int) -> SFTSample:
    if not isinstance(value, dict):
        raise ValueError(f"line {line_number}: expected JSON object")
    messages = value.get("messages")
    if not isinstance(messages, list):
        raise ValueError(f"line {line_number}: messages must be a list")
    system = ""
    user = ""
    assistant = ""
    for item in messages:
        if not isinstance(item, dict):
            continue
        role = str(item.get("role") or "")
        content = str(item.get("content") or "").strip()
        if role == "system" and content and not system:
            system = content
        elif role == "user" and content:
            user = content
        elif role == "assistant" and content:
            assistant = content
    if not user or not assistant:
        raise ValueError(f"line {line_number}: each sample needs user and assistant messages")
    return SFTSample(system=system, user=user, assistant=assistant)


def parse_sft_jsonl(raw: str) -> list[SFTSample]:
    samples: list[SFTSample] = []
    for index, line in enumerate(raw.strip().splitlines(), start=1):
        stripped = line.strip()
        if not stripped:
            continue
        samples.append(normalize_sft_sample(json.loads(stripped), index))
    if not samples:
        raise ValueError("dataset_jsonl is empty")
    return samples


def render_prompt(tokenizer: Any, sample: SFTSample) -> str:
    messages = []
    if sample.system:
        messages.append({"role": "system", "content": sample.system})
    messages.append({"role": "user", "content": sample.user})
    if getattr(tokenizer, "chat_template", None):
        return tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    prefix = f"System: {sample.system}\n" if sample.system else ""
    return f"{prefix}User: {sample.user}\nAssistant: "


def encode_sample(tokenizer: Any, sample: SFTSample, max_seq_len: int) -> dict[str, Any]:
    prompt = render_prompt(tokenizer, sample)
    eos = tokenizer.eos_token or ""
    full = f"{prompt}{sample.assistant}{eos}"
    prompt_ids = tokenizer(prompt, add_special_tokens=False).input_ids
    full_ids = tokenizer(full, add_special_tokens=False, truncation=True, max_length=max_seq_len).input_ids
    if len(full_ids) < 2:
        raise ValueError("Encoded sample is too short")
    labels = list(full_ids)
    mask_to = min(len(prompt_ids), len(labels))
    labels[:mask_to] = [-100] * mask_to
    if all(label == -100 for label in labels):
        labels[-1] = full_ids[-1]
    return {"input_ids": full_ids, "labels": labels}


def collate_batch(items: list[dict[str, Any]], pad_token_id: int, device: str) -> dict[str, Any]:
    max_len = max(len(item["input_ids"]) for item in items)
    input_ids = []
    labels = []
    attention_mask = []
    for item in items:
        pad = max_len - len(item["input_ids"])
        input_ids.append(item["input_ids"] + [pad_token_id] * pad)
        labels.append(item["labels"] + [-100] * pad)
        attention_mask.append([1] * len(item["input_ids"]) + [0] * pad)
    return {
        "input_ids": torch.tensor(input_ids, dtype=torch.long, device=device),
        "labels": torch.tensor(labels, dtype=torch.long, device=device),
        "attention_mask": torch.tensor(attention_mask, dtype=torch.long, device=device),
    }


def evaluate_loss(model: Any, dataset: list[dict[str, Any]], pad_token_id: int, batch_size: int, device: str) -> float:
    if not dataset:
        return float("nan")
    model.eval()
    losses = []
    with torch.no_grad():
        for item in dataset[: min(len(dataset), 12)]:
            batch = collate_batch([item], pad_token_id, device)
            loss = model(**batch).loss
            losses.append(float(loss.item()))
    model.train()
    return sum(losses) / max(len(losses), 1)


def get_dtype(device: str):
    if torch is None:
        return None
    if device == "cuda":
        if hasattr(torch.cuda, "is_bf16_supported") and torch.cuda.is_bf16_supported():
            return torch.bfloat16
        return torch.float16
    return torch.float32


def select_target_modules(model: Any) -> list[str]:
    desired = ["q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj", "c_proj", "c_attn", "fc1", "fc2"]
    module_names = {name.split(".")[-1] for name, module in model.named_modules() if hasattr(module, "weight")}
    selected = [name for name in desired if name in module_names]
    return selected or ["q_proj", "v_proj"]


def list_adapters() -> list[str]:
    adapters = [item for item in ADAPTER_DIR.glob("*") if item.is_dir() and (item / "adapter_config.json").exists()]
    return [str(item) for item in sorted(adapters, key=lambda path: path.stat().st_mtime, reverse=True)[:20]]


def load_tokenizer(model_path: str):
    tokenizer = AutoTokenizer.from_pretrained(model_path, local_files_only=True, trust_remote_code=True)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    return tokenizer


def load_base_model(model_path: str, device: str, method: str = "lora"):
    dtype = get_dtype(device)
    if method == "qlora":
        if BitsAndBytesConfig is None or not has_package("bitsandbytes"):
            raise RuntimeError("QLoRA requires bitsandbytes, but it is not available in this Python environment.")
        quant_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=dtype if dtype in {torch.float16, torch.bfloat16} else torch.float16,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4",
        )
        return AutoModelForCausalLM.from_pretrained(
            model_path,
            quantization_config=quant_config,
            device_map="auto",
            local_files_only=True,
            trust_remote_code=True,
        )
    model = AutoModelForCausalLM.from_pretrained(
        model_path,
        torch_dtype=dtype,
        local_files_only=True,
        trust_remote_code=True,
    )
    return model.to(device)


def cleanup_cuda() -> None:
    gc.collect()
    if torch is not None and torch.cuda.is_available():
        torch.cuda.empty_cache()


def run_training(job_id: str, request: dict[str, Any]) -> None:
    global ACTIVE_JOB_ID
    if torch is None or AutoModelForCausalLM is None or AutoTokenizer is None or LoraConfig is None or get_peft_model is None:
        update_job(job_id, status="failed", error="Required packages are missing. Install torch, transformers, peft, and accelerate.", finished_at=utc_now())
        return

    stop_event = STOP_EVENTS[job_id]
    try:
        model_path = str(request.get("model_path") or "").strip()
        inspect_model_path(model_path)
        method = str(request.get("method") or "lora").lower()
        method = "qlora" if method == "qlora" else "lora"
        device, device_reason = resolve_device(request.get("device"))
        seed = clamp_int(request.get("seed"), 42, 1, 999_999)
        random.seed(seed)
        torch.manual_seed(seed)
        if is_cuda_available():
            torch.cuda.manual_seed_all(seed)

        samples = parse_sft_jsonl(str(request.get("dataset_jsonl") or ""))
        job_dataset_path = DATASET_DIR / f"{job_id}.jsonl"
        job_dataset_path.write_text("\n".join(json.dumps({"messages": [
            {"role": "system", "content": sample.system},
            {"role": "user", "content": sample.user},
            {"role": "assistant", "content": sample.assistant},
        ]}, ensure_ascii=False) for sample in samples) + "\n", encoding="utf-8")

        max_seq_len = clamp_int(request.get("max_seq_len"), 1024, 128, 8192)
        batch_size = clamp_int(request.get("batch_size"), 1, 1, 16)
        grad_accum = clamp_int(request.get("gradient_accumulation_steps"), 8, 1, 128)
        max_steps = clamp_int(request.get("max_steps"), 120, 1, 200_000)
        target_epochs = clamp_float(request.get("target_epochs"), 2.0, 0.1, 100.0)
        learning_rate = clamp_float(request.get("learning_rate"), 2e-4, 1e-6, 1e-2)
        lora_r = clamp_int(request.get("lora_r"), 16, 1, 256)
        lora_alpha = clamp_int(request.get("lora_alpha"), 32, 1, 512)
        lora_dropout = clamp_float(request.get("lora_dropout"), 0.05, 0.0, 0.8)

        update_job(job_id, status="running", started_at=utc_now(), model_path=model_path, device=device, dataset_size=len(samples))
        append_log(job_id, f"device = {device}")
        append_log(job_id, device_reason)
        append_log(job_id, f"method = {method}")
        append_log(job_id, f"model_path = {model_path}")
        append_log(job_id, f"dataset_size = {len(samples)}")

        tokenizer = load_tokenizer(model_path)
        encoded = [encode_sample(tokenizer, sample, max_seq_len) for sample in samples]
        random.shuffle(encoded)
        val_size = max(1, int(len(encoded) * 0.12)) if len(encoded) > 3 else 1
        val_data = encoded[:val_size]
        train_data = encoded[val_size:] or encoded

        model = load_base_model(model_path, device, method)
        if getattr(model.config, "use_cache", None) is not None:
            model.config.use_cache = False
        if hasattr(model, "gradient_checkpointing_enable"):
            model.gradient_checkpointing_enable()
        if method == "qlora" and prepare_model_for_kbit_training is not None:
            model = prepare_model_for_kbit_training(model)

        target_modules = select_target_modules(model)
        lora_config = LoraConfig(
            r=lora_r,
            lora_alpha=lora_alpha,
            lora_dropout=lora_dropout,
            bias="none",
            task_type="CAUSAL_LM",
            target_modules=target_modules,
        )
        model = get_peft_model(model, lora_config)
        trainable = sum(parameter.numel() for parameter in model.parameters() if parameter.requires_grad)
        total = sum(parameter.numel() for parameter in model.parameters())
        optimizer = torch.optim.AdamW((parameter for parameter in model.parameters() if parameter.requires_grad), lr=learning_rate)

        adapter_path = ADAPTER_DIR / job_id
        update_job(
            job_id,
            train_size=len(train_data),
            val_size=len(val_data),
            max_steps=max_steps,
            target_epochs=target_epochs,
            adapter_path=str(adapter_path),
            trainable_parameters=trainable,
            total_parameters=total,
        )
        update_progress(job_id, 0, max_steps, target_epochs)
        append_log(job_id, f"target_modules = {target_modules}")
        append_log(job_id, f"trainable_parameters = {trainable:,} / {total:,}")
        append_log(job_id, f"batch_size = {batch_size}, grad_accum = {grad_accum}, max_steps = {max_steps}")

        model.train()
        log_every = max(1, max_steps // 12)
        start_time = time.time()
        pad_token_id = tokenizer.pad_token_id if tokenizer.pad_token_id is not None else tokenizer.eos_token_id
        tokens_seen = 0  # real (non-padding) tokens processed so far, used for throughput reporting
        for step in range(1, max_steps + 1):
            if stop_event.is_set():
                update_job(job_id, status="stopped", finished_at=utc_now())
                append_log(job_id, "training stopped by user")
                return
            optimizer.zero_grad(set_to_none=True)
            accumulated_loss = 0.0
            for _ in range(grad_accum):
                batch_items = [random.choice(train_data) for _ in range(batch_size)]
                batch = collate_batch(batch_items, pad_token_id, device)
                tokens_seen += sum(len(item["input_ids"]) for item in batch_items)
                loss = model(**batch).loss / grad_accum
                loss.backward()
                accumulated_loss += float(loss.item())
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            update_progress(job_id, step, max_steps, target_epochs)

            if step == 1 or step % log_every == 0 or step == max_steps:
                elapsed = max(time.time() - start_time, 1e-6)
                # Report tokens actually processed instead of assuming every sample fills max_seq_len.
                tokens = tokens_seen
                val_loss = evaluate_loss(model, val_data, pad_token_id, batch_size, device)
                metric = {
                    "step": step,
                    "train_loss": round(accumulated_loss, 6),
                    "val_loss": round(float(val_loss), 6),
                    "tokens_per_second": int(tokens / elapsed),
                    "epoch": round((step / max_steps) * target_epochs, 4),
                }
                append_metric(job_id, metric)
                append_log(job_id, f"step {step:5d}/{max_steps} | epoch {metric['epoch']:.2f}/{target_epochs:g} | train_loss {metric['train_loss']:.4f} | val_loss {metric['val_loss']:.4f} | {metric['tokens_per_second']:,} tok/s")

        adapter_path.mkdir(parents=True, exist_ok=True)
        model.save_pretrained(adapter_path)
        tokenizer.save_pretrained(adapter_path)
        (adapter_path / "training_metadata.json").write_text(
            json.dumps(
                {
                    "model_path": model_path,
                    "method": method,
                    "target_modules": target_modules,
                    "dataset_path": str(job_dataset_path),
                    "dataset_size": len(samples),
                    "max_steps": max_steps,
                    "target_epochs": target_epochs,
                    "created_at": utc_now(),
                },
                ensure_ascii=False,
                indent=2,
            ),
            encoding="utf-8",
        )
        update_job(job_id, status="completed", adapter_path=str(adapter_path), finished_at=utc_now())
        append_log(job_id, f"adapter saved: {adapter_path}")
    except Exception as error:
        update_job(job_id, status="failed", error=str(error), finished_at=utc_now())
        append_log(job_id, traceback.format_exc())
    finally:
        cleanup_cuda()
        with LOCK:
            if ACTIVE_JOB_ID == job_id:
                ACTIVE_JOB_ID = None


def generate_text(model_path: str, adapter_path: str | None, system: str, prompt: str, max_new_tokens: int, temperature: float) -> str:
    if torch is None or AutoModelForCausalLM is None or AutoTokenizer is None:
        raise RuntimeError("Required packages are missing.")
    device, _reason = resolve_device()
    tokenizer = load_tokenizer(model_path)
    sample = SFTSample(system=system, user=prompt, assistant="")
    rendered = render_prompt(tokenizer, sample)
    model = load_base_model(model_path, device, "lora")
    if adapter_path:
        if PeftModel is None:
            raise RuntimeError("PEFT is not available, cannot load adapter.")
        model = PeftModel.from_pretrained(model, adapter_path)
    model.eval()
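    # Match encode_sample(): the rendered prompt already carries its special tokens, so do not add them again.
    inputs = tokenizer(rendered, return_tensors="pt", add_special_tokens=False).to(device)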
    do_sample = temperature > 0.05
    with torch.no_grad():
        output_ids = model.generate(
            **inputs,
            max_new_tokens=max(16, min(int(max_new_tokens), 1600)),
            temperature=max(temperature, 0.05),
            do_sample=do_sample,
            top_p=0.9,
            pad_token_id=tokenizer.pad_token_id if tokenizer.pad_token_id is not None else tokenizer.eos_token_id,
            eos_token_id=tokenizer.eos_token_id,
        )
    generated = output_ids[0, inputs["input_ids"].shape[-1] :]
    text = tokenizer.decode(generated, skip_special_tokens=True).strip()
    del model
    cleanup_cuda()
    return text


def generate_response(request: dict[str, Any]) -> dict[str, Any]:
    model_path = str(request.get("model_path") or "").strip()
    adapter_path = str(request.get("adapter_path") or "").strip() or None
    system = str(request.get("system") or "")
    prompt = str(request.get("prompt") or "").strip()
    if not prompt:
        raise ValueError("prompt is required")
    max_new_tokens = clamp_int(request.get("max_new_tokens"), 320, 16, 1600)
    temperature = clamp_float(request.get("temperature"), 0.3, 0.0, 1.5)
    compare_base = bool(request.get("compare_base"))
    result: dict[str, Any] = {"prompt": prompt, "model_path": model_path, "adapter_path": adapter_path}
    if compare_base:
        result["base_text"] = generate_text(model_path, None, system, prompt, max_new_tokens, temperature)
    result["lora_text" if adapter_path else "text"] = generate_text(model_path, adapter_path, system, prompt, max_new_tokens, temperature)
    return result


def evaluate_response(request: dict[str, Any]) -> dict[str, Any]:
    prompts = request.get("prompts")
    if not isinstance(prompts, list) or not prompts:
        prompts = [
            "Why is LoRA fine-tuning a good fit for an AlgoLab course assistant?",
            "Explain assistant-only loss.",
            "What should I do when GPU memory runs out during training?",
        ]
    items = []
    for prompt in prompts[:8]:
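        response = generate_response({**request, "prompt": str(prompt), "compare_base": False})
        # Without an adapter_path, generate_response stores the answer under "text" rather than "lora_text".
        output = response.get("lora_text") or response.get("text") or ""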
        has_terms = any(term.lower() in output.lower() for term in DOMAIN_KEYWORDS)
        items.append({
            "prompt": str(prompt),
            "output": output,
            "contains_domain_terms": has_terms,
            "length": len(output),
        })
    total = len(items) or 1
    return {
        "items": items,
        "summary": {
            "count": len(items),
            "average_length": sum(item["length"] for item in items) / total,
            "format_score": sum(1 for item in items if "\n" in item["output"] or "。" in item["output"]) / total,
            "domain_keyword_rate": sum(1 for item in items if item["contains_domain_terms"]) / total,
        },
    }


def status_payload() -> dict[str, Any]:
    device, reason = resolve_device()
    return {
        "ok": True,
        "python": sys.version.split()[0],
        "torch_available": torch is not None,
        "torch_version": getattr(torch, "__version__", None) if torch is not None else None,
        "torch_cuda_version": torch_cuda_version(),
        "cuda_available": is_cuda_available(),
        "cuda_device": torch.cuda.get_device_name(0) if is_cuda_available() else None,
        "cuda_device_count": torch.cuda.device_count() if torch is not None else 0,
        "transformers_available": AutoModelForCausalLM is not None,
        "transformers_version": package_version("transformers"),
        "peft_available": LoraConfig is not None,
        "peft_version": package_version("peft"),
        "accelerate_available": has_package("accelerate"),
        "bitsandbytes_available": has_package("bitsandbytes"),
        "device_policy": DEVICE_POLICY,
        "device": device,
        "device_reason": reason,
        "working_dir": str(RUN_DIR),
        "active_job_id": ACTIVE_JOB_ID,
        "adapters": list_adapters(),
    }


class LoRADomainAssistantHandler(BaseHTTPRequestHandler):
    def log_message(self, format: str, *args: Any) -> None:
        print(f"[lora-domain-runner] {self.address_string()} - {format % args}")

    def do_OPTIONS(self) -> None:
        respond(self, 200, {"ok": True})

    def do_GET(self) -> None:
        path = (urlparse(self.path).path.rstrip("/") or "/")
        if path in {"/", "/status", "/health"}:
            respond(self, 200, status_payload())
            return
        if path.startswith("/jobs/"):
            job_id = path.split("/")[-1]
            with LOCK:
                job = JOBS.get(job_id)
            if not job:
                respond(self, 404, {"message": "Job not found."})
                return
            respond(self, 200, job)
            return
        if path == "/adapters":
            respond(self, 200, {"adapters": list_adapters()})
            return
        respond(self, 404, {"message": "Unknown route."})

    def do_POST(self) -> None:
        global ACTIVE_JOB_ID
        path = (urlparse(self.path).path.rstrip("/") or "/")
        try:
            if path == "/models/inspect":
                body = read_body(self)
                respond(self, 200, inspect_model_path(str(body.get("model_path") or "")))
                return
            if path == "/train":
                body = read_body(self)
                with LOCK:
                    if ACTIVE_JOB_ID and JOBS.get(ACTIVE_JOB_ID, {}).get("status") in {"queued", "running"}:
                        respond(self, 409, {"message": f"Training job already running: {ACTIVE_JOB_ID}"})
                        return
                    job_id = uuid.uuid4().hex[:12]
                    job = {"id": job_id, "status": "queued", "created_at": utc_now(), "logs": [], "metrics": []}
                    JOBS[job_id] = job
                    STOP_EVENTS[job_id] = threading.Event()
                    ACTIVE_JOB_ID = job_id
                thread = threading.Thread(target=run_training, args=(job_id, body), daemon=True)
                thread.start()
                respond(self, 200, job)
                return
            if path.startswith("/jobs/") and path.endswith("/stop"):
                job_id = path.split("/")[2]
                with LOCK:
                    job = JOBS.get(job_id)
                # Unknown job ids get a 404, matching GET /jobs/<id>.
                if not job:
                    respond(self, 404, {"message": "Job not found."})
                    return
                if job_id in STOP_EVENTS:
                    STOP_EVENTS[job_id].set()
                respond(self, 200, job)
                return
            if path == "/generate":
                respond(self, 200, generate_response(read_body(self)))
                return
            if path == "/evaluate":
                respond(self, 200, evaluate_response(read_body(self)))
                return
            respond(self, 404, {"message": "Unknown route."})
        except Exception as error:
            respond(self, 500, {"message": str(error), "traceback": traceback.format_exc()})


def main() -> None:
    global DEVICE_POLICY
    parser = argparse.ArgumentParser(description="Local LoRA domain assistant runner for AlgoLab.")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=4890)
    parser.add_argument("--device", choices=["auto", "cuda", "gpu", "cpu"], default="auto")
    args = parser.parse_args()
    # "cuda" and "gpu" are accepted as aliases for the GPU-first "auto" policy; only "cpu" forces CPU.
    DEVICE_POLICY = "auto" if args.device in {"auto", "cuda", "gpu"} else "cpu"
    server = ThreadingHTTPServer((args.host, args.port), LoRADomainAssistantHandler)
    print(f"LoRA domain assistant runner listening on http://{args.host}:{args.port}")
    print(f"Working directory: {RUN_DIR}")
    print(f"Device policy: {DEVICE_POLICY}")
    device, reason = resolve_device()
    print(f"Resolved device: {device} ({reason})")
    print("Press Ctrl+C to stop.")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nStopping LoRA domain assistant runner.")
    finally:
        server.server_close()


if __name__ == "__main__":
    main()
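Training

Train the LoRA Domain Assistant Locally

Nothing is downloaded here, and nothing trains in the browser. The front end only controls the Python runner on your machine over HTTP, and the base model must come from a local directory.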

# Windows / PowerShell
& 'C:\Users\richi\TI_richiebao\LLM\.venv\Scripts\python.exe' scripts\lora_domain_assistant_local_runner.py --host 127.0.0.1 --port 4890 --device auto

# Runner API
GET  /status
POST /models/inspect
POST /train
POST /generate
POST /evaluate
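
The routes above can be called from any HTTP client. Below is a minimal sketch using only the Python standard library. It assumes the runner accepts JSON request bodies and is listening on the host and port from the command above; `build_request` and `call_runner` are illustrative helper names, not part of the runner script, and the model path in the usage comment is a placeholder.

```python
import json
from typing import Optional
from urllib.request import Request, urlopen

RUNNER_URL = "http://127.0.0.1:4890"  # matches --host / --port in the command above


def build_request(route: str, payload: Optional[dict] = None) -> Request:
    """Build a GET request (no payload) or a JSON POST request (with payload)."""
    url = RUNNER_URL + route
    if payload is None:
        return Request(url, method="GET")
    data = json.dumps(payload).encode("utf-8")
    return Request(url, data=data, method="POST",
                   headers={"Content-Type": "application/json"})


def call_runner(route: str, payload: Optional[dict] = None, timeout: float = 10.0) -> dict:
    """Send the request and decode the JSON response. Needs a running runner."""
    with urlopen(build_request(route, payload), timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


# Usage, once the runner is up (the path is a placeholder):
#   status = call_runner("/status")
#   report = call_runner("/models/inspect", {"model_path": "/path/to/local/model"})
```

Keeping request construction separate from the network call makes the client easy to check without a live runner.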
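1. Select a local base model
2. Build the AlgoLab domain SFT dataset

The built-in candidate pool holds 50,000 multi-template synthetic domain samples. At training time a JSONL subset is drawn at random according to the sample count, seed, and domain ratio, and loss is computed only on the assistant answer. Before formal training, it is still advisable to spot-check the data and mix in samples revised by a teacher model or by hand.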

Candidate Pool: 50,000
Ratio Total: 100
Valid Lines: 3,000
Invalid Lines: 0
Chars: 1,011,420
Mode: assistant loss

First sample:
{
  "lineNumber": 1,
  "system": "你是 AlgoLab 机器学习课程助教。回答要准确、分步骤、优先解释直觉,再给必要公式或本地训练建议。不要编造实验结果。",
  "user": "本地训练 Qwen2.5-Coder-1.5B-Instruct 时遇到“训练 loss 下降但验证 loss 上升”,请给排查顺序。",
  "assistant": "这是典型过拟合信号。先检查 train/val 是否来自不同问题,再降低 epoch 或 LoRA rank,并增加真实失败样例。 建议同时记录 model=Qwen2.5-Coder-1.5B-Instruct、rank=32、样本量=3000 和 seed,方便下一轮复现实验。"
}
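
"Assistant loss" mode means the system and user tokens are fed to the model but excluded from the loss. A common way to do this, sketched here under the assumption that the runner follows the usual PyTorch convention, is to set the labels of prompt tokens to `-100`, the default `ignore_index` of cross-entropy loss. The token ids are toy values and `build_labels` is a hypothetical helper.

```python
IGNORE_INDEX = -100  # default ignore_index of PyTorch's cross-entropy loss


def build_labels(prompt_ids: list[int], answer_ids: list[int]) -> tuple[list[int], list[int]]:
    """Concatenate prompt and answer tokens; mask the prompt part out of the loss."""
    input_ids = prompt_ids + answer_ids
    labels = [IGNORE_INDEX] * len(prompt_ids) + list(answer_ids)
    return input_ids, labels


# Toy ids: three prompt tokens (system + user) followed by two answer tokens.
input_ids, labels = build_labels([11, 12, 13], [21, 22])
print(input_ids)
print(labels)
```

Only positions whose label is not `-100` contribute gradient, so the adapter learns to produce answers rather than to reproduce questions.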
3. Local LoRA / QLoRA fine-tuning

Freeze the base model and train only the adapter. QLoRA requires a working local bitsandbytes install.
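
As a rough illustration of "freeze the base model, train only the adapter", the sketch below wraps a locally stored model with a PEFT LoRA adapter. It is not the runner's actual training code. The rank, alpha and dropout values are the ones this module suggests as a local starting point; the `target_modules` names are an assumption that holds for Qwen2/Llama-style attention layers and should be checked against the chosen model. `attach_lora` is a hypothetical helper, and the heavy imports are deferred so the settings can be inspected without `transformers` or `peft` installed.

```python
# LoRA settings; rank / alpha / dropout follow the module's suggested starting point.
LORA_KWARGS = {
    "r": 16,
    "lora_alpha": 32,
    "lora_dropout": 0.05,
    # Assumption: attention projection names used by Qwen2/Llama-style models.
    "target_modules": ["q_proj", "k_proj", "v_proj", "o_proj"],
    "task_type": "CAUSAL_LM",
}


def attach_lora(model_path: str):
    """Load a local base model and wrap it with a LoRA adapter (needs transformers + peft)."""
    from peft import LoraConfig, get_peft_model
    from transformers import AutoModelForCausalLM

    # local_files_only=True keeps the load offline, in line with the no-download rule.
    base = AutoModelForCausalLM.from_pretrained(model_path, local_files_only=True)
    model = get_peft_model(base, LoraConfig(**LORA_KWARGS))
    model.print_trainable_parameters()  # base weights are frozen; only adapter weights train
    return model
```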

4. Base vs LoRA domain assistant comparison

The latest adapter is used by default; you can also select a different adapter directory manually.

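GPU Memory

Choosing Between LoRA and QLoRA

The first version defaults to plain LoRA because it is the most stable and the easiest to debug. If GPU memory runs short, switch to QLoRA. QLoRA loads the frozen base model in 4-bit and trains only the LoRA adapter, but it depends on bitsandbytes, so on Windows you should explicitly confirm that the install works.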

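| Method | Advantages | Cost | Recommendation |
| --- | --- | --- | --- |
| LoRA | Stable, simple to debug, broadly compatible | The base model still has to be loaded in half precision | Use first for the 1.5B first version |
| QLoRA | Lower GPU memory usage | Depends on bitsandbytes; more sensitive in speed and compatibility | Try when memory is tight or with a 3B model |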
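
To make the memory trade-off concrete, the sketch below estimates the memory taken by the frozen base weights alone: 2 bytes per parameter in half precision versus about 0.5 bytes per parameter in 4-bit. This is back-of-the-envelope arithmetic, not a measurement. It ignores activations, gradients, optimizer state for the adapter, quantization metadata and framework overhead, so real usage will be higher. The parameter counts are the nominal 1.5B and 3B sizes.

```python
BYTES_PER_PARAM = {"fp16": 2.0, "4bit": 0.5}


def weight_memory_gib(num_params: float, dtype: str) -> float:
    """Approximate memory of the base weights only, in GiB."""
    return num_params * BYTES_PER_PARAM[dtype] / 1024**3


for name, params in [("1.5B", 1.5e9), ("3B", 3.0e9)]:
    fp16 = weight_memory_gib(params, "fp16")
    q4 = weight_memory_gib(params, "4bit")
    print(f"{name}: fp16 ~ {fp16:.2f} GiB, 4-bit ~ {q4:.2f} GiB")
```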
Local tuning starting point:
rank = 16
alpha = 32
dropout = 0.05
learning_rate = 2e-4
batch_size = 1
grad_accum = 8
max_seq_len = 1024
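
The starting values above imply a few derived quantities worth knowing before training. The sketch below computes the effective batch size, the standard LoRA scaling factor `alpha / rank`, and the number of trainable parameters LoRA adds to one linear layer, `rank * (d_in + d_out)`. The 1536 x 1536 layer shape is a hypothetical example, not read from any model config.

```python
rank, alpha = 16, 32
batch_size, grad_accum = 1, 8

# Gradients are accumulated over grad_accum micro-batches before each optimizer step.
effective_batch = batch_size * grad_accum

# Standard LoRA scales the low-rank update by alpha / rank.
scaling = alpha / rank


def lora_params(d_in: int, d_out: int, r: int) -> int:
    """Trainable parameters LoRA adds to one d_out x d_in linear layer."""
    return r * (d_in + d_out)


d_in = d_out = 1536  # hypothetical layer shape, for illustration only
full = d_in * d_out
added = lora_params(d_in, d_out, rank)
print(effective_batch, scaling, added, full, added / full)
```

For this hypothetical layer the adapter trains about 2% of the parameters that a full update of the same layer would touch.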
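Evaluation

Evaluation: From Loss to Domain Usability

The loss from LoRA SFT is a necessary signal, not the end goal. Whether the domain assistant is useful depends on whether, on real questions, it behaves more like a teaching assistant who knows the course, knows local training, and knows the constraints.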

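| Metric | What to look at | Why it matters |
| --- | --- | --- |
| train / val loss | Whether it falls steadily and whether overfitting appears | Shows whether training learned the distribution rather than only memorizing the training set. |
| base vs LoRA comparison | Whether the fine-tuned model understands AlgoLab terms and workflow better | Confirms the adapter actually changed behavior. |
| Domain keyword coverage | Whether answers mention key concepts such as LoRA, loss, GPU, and checkpoint | Gives a rough view of domain alignment. |
| Human usability | Whether answers are accurate, step-by-step, and actionable | Final validation still has to use real user questions. |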
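low loss -> output form looks more like the training distribution
useful assistant -> domain accuracy + actionable advice + no fabrication + able to admit uncertainty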
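
The base vs LoRA comparison and the keyword coverage metric can be combined into one rough check. The sketch below mirrors the keyword test used by the runner's `/evaluate` route (a case-insensitive substring match) and reports the rate for both sets of outputs. The keyword list is an illustrative subset and the sample answers are invented purely to exercise the function; they are not real model outputs.

```python
DOMAIN_KEYWORDS = ["LoRA", "loss", "GPU", "checkpoint"]  # illustrative subset


def keyword_rate(outputs: list[str], keywords: list[str] = DOMAIN_KEYWORDS) -> float:
    """Fraction of outputs that mention at least one domain keyword."""
    if not outputs:
        return 0.0
    hits = sum(
        1 for text in outputs
        if any(keyword.lower() in text.lower() for keyword in keywords)
    )
    return hits / len(outputs)


def compare(base_outputs: list[str], lora_outputs: list[str]) -> dict[str, float]:
    """Keyword rate for base and adapter outputs, plus the difference."""
    base_rate = keyword_rate(base_outputs)
    lora_rate = keyword_rate(lora_outputs)
    return {"base": base_rate, "lora": lora_rate, "delta": lora_rate - base_rate}


# Invented strings, only to show the call shape.
report = compare(
    ["hello", "use a GPU"],
    ["lower the LoRA rank", "check the validation loss"],
)
print(report)
```

A higher keyword rate only shows that the vocabulary shifted. It says nothing about correctness, which is why the table keeps human review as the final check.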
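Data

Data Scale: A 50,000-Sample Candidate Pool and Training-Time Sampling

The Module 9 interactive card ships with a candidate pool of 50,000 AlgoLab domain SFT samples. It is a multi-template synthetic pool, which is not the same as 50,000 samples reviewed by humans or refined by a teacher model. The pool is never fed into training all at once. For an actual run, the user specifies the sample count, random seed, and domain ratio, and a JSONL subset is drawn accordingly. This demonstrates production-scale data volume while keeping local training cost under control.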

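| Stage | Suggested sample size | Goal |
| --- | --- | --- |
| Smoke test | 10 to 50 | Confirm that the local model, CUDA, runner, adapter saving, and inference all work. |
| Course-style alignment | 500 to 1000 | Make answers sound more like an AlgoLab teaching assistant and cover the main modules. |
| Production-grade usability | 3000 to 10000 | Cover concept explanation, training diagnosis, runner usage, evaluation, and experiment workflow, and mix in samples revised by humans or a teacher model. |
| Large-scale continued training | 10000 to 50000 | For longer training or multiple sampling rounds, with ongoing validation, deduplication, and manual spot checks. |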
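Default candidate pool = 50,000 multi-template synthetic samples
Training set = JSONL subset drawn by ratio-weighted random sampling
Recommended starting point: 3000 samples, seed=42, ratio 30/25/20/15/10
Formal training: spot checks + deduplication + teacher/human revision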
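
The sampling step described above can be sketched as follows. This is a minimal illustration, not the card's actual sampler. It assumes the five ratio values map onto five domain buckets; the bucket names are hypothetical, since the text does not state which ratio belongs to which domain. A seeded `random.Random` makes the draw reproducible.

```python
import random

# Hypothetical bucket names; the text gives the ratios but not the mapping.
RATIOS = {
    "course_explanation": 30,
    "training_diagnosis": 25,
    "experiment_workflow": 20,
    "runner_usage": 15,
    "evaluation": 10,
}


def allocate(sample_count: int, ratios: dict[str, int]) -> dict[str, int]:
    """Split sample_count across domains in proportion to ratios."""
    total = sum(ratios.values())
    counts = {name: sample_count * share // total for name, share in ratios.items()}
    # Hand any rounding remainder to the largest-share domains first.
    remainder = sample_count - sum(counts.values())
    for name in sorted(ratios, key=ratios.get, reverse=True)[:remainder]:
        counts[name] += 1
    return counts


def sample_subset(pool: dict[str, list[dict]], sample_count: int,
                  ratios: dict[str, int], seed: int = 42) -> list[dict]:
    """Draw a reproducible, ratio-weighted subset from a per-domain pool."""
    rng = random.Random(seed)
    subset: list[dict] = []
    for name, count in allocate(sample_count, ratios).items():
        subset.extend(rng.sample(pool[name], min(count, len(pool[name]))))
    rng.shuffle(subset)
    return subset


# Toy pool with placeholder records, 50 per domain.
toy_pool = {name: [{"id": f"{name}-{i}"} for i in range(50)] for name in RATIOS}
subset = sample_subset(toy_pool, 20, RATIOS, seed=42)
print(allocate(3000, RATIOS))
print(len(subset))
```

Recording the seed next to the sample count and ratios is what lets a later run redraw exactly the same subset.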

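If the goal is a genuinely product-grade assistant, template-synthesized data alone is not enough. A more robust path is to use synthetic data to establish the distribution, have a teacher model generate high-quality Q&A, have humans review the key samples, and hand course notes and up-to-date material to RAG.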

Data

Teacher Prompt Template for Expanding the Dataset

You are creating supervised fine-tuning JSONL for an AlgoLab machine learning course assistant.
Return only valid JSONL. Every line must be one object with a messages array.

System message:
你是 AlgoLab 机器学习课程助教。回答要准确、分步骤、优先解释直觉,再给必要公式或本地训练建议。不要编造实验结果。

Create {sample_count} samples for topic = {topic}.
Topics can include:
- next-token and decoder-only training
- Transformer self-attention
- TinyGPT from scratch
- TinyCodeGPT task-to-code training
- Tool-Calling TinyGPT
- LoRA / QLoRA fine-tuning
- GPU local runner diagnostics
- loss curves and evaluation
- checkpoint, adapter, train/val split

Rules:
1. User questions must be concrete and varied.
2. Assistant answers must be correct, concise, step-by-step, and actionable.
3. Include at least one failure diagnosis or tradeoff in many samples.
4. Do not invent benchmark numbers, hardware results, or file paths.
5. Keep each assistant answer under 180 Chinese characters unless a formula is necessary.
6. Output raw JSONL only, no markdown.

Output shape:
{"messages":[{"role":"system","content":"..."},{"role":"user","content":"..."},{"role":"assistant","content":"..."}]}
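
Teacher-model output should be checked before it is mixed into the training set. The sketch below validates each JSONL line against the output shape in the template: one JSON object per line, a `messages` array, and exactly the roles system, user, assistant with non-empty content. The function names are illustrative; the runner may apply different or stricter checks.

```python
import json
from typing import Optional

EXPECTED_ROLES = ["system", "user", "assistant"]


def validate_line(line: str) -> Optional[str]:
    """Return None if the line matches the expected shape, else an error message."""
    try:
        record = json.loads(line)
    except json.JSONDecodeError as error:
        return f"invalid JSON: {error.msg}"
    messages = record.get("messages") if isinstance(record, dict) else None
    if not isinstance(messages, list):
        return "missing messages array"
    roles = [m.get("role") if isinstance(m, dict) else None for m in messages]
    if roles != EXPECTED_ROLES:
        return f"unexpected roles: {roles}"
    if any(not isinstance(m.get("content"), str) or not m["content"].strip()
           for m in messages):
        return "empty content"
    return None


def split_valid(lines: list[str]) -> tuple[list[dict], list[tuple[int, str]]]:
    """Separate parsed valid records from (line_number, error) pairs."""
    valid, invalid = [], []
    for number, line in enumerate(lines, start=1):
        if not line.strip():
            continue  # skip blank lines
        error = validate_line(line)
        if error is None:
            valid.append(json.loads(line))
        else:
            invalid.append((number, error))
    return valid, invalid
```

The valid and invalid counts returned here correspond to the "Valid Lines" and "Invalid Lines" figures shown for the dataset.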
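Direction

Next Step: LoRA Learns Behavior, RAG Manages Knowledge

The focus of Module 9 is making the local LoRA training loop solid. Once the assistant answers in-course questions reliably, the next stage should not keep forcing all the material into the adapter by rote; it should connect the assistant to a retrieval system.

A LoRA adapter is suited to learning:
- answer style
- domain terminology
- diagnostic workflow
- output constraints

RAG is suited to providing:
- course notes
- the user's project documents
- the latest instructions
- long-context material

This is also the division of labor that makes a domain assistant truly useful: the model handles understanding and organization, and retrieval handles facts and context.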
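
One way to picture this split is at prompt-assembly time: retrieved passages are placed in the user turn, and the LoRA-tuned model only has to organize them in the learned style. The sketch below assumes some retrieval step has already returned a list of passages. `build_rag_messages` and the prompt layout are hypothetical, not a format defined by this module.

```python
def build_rag_messages(system_prompt: str, question: str, passages: list[str]) -> list[dict]:
    """Assemble chat messages that carry retrieved passages as numbered references."""
    context = "\n\n".join(f"[{i}] {text}" for i, text in enumerate(passages, start=1))
    user_content = f"Reference material:\n{context}\n\nQuestion: {question}"
    return [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_content},
    ]


# Placeholder strings, only to show the call shape.
messages = build_rag_messages(
    "You are the AlgoLab course assistant.",
    "Why split train and val?",
    ["passage one", "passage two"],
)
print(messages[1]["content"])
```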
