Algorithm Visualization and Interactive Learning Platform
LoRA Domain Assistant: Fine-Tune a Local 1.5B-3B Model
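Stage three moves from TinyGPT toward a genuinely usable local domain assistant: pick a 1.5B-3B instruct model that is already downloaded on the user's machine, train only a small number of adapter parameters with LoRA or QLoRA, drive GPU-first training through a local HTTP runner, save checkpoints, load the current adapter for inference, and use domain Q&A evaluation to move from loss to usability.
Stage three: from small-model experiments to a domain assistant
The first 8 modules have already worked through three threads.
Module 9 enters stage three: instead of training a toy model from scratch, it applies **LoRA / QLoRA fine-tuning** to an open-source instruct model that is already downloaded on the user's machine, with the goal of producing a genuinely useful domain assistant.
The default domain for this module is the **AlgoLab machine learning course teaching assistant**: it has to understand the TinyGPT, Transformer, VAE, Diffusion, Tool Calling, local GPU training, checkpoint, loss, and evaluation questions from the earlier modules. This domain is narrow enough to train locally, yet real enough to answer questions users actually ask.
The full loop keeps the same engineering shape as No.7 and No.8.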
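Which domains the assistant learns first
The first version should not aim for general chat. LoRA's strength is aligning an existing instruct model to a stable, narrow, high-frequency task distribution. For AlgoLab, the best-fitting domain is not encyclopedic knowledge but in-course explanation, diagnosis, and training advice.
| Domain | What the model should learn | Typical user question |
|---|---|---|
| Machine learning course explanations | Turn Transformer, VAE, Diffusion, LoRA, and Tool Calling into learnable steps | Why can next-token prediction give rise to tool-calling ability? |
| Local training diagnostics | Explain loss, overfitting, batch, grad accumulation, and GPU fallback | CUDA is available but VRAM runs out; what should I adjust? |
| Experiment workflow assistant | Guide the user through preparing JSONL, choosing a checkpoint, and comparing base and adapter | Why do a train/val split? |
| Code and runner usage | Explain the local HTTP runner, Python environment, model path, and adapter directory | The runner is connected but the model fails to load; what now? |
| Evaluation and usability | Distinguish low loss, correct format, useful answers, and factual reliability | Why does low loss not mean the domain assistant is good to use? |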
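Model selection: the model must already be downloaded locally
This module does not download models automatically inside the runner. Training only reads a Hugging Face format model directory on the user's local disk, for example a directory containing `config.json`, tokenizer files, and `*.safetensors` weight files.
| Suggested model | Use case | First-version advice |
|---|---|---|
| Qwen2.5-1.5B-Instruct | Chinese-language course assistant, general explanation, training diagnostics | Preferred. Lightweight and stable, well suited to getting the LoRA loop working first. |
| Qwen3-1.7B | Later expansion toward agents, reasoning, and tool use | A reasonable second choice. Note that thinking mode makes the prompt template more complex. |
| Qwen2.5-Coder-1.5B-Instruct | Code explanation, runner debugging, data-processing scripts | Prefer it if the domain leans toward code. |
| Llama-3.2-3B-Instruct | English material, 3B-level dialogue quality | Higher VRAM pressure, and the license needs to be checked separately. |
In practice, the most conservative order is: validate the pipeline with a 1.5B model and 20 to 200 samples first, then increase data volume, epochs, and model size.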
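The core of LoRA: freeze the large model, train only a low-rank increment
LoRA does not retrain the whole model. Instead, it attaches a small low-rank "patch" next to selected linear layers. The original weight $W_0$ stays frozen and only $A$ and $B$ are trained, so the model learns the domain direction within a small number of trainable parameters.
Why not full fine-tuning
Full fine-tuning updates a large share of the model's weights and also has to hold gradients, optimizer state, and training activations. For a 1.5B to 3B model, training memory grows well beyond what is needed just to load the weights. LoRA's premise is that the pretrained model already has general language ability, so domain fine-tuning only needs to give it a small directional correction.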
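As a rough, back-of-the-envelope illustration of why training memory grows beyond the weights themselves, the sketch below tallies bytes per parameter for full fine-tuning versus LoRA. The byte counts (2-byte weights and gradients, 8 bytes of AdamW state per trainable parameter) and the 1% trainable fraction are assumptions chosen for illustration, activations are ignored, and real numbers depend on the optimizer, precision, and framework.

```python
GIB = 1024 ** 3

def training_memory_gib(total_params: float, trainable_params: float,
                        weight_bytes: int = 2, grad_bytes: int = 2,
                        optimizer_bytes: int = 8) -> float:
    """Weights are held for every parameter; gradients and optimizer
    state are only held for the trainable ones (assumed byte sizes)."""
    weights = total_params * weight_bytes
    grads = trainable_params * grad_bytes
    optim = trainable_params * optimizer_bytes
    return (weights + grads + optim) / GIB

total = 1.5e9                                     # a 1.5B-parameter base model
full = training_memory_gib(total, total)          # every weight is trainable
lora = training_memory_gib(total, 0.01 * total)   # assumed 1% trainable adapter
print(f"full fine-tuning ~ {full:.1f} GiB, LoRA ~ {lora:.1f} GiB (activations excluded)")
```

The point is the ratio rather than the absolute values: the frozen base still has to be loaded, but the per-parameter training overhead only applies to the adapter.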
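Parameter count, seen from one linear layer
Suppose a Transformer linear layer is $W_0 \in \mathbb{R}^{4096 \times 4096}$. Learning a full update $\Delta W$ directly takes about 16.77 million parameters ($4096 \times 4096 = 16{,}777{,}216$). With LoRA rank $r = 8$, only two thin matrices $B \in \mathbb{R}^{4096 \times 8}$ and $A \in \mathbb{R}^{8 \times 4096}$ are needed, about 65 thousand parameters ($2 \times 4096 \times 8 = 65{,}536$), roughly 256 times fewer for this single layer.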
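The arithmetic above can be checked in a few lines. This is a minimal sketch; the 4096 × 4096 layer size and rank 8 are the values consistent with the quoted figures (about 16.77 million, about 65 thousand, 256 times), and the function names are illustrative.

```python
def full_update_params(d_out: int, d_in: int) -> int:
    """Parameters needed to learn a dense update of shape d_out x d_in."""
    return d_out * d_in

def lora_update_params(d_out: int, d_in: int, r: int) -> int:
    """Parameters in the two thin matrices B (d_out x r) and A (r x d_in)."""
    return d_out * r + r * d_in

d, r = 4096, 8
full_count = full_update_params(d, d)
lora_count = lora_update_params(d, d, r)
print(full_count, lora_count, full_count // lora_count)
# prints: 16777216 65536 256
```

Doubling the rank doubles the adapter size, so the ratio for a square layer is simply `d / (2 * r)`.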
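Why it is called low-rank
The low-rank assumption is that domain adaptation does not need to modify the model arbitrarily across the full high-dimensional space. For the AlgoLab assistant, what really needs to change is answer style, course terminology, diagnostic procedure, and a small set of task preferences. These changes can be confined to relatively few directions, and the rank controls how many such learnable directions there are.
What happens during training and inference
During training, the base model's $W_0$ is not updated; only the LoRA adapter's $A$ and $B$ are. What gets saved at the end of training is the adapter files. At inference time, the base model and the adapter must both be loaded, and the forward pass is equivalent to using $W_0 + \Delta W$ with $\Delta W = BA$ (PEFT additionally scales the update by `lora_alpha / r`).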
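To make the frozen-base-plus-adapter forward pass concrete, here is a minimal pure-Python sketch of a single LoRA linear layer. It assumes the usual convention of a frozen weight `W0`, thin matrices `A` and `B`, and an `alpha / r` scale as in PEFT's `lora_alpha`; the tiny dimensions and random values are purely illustrative and this is not how PEFT implements the layer internally.

```python
import random

def matvec(M, x):
    """Multiply a matrix (list of rows) by a vector."""
    return [sum(m * v for m, v in zip(row, x)) for row in M]

def lora_forward(W0, A, B, x, alpha, r):
    """y = W0 x + (alpha / r) * B (A x); W0 is frozen, only A and B would train."""
    base = matvec(W0, x)
    update = matvec(B, matvec(A, x))
    scale = alpha / r
    return [b + scale * u for b, u in zip(base, update)]

def merge_weights(W0, A, B, alpha, r):
    """Fold the adapter into one dense matrix: W0 + (alpha / r) * B A."""
    scale = alpha / r
    return [[W0[i][j] + scale * sum(B[i][k] * A[k][j] for k in range(r))
             for j in range(len(W0[0]))] for i in range(len(W0))]

rng = random.Random(0)
d_in, d_out, r, alpha = 4, 3, 2, 4
W0 = [[rng.uniform(-1, 1) for _ in range(d_in)] for _ in range(d_out)]
A = [[rng.gauss(0, 0.1) for _ in range(d_in)] for _ in range(r)]   # r x d_in
B_init = [[0.0] * r for _ in range(d_out)]                         # zero init
x = [rng.uniform(-1, 1) for _ in range(d_in)]

# With B initialised to zero the adapter is a no-op at step 0.
y_start = lora_forward(W0, A, B_init, x, alpha, r)

# Stand-in for a trained B: the adapter path equals the merged dense weight.
B_trained = [[rng.gauss(0, 0.1) for _ in range(r)] for _ in range(d_out)]
y_adapter = lora_forward(W0, A, B_trained, x, alpha, r)
y_merged = matvec(merge_weights(W0, A, B_trained, alpha, r), x)
```

Because `B` starts at zero, fine-tuning begins from exactly the base model's behavior, and after training the adapter can either be kept separate or merged into the base weight.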
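What LoRA can and cannot do
LoRA is well suited to learning stable behavior: answer format, domain terminology, diagnostic steps, and tool-use habits. It is not suited to memorizing large amounts of volatile facts into the weights. Production-grade domain assistants usually let LoRA learn behavior and let RAG or tools supply up-to-date material, logs, and external facts.
Training objective: assistant-only next-token loss
Fine-tuning is still next-token prediction, but loss is computed only on the assistant's answer tokens. The system and user messages are conditioning context; the model is not asked to reproduce them.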
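The masking idea can be sketched without any framework. The example below assumes the common convention, used by PyTorch's cross-entropy loss and by Transformers, that label `-100` means "ignore this position"; the token IDs and log-probabilities are made-up toy values, and the helper names are illustrative.

```python
import math

IGNORE_INDEX = -100  # positions with this label contribute nothing to the loss

def build_labels(prompt_ids, answer_ids):
    """Concatenate prompt and answer; mask every prompt position in the labels."""
    input_ids = list(prompt_ids) + list(answer_ids)
    labels = [IGNORE_INDEX] * len(prompt_ids) + list(answer_ids)
    return input_ids, labels

def masked_mean_nll(token_log_probs, labels):
    """Average negative log-likelihood over the positions that are not masked."""
    kept = [-lp for lp, y in zip(token_log_probs, labels) if y != IGNORE_INDEX]
    return sum(kept) / len(kept)

prompt_ids = [11, 12, 13]   # toy IDs standing in for system + user tokens
answer_ids = [21, 22]       # toy IDs standing in for assistant tokens
input_ids, labels = build_labels(prompt_ids, answer_ids)

# Toy log-probabilities the model assigned to each target token.
token_log_probs = [-5.0, -5.0, -5.0, math.log(0.5), math.log(0.25)]
loss = masked_mean_nll(token_log_probs, labels)
print(labels, round(loss, 4))
```

The three prompt positions have poor log-probabilities here, yet they do not affect the loss at all: only the two assistant tokens are averaged.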
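Relation to module 8
Tool-Calling TinyGPT trains on CALL text; here the training target is domain answer text. In both cases the essence is maximizing the probability of the target token sequence given the conditioning context.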
JSONL data format: messages conversation samples
```jsonl
{"messages":[{"role":"system","content":"你是 AlgoLab 机器学习课程助教。回答要准确、分步骤、优先解释直觉,再给必要公式或本地训练建议。不要编造实验结果。"},{"role":"user","content":"为什么 LoRA 微调适合做 AlgoLab 课程助教?"},{"role":"assistant","content":"LoRA 适合把已有 instruct 模型对齐到稳定的课程领域。它冻结基座模型,只训练少量 adapter 参数,因此本地显存压力较小,迭代速度快。对于 AlgoLab,LoRA 可以学习课程术语、回答结构、训练诊断流程和评测口径。"}]}
{"messages":[{"role":"system","content":"你是 AlgoLab 机器学习课程助教。"},{"role":"user","content":"训练 loss 下降但验证 loss 上升,应该怎么判断?"},{"role":"assistant","content":"这通常是过拟合信号。先确认 train/val split 是否合理,再看数据量、训练 epoch、学习率和 LoRA rank。可以减少 epoch、降低 rank、增加数据多样性,或加入更严格的验证集。"}]}
```
How one sample enters LoRA training
```python
import json

sample = json.loads(line)      # `line` is one row of the JSONL file
messages = sample["messages"]  # system / user / assistant turns
```
Local HTTP runner architecture
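As in No.7 and No.8, the browser page does not train the model directly; it calls a Python runner on the local machine over HTTP. The front end is only responsible for the console and visualization, while PyTorch, Transformers, PEFT, and CUDA all run inside the user's own Python environment.
| Endpoint | Purpose |
|---|---|
| GET /status | Check PyTorch, CUDA, Transformers, PEFT, bitsandbytes, and the adapter list. |
| POST /models/inspect | Verify that the local model directory contains the required files. |
| POST /train | Start a LoRA or QLoRA training job. |
| GET /jobs/{id} | Poll step, epoch, loss, percent, and logs. |
| POST /generate | Generate an answer after loading the current adapter or a manually selected one. |
| POST /evaluate | Run a basic usability evaluation with a set of domain prompts. |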
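A small standard-library client shows how these endpoints fit together. This is a sketch under assumptions: it targets the default address `http://127.0.0.1:4890`, uses the request fields read by the runner source (`model_path`, `dataset_jsonl`, `method`, `max_steps`), and the helper names `build_request`, `call`, and `train_and_wait` are invented for illustration.

```python
import json
import time
import urllib.request

BASE_URL = "http://127.0.0.1:4890"  # default host and port of the local runner

def build_request(path, payload=None):
    """GET when there is no payload, otherwise POST with a JSON body."""
    data = None if payload is None else json.dumps(payload, ensure_ascii=False).encode("utf-8")
    method = "GET" if payload is None else "POST"
    return urllib.request.Request(
        BASE_URL + path,
        data=data,
        headers={"Content-Type": "application/json"},
        method=method,
    )

def call(path, payload=None, timeout=30.0):
    """Send the request and decode the JSON response."""
    with urllib.request.urlopen(build_request(path, payload), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))

def train_and_wait(model_path, dataset_jsonl, poll_seconds=2.0):
    """Start a short LoRA job, then poll /jobs/{id} until it leaves queued/running."""
    job = call("/train", {
        "model_path": model_path,
        "dataset_jsonl": dataset_jsonl,
        "method": "lora",
        "max_steps": 20,
    })
    while job.get("status") in {"queued", "running"}:
        time.sleep(poll_seconds)
        job = call(f"/jobs/{job['id']}")
    return job
```

Calling `call("/status")` first is a cheap way to confirm that the runner is up and which device it resolved before starting a job.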
Install dependencies and start the local LoRA runner
```powershell
# 1. Install the LoRA training dependencies into the user's local Python environment first
& 'C:\Users\richi\TI_richiebao\LLM\.venv\Scripts\python.exe' -m pip install -U transformers peft accelerate safetensors sentencepiece
# 2. To try QLoRA, also install bitsandbytes. On Windows, confirm that a wheel supports the current CUDA/Python.
& 'C:\Users\richi\TI_richiebao\LLM\.venv\Scripts\python.exe' -m pip install -U bitsandbytes
# 3. Start the runner from the AlgoLab project root
& 'C:\Users\richi\TI_richiebao\LLM\.venv\Scripts\python.exe' scripts\lora_domain_assistant_local_runner.py --host 127.0.0.1 --port 4890 --device auto
# If python already points to the correct venv, this also works
python scripts\lora_domain_assistant_local_runner.py --host 127.0.0.1 --port 4890 --device auto
# GPU policy
# --device auto : prefer CUDA, fall back to CPU when it is unavailable
# --device cuda : handled like auto by this runner (GPU first, CPU fallback); check /status to confirm the GPU is in use
# --device cpu  : force CPU only
```
Local runner source: lora_domain_assistant_local_runner.py
```python
from __future__ import annotations

import argparse
import gc
import importlib
import json
import math
import random
import sys
import threading
import time
import traceback
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
from urllib.parse import urlparse

# Heavy dependencies are optional at import time so that /status can still
# report what is missing instead of crashing the runner.
try:
    import torch
except Exception:  # pragma: no cover - status endpoint reports this clearly.
    torch = None

try:
    from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
except Exception:  # pragma: no cover
    AutoModelForCausalLM = None
    AutoTokenizer = None
    BitsAndBytesConfig = None

try:
    from peft import LoraConfig, PeftModel, get_peft_model, prepare_model_for_kbit_training
except Exception:  # pragma: no cover
    LoraConfig = None
    PeftModel = None
    get_peft_model = None
    prepare_model_for_kbit_training = None

ROOT_DIR = Path(__file__).resolve().parents[1]
RUN_DIR = ROOT_DIR / ".tmp" / "lora_domain_assistant"
ADAPTER_DIR = RUN_DIR / "adapters"
DATASET_DIR = RUN_DIR / "datasets"
RUN_DIR.mkdir(parents=True, exist_ok=True)
ADAPTER_DIR.mkdir(parents=True, exist_ok=True)
DATASET_DIR.mkdir(parents=True, exist_ok=True)

# Shared state; every access to JOBS / ACTIVE_JOB_ID goes through LOCK.
JOBS: dict[str, dict[str, Any]] = {}
STOP_EVENTS: dict[str, threading.Event] = {}
ACTIVE_JOB_ID: str | None = None
LOCK = threading.Lock()
DEVICE_POLICY = "auto"

DOMAIN_KEYWORDS = [
    "LoRA",
    "adapter",
    "loss",
    "TinyGPT",
    "VAE",
    "diffusion",
    "Tool-Calling",
    "checkpoint",
    "GPU",
    "assistant-only",
]


@dataclass
class SFTSample:
    system: str
    user: str
    assistant: str


def utc_now() -> str:
    return datetime.now(timezone.utc).isoformat()


def package_version(name: str) -> str | None:
    try:
        module = importlib.import_module(name)
    except Exception:
        return None
    return str(getattr(module, "__version__", "installed"))


def has_package(name: str) -> bool:
    return package_version(name) is not None


def torch_cuda_version() -> str | None:
    if torch is None:
        return None
    return getattr(getattr(torch, "version", None), "cuda", None)


def is_cuda_available() -> bool:
    return bool(torch is not None and torch.cuda.is_available())


def cuda_unavailable_reason() -> str:
    if torch is None:
        return "PyTorch is not installed."
    version = str(getattr(torch, "__version__", ""))
    if "+cpu" in version:
        return "The current PyTorch wheel is CPU-only."
    if torch_cuda_version() is None:
        return "The current PyTorch build does not report CUDA support."
    return "CUDA is not available to PyTorch in this environment."


def resolve_device(requested_policy: str | None = None) -> tuple[str, str]:
    """Return (device, reason). "gpu" and "cuda" mean GPU-first with CPU fallback."""
    policy = str(requested_policy or DEVICE_POLICY or "auto").strip().lower()
    if policy in {"gpu", "cuda"}:
        policy = "auto"
    if policy == "cpu":
        return "cpu", "CPU was selected explicitly."
    if is_cuda_available():
        name = torch.cuda.get_device_name(0)
        return "cuda", f"GPU priority: using CUDA device {name}."
    return "cpu", f"GPU priority: CUDA unavailable, falling back to CPU. {cuda_unavailable_reason()}"


def respond(handler: BaseHTTPRequestHandler, status: int, payload: dict[str, Any]) -> None:
    raw = json.dumps(payload, ensure_ascii=False, default=str).encode("utf-8")
    handler.send_response(status)
    handler.send_header("Content-Type", "application/json; charset=utf-8")
    handler.send_header("Content-Length", str(len(raw)))
    handler.send_header("Access-Control-Allow-Origin", "*")
    handler.send_header("Access-Control-Allow-Headers", "Content-Type")
    handler.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
    handler.end_headers()
    handler.wfile.write(raw)


def read_body(handler: BaseHTTPRequestHandler) -> dict[str, Any]:
    length = int(handler.headers.get("Content-Length") or "0")
    if length <= 0:
        return {}
    raw = handler.rfile.read(length).decode("utf-8")
    return json.loads(raw or "{}")


def update_job(job_id: str, **patch: Any) -> None:
    with LOCK:
        JOBS[job_id].update(patch)


def append_log(job_id: str, line: str) -> None:
    with LOCK:
        logs = JOBS[job_id].setdefault("logs", [])
        logs.append(line)
        if len(logs) > 240:  # keep only the most recent 240 lines
            del logs[: len(logs) - 240]


def append_metric(job_id: str, metric: dict[str, Any]) -> None:
    with LOCK:
        metrics = JOBS[job_id].setdefault("metrics", [])
        metrics.append(metric)
        if len(metrics) > 240:
            del metrics[: len(metrics) - 240]


def update_progress(job_id: str, step: int, max_steps: int, target_epochs: float) -> None:
    # The reported epoch is a linear interpolation over max_steps, not a data pass count.
    epoch = (step / max(max_steps, 1)) * max(target_epochs, 0.0)
    update_job(
        job_id,
        progress={
            "step": step,
            "max_steps": max_steps,
            "epoch": round(epoch, 4),
            "target_epochs": target_epochs,
            "percent": round(step / max(max_steps, 1), 6),
        },
    )


def clamp_int(value: Any, fallback: int, lower: int, upper: int) -> int:
    try:
        parsed = int(value)
    except (TypeError, ValueError):
        parsed = fallback
    return max(lower, min(parsed, upper))


def clamp_float(value: Any, fallback: float, lower: float, upper: float) -> float:
    try:
        parsed = float(value)
    except (TypeError, ValueError):
        parsed = fallback
    return max(lower, min(parsed, upper))


def inspect_model_path(model_path: str) -> dict[str, Any]:
    """Check that a local Hugging Face model directory has config, tokenizer, and weights."""
    path = Path(model_path).expanduser()
    if not path.exists() or not path.is_dir():
        raise ValueError("Model path does not exist or is not a directory.")
    files = {item.name for item in path.iterdir() if item.is_file()}
    has_config = "config.json" in files
    has_tokenizer = any(name in files for name in ["tokenizer.json", "tokenizer.model", "vocab.json"])
    has_weights = any(name.endswith((".safetensors", ".bin", ".pt")) for name in files)
    config_payload = {}
    if has_config:
        try:
            config_payload = json.loads((path / "config.json").read_text(encoding="utf-8"))
        except Exception:
            config_payload = {}
    if not has_config:
        raise ValueError("config.json not found in model directory.")
    if not has_tokenizer:
        raise ValueError("Tokenizer files not found in model directory.")
    if not has_weights:
        raise ValueError("Model weight files not found in model directory.")
    return {
        "ok": True,
        "path": str(path),
        "model_type": config_payload.get("model_type"),
        "architectures": config_payload.get("architectures"),
        "vocab_size": config_payload.get("vocab_size"),
        "hidden_size": config_payload.get("hidden_size"),
        "num_hidden_layers": config_payload.get("num_hidden_layers"),
    }


def normalize_sft_sample(value: Any, line_number: int) -> SFTSample:
    """Reduce a messages list to one system / user / assistant triple.

    The first system message wins; for user and assistant the last one wins.
    """
    if not isinstance(value, dict):
        raise ValueError(f"line {line_number}: expected JSON object")
    messages = value.get("messages")
    if not isinstance(messages, list):
        raise ValueError(f"line {line_number}: messages must be a list")
    system = ""
    user = ""
    assistant = ""
    for item in messages:
        if not isinstance(item, dict):
            continue
        role = str(item.get("role") or "")
        content = str(item.get("content") or "").strip()
        if role == "system" and content and not system:
            system = content
        elif role == "user" and content:
            user = content
        elif role == "assistant" and content:
            assistant = content
    if not user or not assistant:
        raise ValueError(f"line {line_number}: each sample needs user and assistant messages")
    return SFTSample(system=system, user=user, assistant=assistant)


def parse_sft_jsonl(raw: str) -> list[SFTSample]:
    samples: list[SFTSample] = []
    for index, line in enumerate(raw.strip().splitlines(), start=1):
        stripped = line.strip()
        if not stripped:
            continue
        samples.append(normalize_sft_sample(json.loads(stripped), index))
    if not samples:
        raise ValueError("dataset_jsonl is empty")
    return samples


def render_prompt(tokenizer: Any, sample: SFTSample) -> str:
    """Render system + user as the prompt, ending where the assistant answer begins."""
    messages = []
    if sample.system:
        messages.append({"role": "system", "content": sample.system})
    messages.append({"role": "user", "content": sample.user})
    if getattr(tokenizer, "chat_template", None):
        return tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    prefix = f"System: {sample.system}\n" if sample.system else ""
    return f"{prefix}User: {sample.user}\nAssistant: "


def encode_sample(tokenizer: Any, sample: SFTSample, max_seq_len: int) -> dict[str, Any]:
    """Tokenize prompt + answer and mask the prompt so loss is assistant-only."""
    prompt = render_prompt(tokenizer, sample)
    eos = tokenizer.eos_token or ""
    full = f"{prompt}{sample.assistant}{eos}"
    prompt_ids = tokenizer(prompt, add_special_tokens=False).input_ids
    full_ids = tokenizer(full, add_special_tokens=False, truncation=True, max_length=max_seq_len).input_ids
    if len(full_ids) < 2:
        raise ValueError("Encoded sample is too short")
    labels = list(full_ids)
    mask_to = min(len(prompt_ids), len(labels))
    labels[:mask_to] = [-100] * mask_to  # -100 = ignored by the loss
    if all(label == -100 for label in labels):
        # Truncation removed the whole answer; keep one target so the loss is defined.
        labels[-1] = full_ids[-1]
    return {"input_ids": full_ids, "labels": labels}


def collate_batch(items: list[dict[str, Any]], pad_token_id: int, device: str) -> dict[str, Any]:
    """Right-pad a list of encoded samples to the same length."""
    max_len = max(len(item["input_ids"]) for item in items)
    input_ids = []
    labels = []
    attention_mask = []
    for item in items:
        pad = max_len - len(item["input_ids"])
        input_ids.append(item["input_ids"] + [pad_token_id] * pad)
        labels.append(item["labels"] + [-100] * pad)
        attention_mask.append([1] * len(item["input_ids"]) + [0] * pad)
    return {
        "input_ids": torch.tensor(input_ids, dtype=torch.long, device=device),
        "labels": torch.tensor(labels, dtype=torch.long, device=device),
        "attention_mask": torch.tensor(attention_mask, dtype=torch.long, device=device),
    }


def evaluate_loss(model: Any, dataset: list[dict[str, Any]], pad_token_id: int, batch_size: int, device: str) -> float:
    """Mean loss over at most 12 validation samples, evaluated one at a time."""
    if not dataset:
        return float("nan")
    model.eval()
    losses = []
    with torch.no_grad():
        for item in dataset[: min(len(dataset), 12)]:
            batch = collate_batch([item], pad_token_id, device)
            loss = model(**batch).loss
            losses.append(float(loss.item()))
    model.train()
    return sum(losses) / max(len(losses), 1)


def get_dtype(device: str):
    if torch is None:
        return None
    if device == "cuda":
        if hasattr(torch.cuda, "is_bf16_supported") and torch.cuda.is_bf16_supported():
            return torch.bfloat16
        return torch.float16
    return torch.float32


def select_target_modules(model: Any) -> list[str]:
    """Pick the LoRA target layers that actually exist in this architecture."""
    desired = ["q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj", "c_proj", "c_attn", "fc1", "fc2"]
    module_names = {name.split(".")[-1] for name, module in model.named_modules() if hasattr(module, "weight")}
    selected = [name for name in desired if name in module_names]
    return selected or ["q_proj", "v_proj"]


def list_adapters() -> list[str]:
    adapters = [item for item in ADAPTER_DIR.glob("*") if item.is_dir() and (item / "adapter_config.json").exists()]
    return [str(item) for item in sorted(adapters, key=lambda path: path.stat().st_mtime, reverse=True)[:20]]


def load_tokenizer(model_path: str):
    tokenizer = AutoTokenizer.from_pretrained(model_path, local_files_only=True, trust_remote_code=True)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    return tokenizer


def load_base_model(model_path: str, device: str, method: str = "lora"):
    """Load the frozen base model: 4-bit NF4 for QLoRA, half/full precision otherwise."""
    dtype = get_dtype(device)
    if method == "qlora":
        if BitsAndBytesConfig is None or not has_package("bitsandbytes"):
            raise RuntimeError("QLoRA requires bitsandbytes, but it is not available in this Python environment.")
        quant_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=dtype if dtype in {torch.float16, torch.bfloat16} else torch.float16,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4",
        )
        return AutoModelForCausalLM.from_pretrained(
            model_path,
            quantization_config=quant_config,
            device_map="auto",
            local_files_only=True,
            trust_remote_code=True,
        )
    model = AutoModelForCausalLM.from_pretrained(
        model_path,
        torch_dtype=dtype,
        local_files_only=True,
        trust_remote_code=True,
    )
    return model.to(device)


def cleanup_cuda() -> None:
    gc.collect()
    if torch is not None and torch.cuda.is_available():
        torch.cuda.empty_cache()


def run_training(job_id: str, request: dict[str, Any]) -> None:
    """Background thread body: parse request, train the adapter, save it, update the job."""
    global ACTIVE_JOB_ID
    if torch is None or AutoModelForCausalLM is None or AutoTokenizer is None or LoraConfig is None or get_peft_model is None:
        update_job(job_id, status="failed", error="Required packages are missing. Install torch, transformers, peft, and accelerate.", finished_at=utc_now())
        return
    stop_event = STOP_EVENTS[job_id]
    try:
        model_path = str(request.get("model_path") or "").strip()
        inspect_model_path(model_path)
        method = str(request.get("method") or "lora").lower()
        method = "qlora" if method == "qlora" else "lora"
        device, device_reason = resolve_device(request.get("device"))
        seed = clamp_int(request.get("seed"), 42, 1, 999_999)
        random.seed(seed)
        torch.manual_seed(seed)
        if is_cuda_available():
            torch.cuda.manual_seed_all(seed)

        # Persist the normalized dataset next to the job for reproducibility.
        samples = parse_sft_jsonl(str(request.get("dataset_jsonl") or ""))
        job_dataset_path = DATASET_DIR / f"{job_id}.jsonl"
        job_dataset_path.write_text(
            "\n".join(
                json.dumps({"messages": [
                    {"role": "system", "content": sample.system},
                    {"role": "user", "content": sample.user},
                    {"role": "assistant", "content": sample.assistant},
                ]}, ensure_ascii=False)
                for sample in samples
            ) + "\n",
            encoding="utf-8",
        )

        max_seq_len = clamp_int(request.get("max_seq_len"), 1024, 128, 8192)
        batch_size = clamp_int(request.get("batch_size"), 1, 1, 16)
        grad_accum = clamp_int(request.get("gradient_accumulation_steps"), 8, 1, 128)
        max_steps = clamp_int(request.get("max_steps"), 120, 1, 200_000)
        target_epochs = clamp_float(request.get("target_epochs"), 2.0, 0.1, 100.0)
        learning_rate = clamp_float(request.get("learning_rate"), 2e-4, 1e-6, 1e-2)
        lora_r = clamp_int(request.get("lora_r"), 16, 1, 256)
        lora_alpha = clamp_int(request.get("lora_alpha"), 32, 1, 512)
        lora_dropout = clamp_float(request.get("lora_dropout"), 0.05, 0.0, 0.8)

        update_job(job_id, status="running", started_at=utc_now(), model_path=model_path, device=device, dataset_size=len(samples))
        append_log(job_id, f"device = {device}")
        append_log(job_id, device_reason)
        append_log(job_id, f"method = {method}")
        append_log(job_id, f"model_path = {model_path}")
        append_log(job_id, f"dataset_size = {len(samples)}")

        tokenizer = load_tokenizer(model_path)
        encoded = [encode_sample(tokenizer, sample, max_seq_len) for sample in samples]
        random.shuffle(encoded)
        # Hold out about 12% for validation; tiny datasets reuse everything for training.
        val_size = max(1, int(len(encoded) * 0.12)) if len(encoded) > 3 else 1
        val_data = encoded[:val_size]
        train_data = encoded[val_size:] or encoded

        model = load_base_model(model_path, device, method)
        if getattr(model.config, "use_cache", None) is not None:
            model.config.use_cache = False
        if hasattr(model, "gradient_checkpointing_enable"):
            model.gradient_checkpointing_enable()
        if method == "qlora" and prepare_model_for_kbit_training is not None:
            model = prepare_model_for_kbit_training(model)
        target_modules = select_target_modules(model)
        lora_config = LoraConfig(
            r=lora_r,
            lora_alpha=lora_alpha,
            lora_dropout=lora_dropout,
            bias="none",
            task_type="CAUSAL_LM",
            target_modules=target_modules,
        )
        model = get_peft_model(model, lora_config)
        trainable = sum(parameter.numel() for parameter in model.parameters() if parameter.requires_grad)
        total = sum(parameter.numel() for parameter in model.parameters())
        optimizer = torch.optim.AdamW((parameter for parameter in model.parameters() if parameter.requires_grad), lr=learning_rate)
        adapter_path = ADAPTER_DIR / job_id
        update_job(
            job_id,
            train_size=len(train_data),
            val_size=len(val_data),
            max_steps=max_steps,
            target_epochs=target_epochs,
            adapter_path=str(adapter_path),
            trainable_parameters=trainable,
            total_parameters=total,
        )
        update_progress(job_id, 0, max_steps, target_epochs)
        append_log(job_id, f"target_modules = {target_modules}")
        append_log(job_id, f"trainable_parameters = {trainable:,} / {total:,}")
        append_log(job_id, f"batch_size = {batch_size}, grad_accum = {grad_accum}, max_steps = {max_steps}")

        model.train()
        log_every = max(1, max_steps // 12)
        start_time = time.time()
        tokens_seen = 0  # real (non-padding) tokens processed so far
        pad_token_id = tokenizer.pad_token_id if tokenizer.pad_token_id is not None else tokenizer.eos_token_id
        for step in range(1, max_steps + 1):
            if stop_event.is_set():
                update_job(job_id, status="stopped", finished_at=utc_now())
                append_log(job_id, "training stopped by user")
                return
            optimizer.zero_grad(set_to_none=True)
            accumulated_loss = 0.0
            for _ in range(grad_accum):
                # Sample with replacement; one optimizer step sees grad_accum micro-batches.
                batch_items = [random.choice(train_data) for _ in range(batch_size)]
                batch = collate_batch(batch_items, pad_token_id, device)
                loss = model(**batch).loss / grad_accum
                loss.backward()
                accumulated_loss += float(loss.item())
                tokens_seen += int(batch["attention_mask"].sum().item())
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            update_progress(job_id, step, max_steps, target_epochs)
            if step == 1 or step % log_every == 0 or step == max_steps:
                elapsed = max(time.time() - start_time, 1e-6)
                val_loss = evaluate_loss(model, val_data, pad_token_id, batch_size, device)
                metric = {
                    "step": step,
                    "train_loss": round(accumulated_loss, 6),
                    "val_loss": round(float(val_loss), 6),
                    "tokens_per_second": int(tokens_seen / elapsed),
                    "epoch": round((step / max_steps) * target_epochs, 4),
                }
                append_metric(job_id, metric)
                append_log(job_id, f"step {step:5d}/{max_steps} | epoch {metric['epoch']:.2f}/{target_epochs:g} | train_loss {metric['train_loss']:.4f} | val_loss {metric['val_loss']:.4f} | {metric['tokens_per_second']:,} tok/s")

        # Only the adapter (plus tokenizer and metadata) is saved, never the base weights.
        adapter_path.mkdir(parents=True, exist_ok=True)
        model.save_pretrained(adapter_path)
        tokenizer.save_pretrained(adapter_path)
        (adapter_path / "training_metadata.json").write_text(
            json.dumps(
                {
                    "model_path": model_path,
                    "method": method,
                    "target_modules": target_modules,
                    "dataset_path": str(job_dataset_path),
                    "dataset_size": len(samples),
                    "max_steps": max_steps,
                    "target_epochs": target_epochs,
                    "created_at": utc_now(),
                },
                ensure_ascii=False,
                indent=2,
            ),
            encoding="utf-8",
        )
        update_job(job_id, status="completed", adapter_path=str(adapter_path), finished_at=utc_now())
        append_log(job_id, f"adapter saved: {adapter_path}")
    except Exception as error:
        update_job(job_id, status="failed", error=str(error), finished_at=utc_now())
        append_log(job_id, traceback.format_exc())
    finally:
        cleanup_cuda()
        with LOCK:
            if ACTIVE_JOB_ID == job_id:
                ACTIVE_JOB_ID = None


def generate_text(model_path: str, adapter_path: str | None, system: str, prompt: str, max_new_tokens: int, temperature: float) -> str:
    """Load base model (plus optional adapter), generate one answer, then free memory."""
    if torch is None or AutoModelForCausalLM is None or AutoTokenizer is None:
        raise RuntimeError("Required packages are missing.")
    device, _reason = resolve_device()
    tokenizer = load_tokenizer(model_path)
    sample = SFTSample(system=system, user=prompt, assistant="")
    rendered = render_prompt(tokenizer, sample)
    model = load_base_model(model_path, device, "lora")
    if adapter_path:
        if PeftModel is None:
            raise RuntimeError("PEFT is not available, cannot load adapter.")
        model = PeftModel.from_pretrained(model, adapter_path)
    model.eval()
    inputs = tokenizer(rendered, return_tensors="pt").to(device)
    do_sample = temperature > 0.05
    with torch.no_grad():
        output_ids = model.generate(
            **inputs,
            max_new_tokens=max(16, min(int(max_new_tokens), 1600)),
            temperature=max(temperature, 0.05),
            do_sample=do_sample,
            top_p=0.9,
            pad_token_id=tokenizer.pad_token_id or tokenizer.eos_token_id,
            eos_token_id=tokenizer.eos_token_id,
        )
    # Drop the prompt tokens and decode only the newly generated part.
    generated = output_ids[0, inputs["input_ids"].shape[-1]:]
    text = tokenizer.decode(generated, skip_special_tokens=True).strip()
    del model
    cleanup_cuda()
    return text


def generate_response(request: dict[str, Any]) -> dict[str, Any]:
    model_path = str(request.get("model_path") or "").strip()
    adapter_path = str(request.get("adapter_path") or "").strip() or None
    system = str(request.get("system") or "")
    prompt = str(request.get("prompt") or "").strip()
    if not prompt:
        raise ValueError("prompt is required")
    max_new_tokens = clamp_int(request.get("max_new_tokens"), 320, 16, 1600)
    temperature = clamp_float(request.get("temperature"), 0.3, 0.0, 1.5)
    compare_base = bool(request.get("compare_base"))
    result: dict[str, Any] = {"prompt": prompt, "model_path": model_path, "adapter_path": adapter_path}
    if compare_base:
        result["base_text"] = generate_text(model_path, None, system, prompt, max_new_tokens, temperature)
    # The answer is stored under "lora_text" with an adapter, otherwise under "text".
    result["lora_text" if adapter_path else "text"] = generate_text(model_path, adapter_path, system, prompt, max_new_tokens, temperature)
    return result


def evaluate_response(request: dict[str, Any]) -> dict[str, Any]:
    """Heuristic evaluation: keyword coverage, length, and a simple format check."""
    prompts = request.get("prompts")
    if not isinstance(prompts, list) or not prompts:
        prompts = [
            "为什么 LoRA 微调适合做 AlgoLab 课程助教?",
            "解释 assistant-only loss。",
            "训练时 GPU 显存不足怎么办?",
        ]
    items = []
    for prompt in prompts[:8]:
        response = generate_response({**request, "prompt": str(prompt), "compare_base": False})
        # Without an adapter the answer lives under "text", so check both keys.
        output = response.get("lora_text") or response.get("text") or ""
        has_terms = any(term.lower() in output.lower() for term in DOMAIN_KEYWORDS)
        items.append({
            "prompt": str(prompt),
            "output": output,
            "contains_domain_terms": has_terms,
            "length": len(output),
        })
    total = len(items) or 1
    return {
        "items": items,
        "summary": {
            "count": len(items),
            "average_length": sum(item["length"] for item in items) / total,
            "format_score": sum(1 for item in items if "\n" in item["output"] or "。" in item["output"]) / total,
            "domain_keyword_rate": sum(1 for item in items if item["contains_domain_terms"]) / total,
        },
    }


def status_payload() -> dict[str, Any]:
    device, reason = resolve_device()
    return {
        "ok": True,
        "python": sys.version.split()[0],
        "torch_available": torch is not None,
        "torch_version": getattr(torch, "__version__", None) if torch is not None else None,
        "torch_cuda_version": torch_cuda_version(),
        "cuda_available": is_cuda_available(),
        "cuda_device": torch.cuda.get_device_name(0) if is_cuda_available() else None,
        "cuda_device_count": torch.cuda.device_count() if torch is not None else 0,
        "transformers_available": AutoModelForCausalLM is not None,
        "transformers_version": package_version("transformers"),
        "peft_available": LoraConfig is not None,
        "peft_version": package_version("peft"),
        "accelerate_available": has_package("accelerate"),
        "bitsandbytes_available": has_package("bitsandbytes"),
        "device_policy": DEVICE_POLICY,
        "device": device,
        "device_reason": reason,
        "working_dir": str(RUN_DIR),
        "active_job_id": ACTIVE_JOB_ID,
        "adapters": list_adapters(),
    }


class LoRADomainAssistantHandler(BaseHTTPRequestHandler):
    def log_message(self, format: str, *args: Any) -> None:
        print(f"[lora-domain-runner] {self.address_string()} - {format % args}")

    def do_OPTIONS(self) -> None:
        respond(self, 200, {"ok": True})

    def do_GET(self) -> None:
        path = (urlparse(self.path).path.rstrip("/") or "/")
        if path in {"/", "/status", "/health"}:
            respond(self, 200, status_payload())
            return
        if path.startswith("/jobs/"):
            job_id = path.split("/")[-1]
            with LOCK:
                job = JOBS.get(job_id)
            if not job:
                respond(self, 404, {"message": "Job not found."})
                return
            respond(self, 200, job)
            return
        if path == "/adapters":
            respond(self, 200, {"adapters": list_adapters()})
            return
        respond(self, 404, {"message": "Unknown route."})

    def do_POST(self) -> None:
        global ACTIVE_JOB_ID
        path = (urlparse(self.path).path.rstrip("/") or "/")
        try:
            if path == "/models/inspect":
                body = read_body(self)
                respond(self, 200, inspect_model_path(str(body.get("model_path") or "")))
                return
            if path == "/train":
                body = read_body(self)
                with LOCK:
                    # Only one training job may be queued or running at a time.
                    if ACTIVE_JOB_ID and JOBS.get(ACTIVE_JOB_ID, {}).get("status") in {"queued", "running"}:
                        respond(self, 409, {"message": f"Training job already running: {ACTIVE_JOB_ID}"})
                        return
                    job_id = uuid.uuid4().hex[:12]
                    job = {"id": job_id, "status": "queued", "created_at": utc_now(), "logs": [], "metrics": []}
                    JOBS[job_id] = job
                    STOP_EVENTS[job_id] = threading.Event()
                    ACTIVE_JOB_ID = job_id
                thread = threading.Thread(target=run_training, args=(job_id, body), daemon=True)
                thread.start()
                respond(self, 200, job)
                return
            if path.startswith("/jobs/") and path.endswith("/stop"):
                job_id = path.split("/")[2]
                if job_id in STOP_EVENTS:
                    STOP_EVENTS[job_id].set()
                with LOCK:
                    job = JOBS.get(job_id)
                if not job:
                    respond(self, 404, {"message": "Job not found."})
                    return
                respond(self, 200, job)
                return
            if path == "/generate":
                respond(self, 200, generate_response(read_body(self)))
                return
            if path == "/evaluate":
                respond(self, 200, evaluate_response(read_body(self)))
                return
            respond(self, 404, {"message": "Unknown route."})
        except Exception as error:
            respond(self, 500, {"message": str(error), "traceback": traceback.format_exc()})


def main() -> None:
    global DEVICE_POLICY
    parser = argparse.ArgumentParser(description="Local LoRA domain assistant runner for AlgoLab.")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=4890)
    parser.add_argument("--device", choices=["auto", "cuda", "gpu", "cpu"], default="auto")
    args = parser.parse_args()
    DEVICE_POLICY = "auto" if args.device in {"auto", "cuda", "gpu"} else "cpu"
    server = ThreadingHTTPServer((args.host, args.port), LoRADomainAssistantHandler)
    print(f"LoRA domain assistant runner listening on http://{args.host}:{args.port}")
    print(f"Working directory: {RUN_DIR}")
    print(f"Device policy: {DEVICE_POLICY}")
    print(f"Resolved device: {resolve_device()[0]} ({resolve_device()[1]})")
    print("Press Ctrl+C to stop.")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nStopping LoRA domain assistant runner.")
    finally:
        server.server_close()


if __name__ == "__main__":
    main()
```
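Train the LoRA domain assistant locally
Nothing is downloaded here, and nothing is trained in the browser. The front end only controls the local Python runner over HTTP, and the base model must come from a local directory on the user's machine.
```text
# Windows / PowerShell
& 'C:\Users\richi\TI_richiebao\LLM\.venv\Scripts\python.exe' scripts\lora_domain_assistant_local_runner.py --host 127.0.0.1 --port 4890 --device auto

# Runner API
GET  /status
POST /models/inspect
POST /train
POST /generate
POST /evaluate
```
The built-in candidate pool holds 50,000 multi-template synthetic domain samples. At training time a JSONL subset is drawn at random according to the sample count, seed, and domain ratios, and loss is computed only on assistant answers. Before a real training run, it is still advisable to spot-check the data and mix in samples revised by a teacher model or by hand.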
```json
{
  "lineNumber": 1,
  "system": "你是 AlgoLab 机器学习课程助教。回答要准确、分步骤、优先解释直觉,再给必要公式或本地训练建议。不要编造实验结果。",
  "user": "本地训练 Qwen2.5-Coder-1.5B-Instruct 时遇到“训练 loss 下降但验证 loss 上升”,请给排查顺序。",
  "assistant": "这是典型过拟合信号。先检查 train/val 是否来自不同问题,再降低 epoch 或 LoRA rank,并增加真实失败样例。 建议同时记录 model=Qwen2.5-Coder-1.5B-Instruct、rank=32、样本量=3000 和 seed,方便下一轮复现实验。"
}
```
The base model is frozen and only the adapter is trained. QLoRA requires a working local bitsandbytes installation.
The latest adapter is used by default; a different adapter directory can also be selected manually.
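How to choose between LoRA and QLoRA
The first version defaults to plain LoRA because it is the most stable and the easiest to debug. Switch to QLoRA only if VRAM runs out. QLoRA loads the frozen base model in 4-bit and trains only the LoRA adapter, but it depends on bitsandbytes, so on Windows it is especially important to confirm that the installation actually works.
| Method | Advantages | Cost | Advice |
|---|---|---|---|
| LoRA | Stable, simple to debug, good compatibility | The base model still has to be loaded in half precision | Preferred for the first 1.5B version |
| QLoRA | Lower VRAM usage | Depends on bitsandbytes; more sensitive in speed and compatibility | Try it when VRAM is tight or with a 3B model |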
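Evaluation: from loss to domain usability
The loss of LoRA SFT is a necessary signal, but it is not the end goal. Whether a domain assistant is useful depends on whether, on real questions, it behaves more like a teaching assistant who understands the course, local training, and the constraints.
| Metric | What to look at | Why it matters |
|---|---|---|
| train / val loss | Whether it falls steadily and whether there is overfitting | Tells whether training learned the distribution rather than memorizing the training set. |
| base vs LoRA comparison | Whether the fine-tuned model understands AlgoLab terminology and workflow better | Confirms that the adapter really changed behavior. |
| Domain keyword coverage | Whether answers mention key concepts such as LoRA, loss, GPU, and checkpoint | A rough view of the degree of domain alignment. |
| Human-judged usability | Whether answers are accurate, step-by-step, and actionable | The final check still has to use real user questions. |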
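The keyword-coverage and base-versus-LoRA rows can be combined into one small comparison. The sketch below mirrors the keyword check used by the runner's `/evaluate` endpoint but runs on plain strings; the shortened keyword list, the function names, and the example sentences are illustrative stand-ins, not real model outputs.

```python
DOMAIN_KEYWORDS = ["LoRA", "adapter", "loss", "checkpoint", "GPU"]

def keyword_hit(text, keywords=DOMAIN_KEYWORDS):
    """True if the answer mentions at least one domain keyword (case-insensitive)."""
    lowered = text.lower()
    return any(keyword.lower() in lowered for keyword in keywords)

def compare_keyword_rates(base_outputs, lora_outputs):
    """Keyword coverage of base vs adapter answers to the same prompts."""
    base_rate = sum(keyword_hit(t) for t in base_outputs) / len(base_outputs)
    lora_rate = sum(keyword_hit(t) for t in lora_outputs) / len(lora_outputs)
    return {
        "base_keyword_rate": base_rate,
        "lora_keyword_rate": lora_rate,
        "delta": lora_rate - base_rate,
    }

# Hand-written placeholder answers, used only to exercise the functions.
base_outputs = [
    "Try restarting the program.",
    "Lower the learning rate and watch the loss.",
]
lora_outputs = [
    "Check the val loss and reduce LoRA rank.",
    "Reload the latest checkpoint on the GPU.",
]
report = compare_keyword_rates(base_outputs, lora_outputs)
print(report)
```

A positive `delta` only shows that the adapter uses more domain vocabulary; it says nothing about correctness, which is why the human-judged row remains the final check.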
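Data scale: a 50,000-sample candidate pool and training-time sampling
The module 9 interactive card ships with a candidate pool of 50,000 AlgoLab domain SFT samples. It is a multi-template synthetic pool, which is not the same as 50,000 samples that were human-reviewed or refined by a teacher model. The pool is never fed into training all at once; for an actual run the user specifies the sample count, random seed, and domain ratios, and a JSONL subset is drawn accordingly. This demonstrates production-scale data volume while keeping local training cost under control.
| Stage | Suggested sample count | Goal |
|---|---|---|
| Smoke test | 10 to 50 | Confirm that the local model, CUDA, runner, adapter saving, and inference all work. |
| Course style alignment | 500 to 1000 | Make answers sound like the AlgoLab assistant and cover the main modules. |
| Production-level usability | 3000 to 10000 | Cover concept explanation, training diagnostics, runner usage, evaluation, and experiment workflow, mixed with samples revised by humans or a teacher model. |
| Large-scale continued training | 10000 to 50000 | For longer training or multiple sampling rounds, with ongoing validation sets, deduplication, and manual spot checks. |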
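Drawing a reproducible subset by sample count, seed, and domain ratio can be sketched as follows. The `domain` field on each pool item, the ratio dictionary, and the function name are assumptions made for illustration; the interactive card's actual sampling code is not shown in this module.

```python
import random

def sample_subset(pool, sample_count, ratios, seed):
    """Draw about `sample_count` items, split across domains by `ratios`,
    reproducibly for a given `seed`."""
    rng = random.Random(seed)  # local RNG, so global random state is untouched
    by_domain = {}
    for item in pool:
        by_domain.setdefault(item["domain"], []).append(item)
    total_weight = sum(ratios.values())
    subset = []
    for domain, weight in ratios.items():
        quota = round(sample_count * weight / total_weight)
        candidates = by_domain.get(domain, [])
        # Never ask for more items than the domain actually has.
        subset.extend(rng.sample(candidates, min(quota, len(candidates))))
    rng.shuffle(subset)
    return subset

# Toy pool with two hypothetical domains of 50 items each.
pool = ([{"domain": "explain", "id": i} for i in range(50)]
        + [{"domain": "diagnose", "id": i} for i in range(50, 100)])
subset = sample_subset(pool, 20, {"explain": 3, "diagnose": 1}, seed=42)
print(len(subset), sum(item["domain"] == "explain" for item in subset))
# prints: 20 15
```

Recording the seed and ratios next to the trained adapter makes it possible to rebuild exactly the same subset for a later comparison run.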
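If the goal is a truly product-grade assistant, template-synthesized data alone is not enough. A more robust path is: use synthetic data to get the distribution working, have a teacher model generate high-quality Q&A, have humans review the key samples, and hand course notes and the latest material to RAG.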
Teacher prompt template for expanding the data
```text
You are creating supervised fine-tuning JSONL for an AlgoLab machine learning course assistant.
Return only valid JSONL. Every line must be one object with a messages array.
System message:
你是 AlgoLab 机器学习课程助教。回答要准确、分步骤、优先解释直觉,再给必要公式或本地训练建议。不要编造实验结果。
Create {sample_count} samples for topic = {topic}.
Topics can include:
- next-token and decoder-only training
- Transformer self-attention
- TinyGPT from scratch
- TinyCodeGPT task-to-code training
- Tool-Calling TinyGPT
- LoRA / QLoRA fine-tuning
- GPU local runner diagnostics
- loss curves and evaluation
- checkpoint, adapter, train/val split
Rules:
1. User questions must be concrete and varied.
2. Assistant answers must be correct, concise, step-by-step, and actionable.
3. Include at least one failure diagnosis or tradeoff in many samples.
4. Do not invent benchmark numbers, hardware results, or file paths.
5. Keep each assistant answer under 180 Chinese characters unless a formula is necessary.
6. Output raw JSONL only, no markdown.
Output shape:
{"messages":[{"role":"system","content":"..."},{"role":"user","content":"..."},{"role":"assistant","content":"..."}]}
```
Next step: LoRA learns behavior, RAG manages knowledge
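The focus of module 9 is making the local LoRA training loop solid. Once this assistant can answer in-course questions reliably, the next stage should not keep forcing all the material into the adapter; it should connect the assistant to a retrieval system instead.
This is also the division of labor that makes a domain assistant genuinely useful: the model handles understanding and organization, while retrieval handles facts and context.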