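Algorithm Visualization and Interactive Learning Platform
TinyCodeGPT: Train a Local Task-to-Code LLM from Scratch
This module carries the next-token principle from the earlier GPT lessons into a real engineering loop: design task→code tasks in six categories (Math, Python, NumPy, Pandas, Matplotlib, ML), distill training data from a large model, automatically validate and clean the samples, train TinyCodeGPT locally with PyTorch, and watch the loss, token probabilities, checkpoints, and the execution results of generated code in real time, so that beginners learn end to end how to train a small task-specific LLM themselves.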
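First, pin down what this module actually trains
This section no longer stops at the toy TinyGPT. The goal is to train a small but real **TinyCodeGPT**:
It is not ChatGPT and not a general chat model. It is a narrow-domain task model: given `plot y = x^2 from -10 to 10`, it generates NumPy + Matplotlib code that runs locally.
The complete engineering loop is: task design → teacher-model data distillation → automatic validation and cleaning → local PyTorch training → code generation → running the generated code.
After finishing, you should be able to answer a very practical question: if I want to train my own small task-specific LLM, where does the data come from, how is the code written, how do I monitor training, and how do I verify the results?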
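Step 1: Define the six task categories TinyCodeGPT will learn
When training a small model, the most important thing is not chasing parameter count at the start but defining the task boundary clearly. This module targets task→code, so the dataset must consist of task descriptions paired with runnable Python code.
| Category | What the model learns | Typical samples |
|---|---|---|
| Math | formulas, variables, basic numeric calculation | circle area, distance, mean, variance |
| Python | list, dict, for, if, function | sorting, filtering, word-frequency counting, string reversal |
| NumPy | arrays, matrices, vectorized computation | normalization, matrix multiplication, linspace |
| Pandas | tabular data analysis | filter, groupby, column mean |
| Matplotlib | plotting algorithm results | line, scatter, bar, hist |
| ML | small machine-learning procedures | MSE, linear regression, gradient descent |
These six categories deliberately exclude open-ended chat, because the goal of the first stage is not a model that "knows everything" but one that genuinely learns to generate runnable code within a narrow scope.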
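Data distillation objective: generate task-code samples with a teacher model
The dataset D consists of tasks t_i, code c_i, and categories k_i. The teacher model generates code from the task and its category, and only samples that pass automatic local execution enter the training set.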
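One way to write this objective down formally is sketched below. The symbols $N$ (number of candidate samples), $\mathrm{Teacher}$ (the teacher model), and $\mathrm{run}$ (the local execution check) are notation assumed here for illustration; they do not come from the source.

```latex
D = \{(t_i, c_i, k_i)\}_{i=1}^{N}, \qquad
k_i \in \{\text{Math}, \text{Python}, \text{NumPy}, \text{Pandas}, \text{Matplotlib}, \text{ML}\}

c_i = \mathrm{Teacher}(t_i, k_i), \qquad
D_{\text{train}} = \{(t_i, c_i, k_i) \in D : \mathrm{run}(c_i) \text{ succeeds locally}\}
```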
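Step 2: Do not let the large model write freely; distill through templates
Using an existing large model to generate data does not mean asking it 100,000 questions by hand, and it does not mean scraping all of GitHub. The right approach is: a person designs task templates, a program samples the parameters, and the teacher model fills in the reference answer.
For example, the template `calculate area of circle with radius {r}` can be sampled with many different radii. The teacher model is only responsible for producing a readable, runnable, stylistically consistent reference answer.
| Stage | Who does it | Output |
|---|---|---|
| Template design | Human | task types and parameter slots |
| Parameter sampling | Program | many distinct tasks |
| Answer generation | Large model | Python code |
| Validation and cleaning | Local runner | samples that pass |
| Training | Local PyTorch | TinyCodeGPT checkpoint |
Data produced this way is small and clean, which suits a first-stage model that has to complete concrete tasks.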
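The template-design and parameter-sampling stages can be sketched in a few lines. This is a minimal illustration, not the module's generator: the `TEMPLATES` list, the slot ranges, and the `sample_tasks` helper are hypothetical names introduced here, and no teacher model is called.

```python
import random

# Hypothetical templates: (category, task template, integer range for each slot).
TEMPLATES = [
    ("Math", "calculate area of circle with radius {r}", {"r": (1, 80)}),
    ("Math", "calculate rectangle area with width {w} and height {h}",
     {"w": (2, 60), "h": (2, 60)}),
    ("NumPy", "create {n} evenly spaced numpy values from {start} to {stop}",
     {"n": (4, 12), "start": (-10, 0), "stop": (5, 20)}),
]


def sample_tasks(count, seed=7):
    """Fill template slots with sampled integers; the teacher model is not involved."""
    rng = random.Random(seed)
    tasks = []
    for _ in range(count):
        category, template, slots = rng.choice(TEMPLATES)
        params = {name: rng.randint(low, high) for name, (low, high) in slots.items()}
        tasks.append({"category": category, "task": template.format(**params)})
    return tasks


if __name__ == "__main__":
    for item in sample_tasks(3):
        print(item)
```

Each sampled task would then be sent to the teacher model for a reference answer and kept only if that answer passes the local execution check. Fixing the seed keeps the task list reproducible across runs.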
Data generation prompt: make the teacher model emit uniform JSONL
```text
You are a careful Python teacher creating JSONL training data for a small task-to-code model.
Return only valid JSONL. Do not use markdown fences or prose.
Every line must be exactly one JSON object:
{"category":"Math|Python|NumPy|Pandas|Matplotlib|ML","task":"concrete natural-language task","code":"...python code with escaped \n..."}
Batch parameters:
Category: {category}
Sample count: {sample_count}
Generate exactly {sample_count} lines for Category = {category}.
For a larger local dataset, run this prompt repeatedly in batches. Example 5000-line balanced allocation:
- Math: 834
- Python: 833
- NumPy: 833
- Pandas: 833
- Matplotlib: 833
- ML: 834
Use 20-32 samples per batch to avoid truncated model output, then concatenate all JSONL lines.
Category rules:
- Math: formulas, variables, numeric calculation, distance, mean, variance.
- Python: list, dict, loop, if, function, string processing.
- NumPy: arrays, matrices, vectorized numerical computation.
- Pandas: DataFrame filtering, grouping, column calculation, tiny literal tables.
- Matplotlib: line, scatter, bar, histogram plots; include tiny literal data and plt.show().
- ML: sklearn-free examples such as MSE, linear regression with numpy, or gradient descent.
Task design rules:
1. Create varied concrete tasks with small literal values.
2. If a task needs data, include tiny literal data directly in code.
3. Keep each code answer under 25 lines.
4. Include print(...) for numerical, text, and table results.
5. For plots, include plt.show().
6. Do not use file, network, subprocess, shell, environment, or system operations.
7. Escape newlines inside the JSON string as \n.
8. Use only standard Python, numpy, pandas, matplotlib, or sklearn-free ML.
Valid output shape:
{"category":"Math","task":"calculate the area of a circle with radius 7","code":"import math\nr = 7\narea = math.pi * r ** 2\nprint(area)"}
{"category":"Python","task":"count word frequencies in a short list","code":"words = ['cat', 'dog', 'cat']\ncounts = {}\nfor word in words:\n counts[word] = counts.get(word, 0) + 1\nprint(counts)"}
After you generate the JSONL batches, paste the raw lines into AlgoLab's Teacher JSONL dataset box, then start local training.
```
Step 3: How one sample enters the training set
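```python
template = 'plot y = {func} from {start} to {stop}'
task = template.format(func='x^2', start=-10, stop=10)
# task == 'plot y = x^2 from -10 to 10'
```
Tokenizer and shifted training samples
The first version uses a character-level tokenizer: every character is one token. During training, the input x and the target y differ by exactly one position, so at every position the model learns to predict the next token.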
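The one-position shift is easiest to see on a tiny string. The sketch below is illustrative only: `build_char_vocab` is a hypothetical helper name, and the text `print(1)` is an arbitrary example rather than a sample from the dataset.

```python
def build_char_vocab(text):
    """Map every distinct character to an integer id, in sorted order."""
    chars = sorted(set(text))
    stoi = {ch: i for i, ch in enumerate(chars)}
    itos = {i: ch for ch, i in stoi.items()}
    return stoi, itos


text = "print(1)"
stoi, itos = build_char_vocab(text)
ids = [stoi[ch] for ch in text]

# x is everything except the last id, y is everything except the first:
# position t of x is supervised by position t of y, the next character.
x, y = ids[:-1], ids[1:]

for xi, yi in zip(x, y):
    print(repr(itos[xi]), "->", repr(itos[yi]))
```

Every adjacent pair of characters becomes one supervised example, so a text of n characters yields n - 1 prediction targets.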
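Step 4: Real training happens on your local computer
A web page should not carry long-running training, nor should it fake real training with an animation. This module uses a local runner architecture: the AlgoLab page only handles control and visualization, while a local Python process does the actual computation.
This has three benefits: first, it uses no server GPU; second, data and checkpoints stay on your machine; third, training can run for a long time and be paused, saved, and reused.
| Component | Responsibility |
|---|---|
| Web module | shows formulas, code, and explanations; sends training parameters; displays loss and logs |
| Local Runner | detects Python/PyTorch/CUDA, generates data, trains the model, saves checkpoints |
| PyTorch model | runs embedding, attention, FFN, loss, backpropagation |
| Evaluator | calls the model to generate code and validates it by running it locally |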
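TinyGPT architecture and training objective: from characters to executable code
This card compresses TinyCodeGPT's real computation path into one line: character ids become vectors, causal attention looks only at the left context, the FFN applies a per-position nonlinear transform, and every position finally predicts the next character. Training does not directly reward "runnable code"; it raises the probability of the character sequences that make up runnable code.
1. What a character-level tokenizer really means
`build_vocab(text)` in runner.py sorts the characters that occur in the training text into a vocabulary. It is simple, transparent, and good for teaching, but it also means the model learns local regularities among characters such as newlines, spaces, colons, and brackets. If a prompt contains characters that are not in the training vocabulary, the runner skips them, so the broader the dataset coverage, the more stable inference is.
2. The causal mask turns "complete the code" into the training objective
Each position sees only what is to its left, so the code characters after `<task>... </task><code>` are filled in one character at a time, conditioned on the task description and the code prefix generated so far. What TinyCodeGPT learns is not chat intent but the task→code conditional distribution in a fixed format.
3. Model capacity is mainly controlled by four UI parameters
`Layers` adds sequential processing depth, `Heads` adds parallel attention patterns, `Embedding` widens the representation of each character, and `Context` sets how much task+code the model sees at once. Increasing Embedding and the layer count usually raises the ceiling on complex tasks more than simply adding epochs, but it also costs more memory and time.
4. A falling loss does not guarantee correct code
Cross entropy optimizes character probabilities. It strongly rewards local patterns such as indentation, brackets, and API names, but it never executes the program, inspects a plot, or checks a number. That is why the module later adds an "execute generated code" button, bringing model quality into a runnability loop.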
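To make the capacity discussion concrete, the trainable parameter count can be estimated from the layer shapes in the runner listing (token and position embeddings, a fused qkv projection, an output projection, a 4x FFN, LayerNorms, and a bias-free output head). The helper name `count_tinycodegpt_parameters` is hypothetical, and V = 39 is the vocabulary size of the single traced sample, used here only as an assumed example value.

```python
def count_tinycodegpt_parameters(vocab_size, n_layer, n_embd, block_size):
    """Estimate trainable parameters from the layer shapes in the runner listing."""
    d = n_embd
    token_emb = vocab_size * d
    pos_emb = block_size * d
    attention = (d * 3 * d + 3 * d) + (d * d + d)   # fused qkv + output projection
    ffn = (d * 4 * d + 4 * d) + (4 * d * d + d)     # expand to 4d, project back to d
    layer_norms = 2 * (2 * d)                       # ln1 and ln2: weight + bias each
    per_block = attention + ffn + layer_norms
    final_ln = 2 * d
    head = d * vocab_size                           # output head has no bias
    return token_emb + pos_emb + n_layer * per_block + final_ln + head


# Presets as defined in the runner: (n_layer, n_embd, block_size).
PRESET_SHAPES = {"tiny": (2, 96, 192), "small": (4, 192, 320), "medium": (6, 384, 512)}

if __name__ == "__main__":
    for name, (n_layer, n_embd, block_size) in PRESET_SHAPES.items():
        total = count_tinycodegpt_parameters(39, n_layer, n_embd, block_size)
        print(f"{name}: {total:,} parameters")
```

In this architecture the head count does not appear in the formula: changing `Heads` only changes how the same qkv projection is split, while `Embedding` enters quadratically and `Layers` linearly.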
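Core training code: what happens inside one step
```python
# This code mirrors the training core of tinycodegpt_local_runner.py.
# It is an excerpt: model, optimizer, evaluate, train_ids, val_ids, device, and the
# hyperparameters (block_size, batch_size, target_epochs, fallback_max_steps, log_every)
# are set up elsewhere in the runner.
# The point is not "loop many times"; every step estimates:
# given the context x[:, :t + 1], what should the next character y[:, t] be?
import math

import torch


def make_batch(data, block_size, batch_size, device):
    # data is the character-id sequence of the whole encoded training text.
    # Each sample is a random window of length block_size.
    # Because windows are random, one epoch does not scan every JSONL line in order;
    # it is a budget of about train_tokens / (batch_size * block_size) steps,
    # each drawing batch_size windows.
    starts = torch.randint(0, len(data) - block_size - 1, (batch_size,))
    # x: [B, T], the context the model can see.
    # y: [B, T], x shifted right by one, the supervised answer at every position.
    x = torch.stack([data[i:i + block_size] for i in starts]).to(device)
    y = torch.stack([data[i + 1:i + block_size + 1] for i in starts]).to(device)
    return x, y


steps_per_epoch = math.ceil(len(train_ids) / (batch_size * block_size))
planned_steps = math.ceil(steps_per_epoch * target_epochs) if target_epochs > 0 else fallback_max_steps

for step in range(1, planned_steps + 1):
    x, y = make_batch(train_ids, block_size, batch_size, device)

    # forward:
    # logits.shape == [B, T, vocab_size]
    # loss is the mean cross entropy over B*T next-character predictions.
    logits, loss = model(x, y)

    # backward:
    # PyTorch backpropagates from loss through logits -> blocks -> embeddings.
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    # Even a small model can hit gradient spikes; clipping keeps one bad batch
    # from blowing up the parameters.
    torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
    optimizer.step()

    # validate:
    # val_loss does not drive updates; it only answers whether the model also
    # explains text windows it was not trained on directly.
    if step == 1 or step % log_every == 0 or step == planned_steps:
        val_loss = evaluate(model, val_ids, block_size, min(batch_size, 32), device)
        epoch = step / steps_per_epoch
        print(f"step {step}/{planned_steps} | epoch {epoch:.2f}/{target_epochs} | loss {loss.item():.4f} | val {val_loss:.4f}")

# The checkpoint stores not only the model weights but also vocab/config/optimizer.
# That is why "continue training" must load the same vocabulary and architecture.
```
Cross Entropy: what TinyCodeGPT's loss actually penalizes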
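Cross entropy turns next-character prediction into a penalty that can be optimized: the model assigns a score (logit) to every character in the vocabulary, softmax turns the scores into probabilities, and the loss looks only at the probability of the true character. The higher that probability, the lower the loss; when the model puts its probability on wrong characters, the loss grows quickly.
1. Logits are not probabilities but comparable raw scores
The last layer of TinyGPT outputs V numbers per position. They may be negative and need not sum to 1, so they are not probabilities. Softmax turns them into a probability distribution and amplifies the gap between the top score and the rest.
2. Cross entropy only penalizes the probability of the true character
If the true next character at the current position is token id=0, the loss only looks at p[0]. The closer p[0] is to 1, the closer -log(p[0]) is to 0; the smaller p[0], the larger the loss. This is what pushes the model to raise the probability of the true next character.
3. The random baseline ln(V) is the first yardstick for whether training learned anything
If the model guesses uniformly over V characters, each has probability 1/V, so the true character also has probability 1/V and the loss is -log(1/V) = ln(V). For V=39 the random loss is about 3.664; only a training loss clearly below it shows that the model has captured token-order structure.
4. Why the loss is an average over B×T positions
One training step does not judge whether a whole piece of code is right; it poses a next-token classification problem at every position of every window in the batch. With B=32 and T=320, one optimizer.step averages the cross entropy of 10240 classification problems.
5. Between a lower loss and runnable code lies an execution check
Cross entropy optimizes character probabilities and does not execute code. It makes indentation, brackets, API names, and common patterns look more like the training set, but whether the code really runs and produces correct numbers still has to be verified with the local execute_code.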
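The points above can be checked numerically with plain Python. This is a minimal sketch for a single position, not the runner's implementation (which uses PyTorch's batched cross entropy); the helper names and the logit value 5.0 are illustrative assumptions.

```python
import math


def softmax(logits):
    """Turn raw scores into probabilities; subtracting the max keeps exp() stable."""
    peak = max(logits)
    exps = [math.exp(value - peak) for value in logits]
    total = sum(exps)
    return [e / total for e in exps]


def cross_entropy(logits, target_id):
    """Loss for one position: minus the log-probability of the true token."""
    return -math.log(softmax(logits)[target_id])


V = 39

# Uniform guess: every logit equal, so every probability is 1/V.
uniform_loss = cross_entropy([0.0] * V, target_id=0)

# A model that favors token 0 is rewarded when 0 is correct, punished when it is not.
confident = [0.0] * V
confident[0] = 5.0
right_loss = cross_entropy(confident, target_id=0)
wrong_loss = cross_entropy(confident, target_id=1)

if __name__ == "__main__":
    print(f"uniform baseline: {uniform_loss:.6f}  (ln V = {math.log(V):.6f})")
    print(f"confident and right: {right_loss:.6f}")
    print(f"confident and wrong: {wrong_loss:.6f}")
```

The uniform case reproduces the ln(V) baseline, while the confident-but-wrong case lands above it, which is why a loss stuck near or above ln(V) signals that training has not yet learned useful structure.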
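Local training console: generate data, train, watch the loss, call the model
This console only connects to the local training service; data generation, PyTorch training, checkpoint saving, and code generation all happen on your own computer.
- Math: learn formulas, variables, and basic numeric calculation.
- Python: learn list, dict, for, if, and string processing.
- NumPy: learn arrays, matrices, and vectorized scientific computing.
- Pandas: learn table filtering, grouped statistics, and column calculations.
- Matplotlib: learn to plot functions, point sets, and statistical results.
- ML: learn the minimal structure of machine-learning code.
Teacher JSONL dataset
Choose a data source: the built-in dataset samples from the 10000-line JSONL bundled with the app according to sample count, category weights, and seed; the online teacher model calls your personal LLM API to generate data on the fly. 24 lines are only enough to verify the pipeline; start from about 1000 lines, and use 5000 to 10000 lines for serious experiments.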
Source deep dive: tracing a single JSONL sample
Pick one line from the raw JSONL and follow it through the real path in runner.py: JSON parsing, task-code wrapping, character vocabulary, token ids, right-shifted x/y supervision, and the steps/epoch budget; then execute the sample code to get its stdout or plot.
Raw JSONL line:
```json
{"category":"Math","task":"calculate population variance of [112, 34, 64, 80, 19, 2, 31]","code":"values = [112, 34, 64, 80, 19, 2, 31]\nmean = sum(values) / len(values)\nvariance = sum((x - mean) ** 2 for x in values) / len(values)\nprint(round(variance, 4))"}
```
Parsed object:
```json
{
  "category": "Math",
  "task": "calculate population variance of [112, 34, 64, 80, 19, 2, 31]",
  "code": "values = [112, 34, 64, 80, 19, 2, 31]\nmean = sum(values) / len(values)\nvariance = sum((x - mean) ** 2 for x in values) / len(values)\nprint(round(variance, 4))"
}
```
The runner does not train on the JSON object directly; it turns it into plain text with fixed boundaries. At generation time, the same <task> and <code> boundaries are used to elicit the task-to-code conditional completion.
```text
<task>
calculate population variance of [112, 34, 64, 80, 19, 2, 31]
</task>
<code>
values = [112, 34, 64, 80, 19, 2, 31]
mean = sum(values) / len(values)
variance = sum((x - mean) ** 2 for x in values) / len(values)
print(round(variance, 4))
</code>
```
```text
values = [112, 34, 64, 80, 19, 2, 31]
mean = sum(values) / n = 342 / 7 = 48.857143
population variance = sum((x - mean) ** 2) / n
                    = 8812.857143 / 7
                    = 1258.979592
```
The vocab preview lists the first entries of the character dictionary (id -> char). The selected sample token ids are the id sequence obtained by encoding the current sample text character by character. The two are not aligned by display position; each token id is looked up in the vocab to find its character. The one-to-one correspondence on the sample sequence lives in the x/y table: the input character x at position pos must predict the target character y, which is shifted right by one.
```text
vocab preview: \n | space | ( | ) | * | , | - | / | 0 | 1 | 2 | 3 | 4 | 6 | 8 | 9 | < | = | > | [ | ] | a | c | d | e | f | i | k | l | m | n | o | p | r | s | t | u | v | x
selected sample token ids: 16, 35, 21, 34, 27, 18, 0, 22, 21, 28, 22, 36, 28, 21, 35, 24, 1, 32, 31, 32, 36, 28, 21, 35, 26, 31, 30, 1, 37, 21, 33, 26, 21, 30, 22, 24, 1, 31, 25, 1, 19, 9, 9, 10, 5, 1, 11, 12, 5, 1, 13, 12, 5, 1, 14, 8, 5, 1, 9, 15, 5, 1, 10, 5, 1, 11, 9, 20, 0, 16, 7, 35
```
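The vocabulary and token ids in this trace can be reproduced with a short sketch that reuses the wrapping and vocabulary logic of `format_sample` and `build_vocab` from the runner listing. One assumption is made explicit: the vocabulary is built from this single sample only, which is what gives V = 39 here; a real dataset would produce a larger vocabulary and different ids.

```python
def format_sample(sample):
    """Wrap one task/code pair in the fixed <task>/<code> boundaries."""
    return f"<task>\n{sample['task']}\n</task>\n<code>\n{sample['code']}\n</code>\n"


def build_vocab(text):
    """Character-level vocabulary: sorted unique characters mapped to ids."""
    chars = sorted(set(text))
    stoi = {ch: i for i, ch in enumerate(chars)}
    itos = {i: ch for ch, i in stoi.items()}
    return stoi, itos


sample = {
    "category": "Math",
    "task": "calculate population variance of [112, 34, 64, 80, 19, 2, 31]",
    "code": (
        "values = [112, 34, 64, 80, 19, 2, 31]\n"
        "mean = sum(values) / len(values)\n"
        "variance = sum((x - mean) ** 2 for x in values) / len(values)\n"
        "print(round(variance, 4))"
    ),
}

text = format_sample(sample)
stoi, itos = build_vocab(text)      # assumption: vocab built from this one sample
ids = [stoi[ch] for ch in text]
x, y = ids[:-1], ids[1:]            # right-shifted supervision pairs

if __name__ == "__main__":
    print("formatted_tokens =", len(ids))
    print("vocab_size =", len(stoi))
    print("supervised_pairs =", len(x))
    print("first ids =", ids[:7], "->", repr("".join(itos[i] for i in ids[:7])))
```

The first seven ids decode back to `<task>` plus a newline, which matches the start of the id sequence in the preview.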
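```text
steps_per_epoch = ceil(train_tokens / (batch_size * block_size))
                = ceil(230 / 10240)
                = 1
loss = mean cross_entropy(logits.reshape(-1, vocab), y.reshape(-1))
```
If this sample window is drawn by make_batch, it joins other random windows to form a real training batch. The trace continues with the deterministic tensor shapes and computational relations; the actual loss value depends on the current model weights and can only be computed by a PyTorch forward pass in the local runner.
Each x[t] -> y[t] is one supervision pair. The current context is 320, so this sample alone fills about 78.4% of the context.
```text
Current sample:
  formatted_tokens = 251
  supervised_pairs = formatted_tokens - 1 = 250
Current training settings:
  B = 32, T = 320, V = 39, d = 192, heads = 4, head_dim = 48
One step:
  loss_terms   = B * T     = 32 * 320      = 10240
  logit_scores = B * T * V = 32 * 320 * 39 = 399360
Random-guess baseline:
  loss_random       = ln(V) = ln(39) = 3.663562
  perplexity_random = exp(loss_random) ≈ 39
```
Shape trace of the forward pass (pseudocode):
```text
# When make_batch draws a window, the supervision is the input shifted right by one.
x = ids[i : i + 320]        # 320 ids per window; stacked over the batch: [32, 320]
y = ids[i + 1 : i + 321]    # 320 ids per window; stacked over the batch: [32, 320]

# TinyCodeGPT.forward(idx=x, targets=y)
h0 = token_emb(x) + pos_emb(arange(320))        # [32, 320, 192]
q, k, v = Linear(h).split(192)                  # each [32, 320, 192]
attention = softmax(q @ k.T / sqrt(head_dim) + causal_mask)
h = TransformerBlock(... repeated 4 layers ...)
logits = lm_head(layer_norm(h))                 # [32, 320, 39]
loss = cross_entropy(logits.reshape(10240, 39), y.reshape(10240))
```
Backward pass (pseudocode):
```text
# The backward pass does not "update this one JSONL line in isolation";
# the B*T next-token decisions in the current batch update the parameters jointly.
optimizer.zero_grad(set_to_none=True)
loss.backward()
clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()

# Gradient path:
# loss -> logits / lm_head -> final layer norm
#      -> Transformer blocks (attention + FFN) -> token_emb / pos_emb

# The actual loss value depends on the current weights:
# model initialization, a resumed checkpoint, the learning rate, and the step history all affect it.
```
The model estimates p(next token | left context). The training loss only looks at character probabilities; only the execution result tells us what the sample code printed in Python, or which plot artifacts it produced.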
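The bookkeeping in this trace (steps per epoch, loss terms per step, the random baseline) is plain arithmetic and can be recomputed outside PyTorch. The sketch below follows the formulas in the runner listing, including its clamp of planned steps to the range 20 to 200000; the function name `training_budget` and the returned dictionary keys are hypothetical.

```python
import math


def training_budget(train_tokens, batch_size, block_size, vocab_size, target_epochs):
    """Recompute the step budget and per-step sizes used in the trace."""
    steps_per_epoch = max(1, math.ceil(train_tokens / (batch_size * block_size)))
    epoch_steps = math.ceil(steps_per_epoch * target_epochs)
    # The runner listing clamps the planned step count to [20, 200000].
    planned_steps = max(20, min(epoch_steps, 200_000))
    return {
        "steps_per_epoch": steps_per_epoch,
        "planned_steps": planned_steps,
        "loss_terms_per_step": batch_size * block_size,
        "logit_scores_per_step": batch_size * block_size * vocab_size,
        "random_baseline_loss": math.log(vocab_size),
    }


budget = training_budget(
    train_tokens=230, batch_size=32, block_size=320, vocab_size=39, target_epochs=3.0
)

if __name__ == "__main__":
    for key, value in budget.items():
        print(f"{key} = {value}")
```

With only 230 training tokens a single step already covers more than one epoch's budget, which is a reminder that one sample is useful for tracing shapes but far too small to train on.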
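The rule-based interpreter handles the deterministic computation; the LLM analyzes only the current sample, and only when you click. This is useful for unrecognized samples, for the basic preview, or when you want a deeper explanation of the code semantics and the training flow.
Start the local runner with `.venv\Scripts\python.exe scripts\tinycodegpt_local_runner.py`.
Once training starts, the console shows step, loss, tokens/sec, and the checkpoint path.
After training finishes, the Python code generated by the model appears here.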
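Step 5: Do not only watch the loss; check whether the code runs
A falling loss only shows that the model is getting better at predicting the next token in the training text. For a task→code model, what matters more is whether the generated code really runs.
| Metric | Meaning |
|---|---|
| Train Loss | next-token cross entropy on the training set |
| Val Loss | next-token cross entropy on the validation set, used to watch for overfitting |
| Syntax Pass Rate | whether the generated code parses as Python |
| Execution Pass Rate | whether the generated code runs locally without errors |
| Category Pass Rate | per-category results for Math, Python, NumPy, Pandas, Matplotlib, and ML |
For beginners, the most important review question is: when the model fails, is it because a data category is underrepresented, the code style is inconsistent, there were too few training steps, or the model is too small? This is closer to real LLM engineering than chasing a lower loss.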
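The syntax and execution pass rates can be computed with the standard library alone. The following is a sketch under stated assumptions, not the module's evaluator: the function names are hypothetical, each snippet runs in a fresh interpreter via `subprocess` with a timeout, and there is no sandboxing beyond that, so it should only be pointed at code you are willing to run.

```python
import ast
import subprocess
import sys


def syntax_ok(code):
    """True if the code parses as Python."""
    try:
        ast.parse(code)
        return True
    except (SyntaxError, ValueError):
        return False


def executes_ok(code, timeout=10):
    """True if the code exits with status 0 in a separate interpreter process."""
    try:
        result = subprocess.run(
            [sys.executable, "-c", code],
            capture_output=True, text=True, timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return False
    return result.returncode == 0


def pass_rates(generated):
    """Per-category syntax and execution pass rates for {'category', 'code'} items."""
    stats = {}
    for item in generated:
        entry = stats.setdefault(item["category"], {"total": 0, "syntax": 0, "execution": 0})
        entry["total"] += 1
        if syntax_ok(item["code"]):
            entry["syntax"] += 1
            if executes_ok(item["code"]):
                entry["execution"] += 1
    return {
        category: {
            "syntax_pass_rate": entry["syntax"] / entry["total"],
            "execution_pass_rate": entry["execution"] / entry["total"],
        }
        for category, entry in stats.items()
    }


if __name__ == "__main__":
    demo = [
        {"category": "Math", "code": "print(1 + 1)"},
        {"category": "Math", "code": "print("},        # syntax error
        {"category": "Math", "code": "print(1 / 0)"},  # parses, fails at runtime
    ]
    print(pass_rates(demo))
```

Separating the two checks shows where a model fails: a low syntax rate points at broken local structure such as brackets and indentation, while a gap between syntax and execution rates points at code that looks right but does the wrong thing.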
Local launch command
```powershell
# Run from the AlgoLab project root
.venv\Scripts\python.exe scripts\tinycodegpt_local_runner.py
# Default service address
# http://127.0.0.1:4877/status
# Checkpoints produced by training are saved in
# .tmp/tinycodegpt/checkpoints
```
tinycodegpt_local_runner.py full code (copyable)
from __future__ import annotations

import argparse
import base64
import json
import math
import os
import random
import subprocess
import sys
import tempfile
import threading
import time
import traceback
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
from urllib.parse import urlparse

try:
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
except Exception:  # pragma: no cover - status endpoint reports this clearly.
    torch = None

    class _NNStub:
        Module = object

    nn = _NNStub()
    F = None


def torch_no_grad():
    if torch is not None:
        return torch.no_grad()

    def decorator(function):
        return function

    return decorator


ROOT_DIR = Path(__file__).resolve().parents[1]
RUN_DIR = ROOT_DIR / ".tmp" / "tinycodegpt"
CHECKPOINT_DIR = RUN_DIR / "checkpoints"
RUN_OUTPUT_DIR = RUN_DIR / "generated_runs"
DATASET_DIR = RUN_DIR / "datasets"
RUN_DIR.mkdir(parents=True, exist_ok=True)
CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True)
RUN_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
DATASET_DIR.mkdir(parents=True, exist_ok=True)

JOBS: dict[str, dict[str, Any]] = {}
STOP_EVENTS: dict[str, threading.Event] = {}
ACTIVE_JOB_ID: str | None = None
LOCK = threading.Lock()
CATEGORIES = ("Math", "Python", "NumPy", "Pandas", "Matplotlib", "ML")
FORBIDDEN_CODE_PATTERNS = (
    "subprocess",
    "os.system",
    "socket",
    "requests",
    "urllib",
    "shutil",
    "pickle",
    "eval(",
    "exec(",
    "open(",
    "Path(",
)


@dataclass
class ModelConfig:
    n_layer: int
    n_head: int
    n_embd: int
    block_size: int


PRESETS = {
    "tiny": ModelConfig(n_layer=2, n_head=2, n_embd=96, block_size=192),
    "small": ModelConfig(n_layer=4, n_head=4, n_embd=192, block_size=320),
    "medium": ModelConfig(n_layer=6, n_head=6, n_embd=384, block_size=512),
}


def clamp_int(value: Any, fallback: int, lower: int, upper: int) -> int:
    try:
        parsed = int(value)
    except (TypeError, ValueError):
        parsed = fallback
    return max(lower, min(parsed, upper))


def clamp_float(value: Any, fallback: float, lower: float, upper: float) -> float:
    try:
        parsed = float(value)
    except (TypeError, ValueError):
        parsed = fallback
    return max(lower, min(parsed, upper))


def resolve_model_config(request: dict[str, Any]) -> tuple[str, ModelConfig]:
    preset = str(request.get("preset") or "small")
    base_config = PRESETS.get(preset, PRESETS["small"])
    raw_config = request.get("model_config") if preset == "custom" else None
    if not isinstance(raw_config, dict):
        return preset if preset in PRESETS else "small", base_config
    config = ModelConfig(
        n_layer=clamp_int(raw_config.get("n_layer"), base_config.n_layer, 1, 12),
        n_head=clamp_int(raw_config.get("n_head"), base_config.n_head, 1, 12),
        n_embd=clamp_int(raw_config.get("n_embd"), base_config.n_embd, 32, 768),
        block_size=clamp_int(raw_config.get("block_size"), base_config.block_size, 64, 1024),
    )
    if config.n_embd % config.n_head != 0:
        raise ValueError("n_embd must be divisible by n_head for custom model config.")
    return "custom", config


def utc_now() -> str:
    return datetime.now(timezone.utc).isoformat()


def jsonable(value: Any) -> Any:
    if isinstance(value, Path):
        return str(value)
    return value


def respond(handler: BaseHTTPRequestHandler, status: int, payload: dict[str, Any]) -> None:
    raw = json.dumps(payload, ensure_ascii=False, default=jsonable).encode("utf-8")
    handler.send_response(status)
    handler.send_header("Content-Type", "application/json; charset=utf-8")
    handler.send_header("Content-Length", str(len(raw)))
    handler.send_header("Access-Control-Allow-Origin", "*")
    handler.send_header("Access-Control-Allow-Headers", "Content-Type")
    handler.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
    handler.end_headers()
    handler.wfile.write(raw)


def read_body(handler: BaseHTTPRequestHandler) -> dict[str, Any]:
    length = int(handler.headers.get("Content-Length") or "0")
    if length <= 0:
        return {}
    raw = handler.rfile.read(length).decode("utf-8")
    return json.loads(raw or "{}")


def update_job(job_id: str, **patch: Any) -> None:
    with LOCK:
        job = JOBS[job_id]
        job.update(patch)


def append_log(job_id: str, line: str) -> None:
    with LOCK:
        job = JOBS[job_id]
        logs = job.setdefault("logs", [])
        logs.append(line)
        if len(logs) > 240:
            del logs[: len(logs) - 240]


def append_metric(job_id: str, metric: dict[str, Any]) -> None:
    with LOCK:
        job = JOBS[job_id]
        metrics = job.setdefault("metrics", [])
        metrics.append(metric)
        if len(metrics) > 240:
            del metrics[: len(metrics) - 240]


def add_sample(samples: list[dict[str, str]], category: str, task: str, code: str) -> None:
    samples.append({"category": category, "task": task.strip(), "code": code.strip()})


def generate_dataset(samples_per_category: int, seed: int = 7) -> list[dict[str, str]]:
    rng = random.Random(seed)
    samples: list[dict[str, str]] = []
    for _ in range(samples_per_category):
        r = rng.randint(1, 80)
        add_sample(
            samples,
            "Math",
            f"calculate the area of a circle with radius {r}",
            f"import math\nr = {r}\narea = math.pi * r ** 2\nprint(area)",
        )
        w, h = rng.randint(2, 60), rng.randint(2, 60)
        add_sample(
            samples,
            "Math",
            f"calculate rectangle area with width {w} and height {h}",
            f"width = {w}\nheight = {h}\narea = width * height\nprint(area)",
        )
        numbers = [rng.randint(-20, 80) for _ in range(rng.randint(4, 8))]
        add_sample(
            samples,
            "Math",
            f"calculate mean and variance of {numbers}",
            "nums = "
            + repr(numbers)
            + "\nmean = sum(nums) / len(nums)\nvariance = sum((x - mean) ** 2 for x in nums) / len(nums)\nprint(mean, variance)",
        )
        x1, y1, x2, y2 = [rng.randint(-20, 20) for _ in range(4)]
        add_sample(
            samples,
            "Math",
            f"calculate distance between ({x1}, {y1}) and ({x2}, {y2})",
            f"import math\nx1, y1 = {x1}, {y1}\nx2, y2 = {x2}, {y2}\ndistance = math.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)\nprint(distance)",
        )
        py_nums = [rng.randint(-30, 90) for _ in range(rng.randint(5, 10))]
        add_sample(
            samples,
            "Python",
            f"sort the list {py_nums} in ascending order",
            f"nums = {py_nums}\nsorted_nums = sorted(nums)\nprint(sorted_nums)",
        )
        threshold = rng.randint(0, 40)
        add_sample(
            samples,
            "Python",
            f"filter numbers greater than {threshold} from {py_nums}",
            f"nums = {py_nums}\nfiltered = [x for x in nums if x > {threshold}]\nprint(filtered)",
        )
        words = rng.choices(["alpha", "beta", "gamma", "delta", "theta", "lambda"], k=8)
        add_sample(
            samples,
            "Python",
            f"count word frequencies in {' '.join(words)}",
            "text = "
            + repr(" ".join(words))
            + "\ncounts = {}\nfor word in text.split():\n counts[word] = counts.get(word, 0) + 1\nprint(counts)",
        )
        arr = [rng.randint(1, 50) for _ in range(5)]
        add_sample(
            samples,
            "NumPy",
            f"normalize numpy array {arr} to range 0 to 1",
            "import numpy as np\narr = np.array("
            + repr(arr)
            + ", dtype=float)\nnormalized = (arr - arr.min()) / (arr.max() - arr.min())\nprint(normalized)",
        )
        a = [[rng.randint(1, 5), rng.randint(1, 5)], [rng.randint(1, 5), rng.randint(1, 5)]]
        b = [[rng.randint(1, 5), rng.randint(1, 5)], [rng.randint(1, 5), rng.randint(1, 5)]]
        add_sample(
            samples,
            "NumPy",
            f"multiply two numpy matrices {a} and {b}",
            f"import numpy as np\nA = np.array({a})\nB = np.array({b})\nC = A @ B\nprint(C)",
        )
        start, stop = rng.randint(-10, 0), rng.randint(5, 20)
        add_sample(
            samples,
            "NumPy",
            f"create {8} evenly spaced numpy values from {start} to {stop}",
            f"import numpy as np\nx = np.linspace({start}, {stop}, 8)\nprint(x)",
        )
        prices = [rng.randint(10, 100) for _ in range(6)]
        cities = rng.choices(["Beijing", "Shanghai", "Chengdu"], k=6)
        add_sample(
            samples,
            "Pandas",
            "calculate mean price by city in a pandas dataframe",
            "import pandas as pd\n"
            + f"df = pd.DataFrame({{'city': {cities}, 'price': {prices}}})\n"
            + "result = df.groupby('city')['price'].mean()\nprint(result)",
        )
        add_sample(
            samples,
            "Pandas",
            f"filter pandas rows where price is greater than {threshold}",
            "import pandas as pd\n"
            + f"df = pd.DataFrame({{'city': {cities}, 'price': {prices}}})\n"
            + f"filtered = df[df['price'] > {threshold}]\nprint(filtered)",
        )
        func = rng.choice(["x ** 2", "np.sin(x)", "np.cos(x)", "x ** 3"])
        add_sample(
            samples,
            "Matplotlib",
            f"plot y = {func} from -10 to 10",
            "import numpy as np\nimport matplotlib.pyplot as plt\nx = np.linspace(-10, 10, 100)\ny = "
            + func
            + "\nplt.plot(x, y)\nplt.title('function plot')\nplt.show()",
        )
        add_sample(
            samples,
            "Matplotlib",
            "draw a scatter plot for two small numeric lists",
            "import matplotlib.pyplot as plt\nx = [1, 2, 3, 4, 5]\ny = [2, 5, 4, 8, 7]\nplt.scatter(x, y)\nplt.xlabel('x')\nplt.ylabel('y')\nplt.show()",
        )
        slope = rng.uniform(0.5, 4.0)
        intercept = rng.uniform(-3.0, 3.0)
        xs = list(range(-5, 6))
        ys = [round(slope * x + intercept, 3) for x in xs]
        add_sample(
            samples,
            "ML",
            "fit linear regression with numpy least squares",
            "import numpy as np\n"
            + f"x = np.array({xs}, dtype=float)\ny = np.array({ys}, dtype=float)\n"
            + "X = np.c_[x, np.ones_like(x)]\ncoef, bias = np.linalg.lstsq(X, y, rcond=None)[0]\nprint(coef, bias)",
        )
        add_sample(
            samples,
            "ML",
            "run gradient descent for y = 2x + 1",
            "import numpy as np\nx = np.array([0, 1, 2, 3], dtype=float)\ny = 2 * x + 1\nw, b = 0.0, 0.0\nlr = 0.05\nfor step in range(200):\n pred = w * x + b\n loss = np.mean((pred - y) ** 2)\n dw = np.mean(2 * (pred - y) * x)\n db = np.mean(2 * (pred - y))\n w -= lr * dw\n b -= lr * db\nprint(round(w, 3), round(b, 3), round(loss, 6))",
        )
    rng.shuffle(samples)
    return samples


def strip_jsonl_fences(raw: str) -> str:
    text = raw.strip()
    if text.startswith("```"):
        lines = text.splitlines()
        if lines and lines[0].lstrip().startswith("```"):
            lines = lines[1:]
        if lines and lines[-1].strip() == "```":
            lines = lines[:-1]
        text = "\n".join(lines).strip()
    return text


def assert_safe_sample_code(code: str, line_number: int) -> None:
    lowered = code.lower()
    for pattern in FORBIDDEN_CODE_PATTERNS:
        if pattern.lower() in lowered:
            raise ValueError(f"line {line_number}: code contains forbidden operation {pattern!r}")


def normalize_dataset_sample(value: Any, line_number: int) -> dict[str, str]:
    if not isinstance(value, dict):
        raise ValueError(f"line {line_number}: expected a JSON object")
    category = str(value.get("category") or "").strip()
    task = str(value.get("task") or "").strip()
    code = str(value.get("code") or "").strip()
    if category not in CATEGORIES:
        raise ValueError(f"line {line_number}: category must be one of {', '.join(CATEGORIES)}")
    if not task:
        raise ValueError(f"line {line_number}: task is required")
    if not code:
        raise ValueError(f"line {line_number}: code is required")
    assert_safe_sample_code(code, line_number)
    return {"category": category, "task": task, "code": code}


def parse_jsonl_dataset(raw: str) -> list[dict[str, str]]:
    text = strip_jsonl_fences(raw)
    if not text:
        raise ValueError("dataset_jsonl is empty")
    if text.startswith("["):
        parsed = json.loads(text)
        if not isinstance(parsed, list):
            raise ValueError("dataset JSON array is invalid")
        samples = [normalize_dataset_sample(item, index + 1) for index, item in enumerate(parsed)]
    else:
        samples = []
        for index, line in enumerate(text.splitlines(), start=1):
            stripped = line.strip()
            if not stripped:
                continue
            samples.append(normalize_dataset_sample(json.loads(stripped), index))
    if not samples:
        raise ValueError("dataset_jsonl does not contain any samples")
    return samples


def save_jsonl_dataset(job_id: str, samples: list[dict[str, str]]) -> Path:
    dataset_path = DATASET_DIR / f"{job_id}.jsonl"
    payload = "\n".join(json.dumps(sample, ensure_ascii=False) for sample in samples) + "\n"
    dataset_path.write_text(payload, encoding="utf-8")
    return dataset_path


def load_training_samples(
    job_id: str,
    request: dict[str, Any],
    samples_per_category: int,
    seed: int,
) -> tuple[list[dict[str, str]], str, Path | None]:
    raw_jsonl = str(request.get("dataset_jsonl") or "").strip()
    if raw_jsonl:
        samples = parse_jsonl_dataset(raw_jsonl)
        dataset_path = save_jsonl_dataset(job_id, samples)
        return samples, "teacher_jsonl", dataset_path
    return generate_dataset(samples_per_category, seed=seed), "template_synthetic", None


def format_sample(sample: dict[str, str]) -> str:
    return f"<task>\n{sample['task']}\n</task>\n<code>\n{sample['code']}\n</code>\n"


def build_vocab(text: str) -> tuple[dict[str, int], dict[int, str]]:
    chars = sorted(set(text))
    stoi = {ch: i for i, ch in enumerate(chars)}
    itos = {i: ch for ch, i in stoi.items()}
    return stoi, itos


def count_parameters(model: nn.Module) -> int:
    return sum(parameter.numel() for parameter in model.parameters())


def normalize_itos(raw_itos: dict[Any, str]) -> dict[int, str]:
    if not raw_itos:
        return {}
    return {int(key): value for key, value in raw_itos.items()} if isinstance(next(iter(raw_itos.keys())), str) else raw_itos


def load_resume_checkpoint(path_value: str | None) -> tuple[Path, dict[str, Any]]:
    checkpoint_path = Path(path_value) if path_value else latest_checkpoint()
    if checkpoint_path is None or not checkpoint_path.exists():
        raise RuntimeError("No checkpoint found to continue training from.")
    return checkpoint_path, torch.load(checkpoint_path, map_location="cpu")


class CausalSelfAttention(nn.Module):
    def __init__(self, config: ModelConfig):
        super().__init__()
        if config.n_embd % config.n_head != 0:
            raise ValueError("n_embd must be divisible by n_head")
        self.n_head = config.n_head
        self.head_dim = config.n_embd // config.n_head
        self.qkv = nn.Linear(config.n_embd, 3 * config.n_embd)
        self.proj = nn.Linear(config.n_embd, config.n_embd)
        self.register_buffer("mask", torch.tril(torch.ones(config.block_size, config.block_size)).view(1, 1, config.block_size, config.block_size))

    def forward(self, x):
        batch, tokens, channels = x.shape
        qkv = self.qkv(x)
        q, k, v = qkv.split(channels, dim=2)
        q = q.view(batch, tokens, self.n_head, self.head_dim).transpose(1, 2)
        k = k.view(batch, tokens, self.n_head, self.head_dim).transpose(1, 2)
        v = v.view(batch, tokens, self.n_head, self.head_dim).transpose(1, 2)
        attention = (q @ k.transpose(-2, -1)) / math.sqrt(self.head_dim)
        attention = attention.masked_fill(self.mask[:, :, :tokens, :tokens] == 0, float("-inf"))
        attention = F.softmax(attention, dim=-1)
        y = attention @ v
        y = y.transpose(1, 2).contiguous().view(batch, tokens, channels)
        return self.proj(y)


class TransformerBlock(nn.Module):
    def __init__(self, config: ModelConfig):
        super().__init__()
        self.ln1 = nn.LayerNorm(config.n_embd)
        self.attn = CausalSelfAttention(config)
        self.ln2 = nn.LayerNorm(config.n_embd)
        self.ffn = nn.Sequential(
            nn.Linear(config.n_embd, 4 * config.n_embd),
            nn.GELU(),
            nn.Linear(4 * config.n_embd, config.n_embd),
        )

    def forward(self, x):
        x = x + self.attn(self.ln1(x))
        x = x + self.ffn(self.ln2(x))
        return x


class TinyCodeGPT(nn.Module):
    def __init__(self, vocab_size: int, config: ModelConfig):
        super().__init__()
        self.config = config
        self.token_emb = nn.Embedding(vocab_size, config.n_embd)
        self.pos_emb = nn.Embedding(config.block_size, config.n_embd)
        self.blocks = nn.Sequential(*[TransformerBlock(config) for _ in range(config.n_layer)])
        self.ln_f = nn.LayerNorm(config.n_embd)
        self.head = nn.Linear(config.n_embd, vocab_size, bias=False)

    def forward(self, idx, targets=None):
        _, tokens = idx.shape
        positions = torch.arange(tokens, device=idx.device)
        x = self.token_emb(idx) + self.pos_emb(positions)
        x = self.blocks(x)
        logits = self.head(self.ln_f(x))
        loss = None
        if targets is not None:
            loss = F.cross_entropy(logits.reshape(-1, logits.size(-1)), targets.reshape(-1))
        return logits, loss

    @torch_no_grad()
    def generate(
        self,
        idx,
        max_new_tokens: int,
        temperature: float = 0.8,
        top_k: int = 30,
        stop_sequence: list[int] | None = None,
    ):
        for _ in range(max_new_tokens):
            idx_cond = idx[:, -self.config.block_size :]
            logits, _ = self(idx_cond)
            logits = logits[:, -1, :] / max(temperature, 1e-4)
            if top_k:
                values, _ = torch.topk(logits, min(top_k, logits.size(-1)))
                logits[logits < values[:, [-1]]] = -float("inf")
            probs = F.softmax(logits, dim=-1)
            next_id = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, next_id), dim=1)
            if stop_sequence and idx.size(1) >= len(stop_sequence):
                tail = idx[0, -len(stop_sequence) :].detach().cpu().tolist()
                if tail == stop_sequence:
                    break
        return idx


def make_batch(data: Any, block_size: int, batch_size: int, device: str):
    starts = torch.randint(0, len(data) - block_size - 1, (batch_size,))
    x = torch.stack([data[i : i + block_size] for i in starts]).to(device)
    y = torch.stack([data[i + 1 : i + block_size + 1] for i in starts]).to(device)
    return x, y


def evaluate(model: nn.Module, data: Any, block_size: int, batch_size: int, device: str) -> float:
    model.eval()
    losses = []
    with torch.no_grad():
        for _ in range(6):
            x, y = make_batch(data, block_size, batch_size, device)
            _, loss = model(x, y)
            losses.append(float(loss.item()))
    model.train()
    return sum(losses) / len(losses)
def run_training(job_id: str, request: dict[str, Any]) -> None:
global ACTIVE_JOB_ID
if torch is None:
update_job(job_id, status="failed", error="PyTorch is not installed in this Python environment.", finished_at=utc_now())
return
stop_event = STOP_EVENTS[job_id]
try:
continue_from_checkpoint = bool(request.get("continue_from_checkpoint"))
resume_checkpoint_path = str(request.get("checkpoint_path") or "").strip() or None
preset, config = resolve_model_config(request)
samples_per_category = max(10, min(int(request.get("samples_per_category") or 80), 600))
fallback_max_steps = max(20, min(int(request.get("max_steps") or 50_000), 200_000))
target_epochs = clamp_float(request.get("target_epochs"), 3.0, 0.0, 100.0)
batch_size = max(4, min(int(request.get("batch_size") or 32), 128))
learning_rate = float(request.get("learning_rate") or 8e-4)
seed = int(request.get("seed") or 7)
random.seed(seed)
torch.manual_seed(seed)
device = "cuda" if torch.cuda.is_available() else "cpu"
update_job(job_id, status="running", started_at=utc_now(), device=device)
append_log(job_id, f"device = {device}")
append_log(job_id, f"preset = {preset}, config = {config}")
samples, dataset_source, dataset_path = load_training_samples(job_id, request, samples_per_category, seed)
text = "\n".join(format_sample(sample) for sample in samples)
resume_checkpoint: dict[str, Any] | None = None
resumed_from_checkpoint: Path | None = None
if continue_from_checkpoint:
resumed_from_checkpoint, resume_checkpoint = load_resume_checkpoint(resume_checkpoint_path)
config = ModelConfig(**resume_checkpoint["config"])
preset = str(resume_checkpoint.get("metadata", {}).get("preset") or "continued")
stoi = resume_checkpoint["stoi"]
itos = normalize_itos(resume_checkpoint["itos"])
missing_chars = sorted(set(text) - set(stoi))
if missing_chars:
preview = "".join(missing_chars[:20])
raise RuntimeError(
"Cannot continue from this checkpoint because the new dataset contains characters "
f"that are not in the checkpoint vocabulary: {preview!r}"
)
append_log(job_id, f"continue_from_checkpoint = {resumed_from_checkpoint}")
append_log(job_id, f"checkpoint config = {config}")
else:
stoi, itos = build_vocab(text)
ids = torch.tensor([stoi[ch] for ch in text], dtype=torch.long)
if len(ids) <= config.block_size + batch_size + 2:
raise RuntimeError(
"Dataset is too small for the selected preset and batch size. "
"Generate more JSONL samples or choose the tiny preset."
)
split = max(config.block_size + batch_size + 2, int(len(ids) * 0.92))
split = min(split, len(ids) - config.block_size - batch_size - 2)
train_ids = ids[:split]
val_ids = ids[split:]
if len(val_ids) <= config.block_size + batch_size + 2:
val_ids = train_ids
steps_per_epoch = max(1, math.ceil(len(train_ids) / (batch_size * config.block_size)))
epoch_steps = math.ceil(steps_per_epoch * target_epochs) if target_epochs > 0 else fallback_max_steps
max_steps = max(20, min(epoch_steps, 200_000))
        model = TinyCodeGPT(len(stoi), config).to(device)
        optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=0.1)
        if resume_checkpoint is not None:
            model.load_state_dict(resume_checkpoint["model_state_dict"])
            if resume_checkpoint.get("optimizer_state_dict"):
                optimizer.load_state_dict(resume_checkpoint["optimizer_state_dict"])
                for state in optimizer.state.values():
                    for key, value in state.items():
                        if hasattr(value, "to"):
                            state[key] = value.to(device)
        parameter_count = count_parameters(model)
        update_job(
            job_id,
            dataset_size=len(samples),
            dataset_source=dataset_source,
            dataset_path=str(dataset_path) if dataset_path else None,
            vocab_size=len(stoi),
            parameter_count=parameter_count,
            preset=preset,
            model_config=config.__dict__,
            max_steps=max_steps,
            fallback_max_steps=fallback_max_steps,
            target_epochs=target_epochs,
            steps_per_epoch=steps_per_epoch,
            resumed_from_checkpoint=str(resumed_from_checkpoint) if resumed_from_checkpoint else None,
        )
        append_log(job_id, f"dataset_size = {len(samples)} samples")
        append_log(job_id, f"dataset_source = {dataset_source}")
        if dataset_path:
            append_log(job_id, f"dataset saved: {dataset_path}")
        append_log(job_id, f"vocab_size = {len(stoi)} characters")
        append_log(job_id, f"parameters = {parameter_count:,}")
        append_log(
            job_id,
            f"target_epochs = {target_epochs:g}, steps_per_epoch ≈ {steps_per_epoch}, "
            f"planned_steps = {max_steps}, fallback_max_steps = {fallback_max_steps}",
        )
        log_every = max(10, max_steps // 18)
        start_time = time.time()
        for step in range(1, max_steps + 1):
            if stop_event.is_set():
                update_job(job_id, status="stopped", finished_at=utc_now())
                append_log(job_id, "training stopped by user")
                return
            x, y = make_batch(train_ids, config.block_size, batch_size, device)
            _, loss = model(x, y)
            optimizer.zero_grad(set_to_none=True)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            if step == 1 or step % log_every == 0 or step == max_steps:
                val_loss = evaluate(model, val_ids, config.block_size, min(batch_size, 32), device)
                elapsed = max(time.time() - start_time, 1e-6)
                tokens_per_second = int(step * batch_size * config.block_size / elapsed)
                metric = {
                    "step": step,
                    "train_loss": round(float(loss.item()), 6),
                    "val_loss": round(float(val_loss), 6),
                    "tokens_per_second": tokens_per_second,
                    "epoch": round(step / steps_per_epoch, 4),
                }
                append_metric(job_id, metric)
                append_log(
                    job_id,
                    f"step {step:5d}/{max_steps} | epoch {metric['epoch']:.2f}/{target_epochs:g} | train_loss {metric['train_loss']:.4f} | val_loss {metric['val_loss']:.4f} | {tokens_per_second:,} tok/s",
                )
        checkpoint_path = CHECKPOINT_DIR / f"{job_id}.pt"
        torch.save(
            {
                "model_state_dict": model.state_dict(),
                "optimizer_state_dict": optimizer.state_dict(),
                "config": config.__dict__,
                "stoi": stoi,
                "itos": itos,
                "samples": samples[:120],
                "metadata": {
                    "preset": preset,
                    "model_config": config.__dict__,
                    "samples_per_category": samples_per_category,
                    "max_steps": max_steps,
                    "fallback_max_steps": fallback_max_steps,
                    "target_epochs": target_epochs,
                    "steps_per_epoch": steps_per_epoch,
                    "batch_size": batch_size,
                    "learning_rate": learning_rate,
                    "dataset_source": dataset_source,
                    "dataset_path": str(dataset_path) if dataset_path else None,
                    "dataset_size": len(samples),
                    "resumed_from_checkpoint": str(resumed_from_checkpoint) if resumed_from_checkpoint else None,
                    "created_at": utc_now(),
                },
            },
            checkpoint_path,
        )
        update_job(job_id, status="completed", checkpoint_path=str(checkpoint_path), finished_at=utc_now())
        append_log(job_id, f"checkpoint saved: {checkpoint_path}")
    except Exception as error:
        update_job(job_id, status="failed", error=str(error), finished_at=utc_now())
        append_log(job_id, traceback.format_exc())
    finally:
        with LOCK:
            if ACTIVE_JOB_ID == job_id:
                ACTIVE_JOB_ID = None
def latest_checkpoint() -> Path | None:
    checkpoints = sorted(CHECKPOINT_DIR.glob("*.pt"), key=lambda item: item.stat().st_mtime, reverse=True)
    return checkpoints[0] if checkpoints else None


def load_checkpoint(path_value: str | None):
    if torch is None:
        raise RuntimeError("PyTorch is not installed in this Python environment.")
    checkpoint_path = Path(path_value) if path_value else latest_checkpoint()
    if checkpoint_path is None or not checkpoint_path.exists():
        raise RuntimeError("No TinyCodeGPT checkpoint found. Train a model first.")
    checkpoint = torch.load(checkpoint_path, map_location="cpu")
    config = ModelConfig(**checkpoint["config"])
    stoi = checkpoint["stoi"]
    itos = normalize_itos(checkpoint["itos"])
    model = TinyCodeGPT(len(stoi), config)
    model.load_state_dict(checkpoint["model_state_dict"])
    model.eval()
    return checkpoint_path, model, config, stoi, itos


def extract_code(text: str) -> str:
    marker = "<code>\n"
    start = text.find(marker)
    if start >= 0:
        text = text[start + len(marker) :]
    end = text.find("</code>")
    if end >= 0:
        text = text[:end]
    return text.strip()


def encode_png_artifact(path: Path) -> dict[str, str]:
    payload = base64.b64encode(path.read_bytes()).decode("ascii")
    return {
        "filename": path.name,
        "path": str(path),
        "mime_type": "image/png",
        "data_url": f"data:image/png;base64,{payload}",
    }
def run_generated_code(code: str, timeout_seconds: int = 12) -> dict[str, Any]:
    if not code.strip():
        return {"output": [], "error": "Model returned empty code.", "artifacts": []}
    try:
        assert_safe_sample_code(code, 1)
    except ValueError as error:
        return {"output": [], "error": str(error), "artifacts": []}
    run_id = uuid.uuid4().hex[:10]
    script_path = RUN_OUTPUT_DIR / f"generated_{run_id}.py"
    capture_header = """
# AlgoLab runs matplotlib with a non-interactive backend and captures figures
# after the generated code finishes. Ignore the expected plt.show() warning.
import warnings as _algolab_warnings
_algolab_warnings.filterwarnings(
    "ignore",
    message="FigureCanvasAgg is non-interactive.*",
    category=UserWarning,
)
"""
    capture_footer = f"""
# AlgoLab captures matplotlib figures after generated code runs.
try:
    import os as _algolab_os
    import sys as _algolab_sys
    try:
        import matplotlib.pyplot as _algolab_plt
    except Exception:
        _algolab_plt = None
    _algolab_plot_dir = r"{RUN_OUTPUT_DIR}"
    if _algolab_plt is not None:
        for _algolab_index, _algolab_num in enumerate(_algolab_plt.get_fignums(), start=1):
            _algolab_figure = _algolab_plt.figure(_algolab_num)
            _algolab_path = _algolab_os.path.join(_algolab_plot_dir, "{run_id}_plot_" + str(_algolab_index) + ".png")
            _algolab_figure.savefig(_algolab_path, bbox_inches="tight")
except Exception as _algolab_plot_error:
    print("[AlgoLab plot capture failed] " + str(_algolab_plot_error), file=_algolab_sys.stderr)
"""
    script_path.write_text(f"{capture_header}\n{code.rstrip()}\n{capture_footer}", encoding="utf-8")
    env = {
        **os.environ,
        "PYTHONUTF8": "1",
        "PYTHONIOENCODING": "utf-8",
        "MPLBACKEND": "Agg",
    }
    try:
        completed = subprocess.run(
            [sys.executable, "-X", "utf8", str(script_path)],
            cwd=str(RUN_OUTPUT_DIR),
            env=env,
            text=True,
            capture_output=True,
            timeout=timeout_seconds,
            check=False,
        )
    except subprocess.TimeoutExpired:
        return {"output": [], "error": f"Generated code timed out after {timeout_seconds} seconds.", "artifacts": []}
    output = (completed.stdout or "").strip().splitlines()
    stderr_lines = (completed.stderr or "").strip().splitlines()
    filtered_stderr_lines: list[str] = []
    skip_show_line = False
    for line in stderr_lines:
        if "FigureCanvasAgg is non-interactive" in line:
            skip_show_line = True
            continue
        if skip_show_line and line.strip().startswith("plt.show("):
            skip_show_line = False
            continue
        skip_show_line = False
        filtered_stderr_lines.append(line)
    stderr = "\n".join(filtered_stderr_lines).strip()
    error = stderr or None
    if completed.returncode != 0 and not error:
        error = f"Generated code exited with status {completed.returncode}."
    artifacts = [encode_png_artifact(path) for path in sorted(RUN_OUTPUT_DIR.glob(f"{run_id}_plot_*.png"))]
    return {
        "output": output[-80:],
        "error": error,
        "artifacts": artifacts,
        "return_code": completed.returncode,
        "script_path": str(script_path),
    }
def generate_code(request: dict[str, Any]) -> dict[str, Any]:
    checkpoint_path, model, config, stoi, itos = load_checkpoint(request.get("checkpoint_path"))
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model.to(device)
    task = str(request.get("task") or "").strip()
    if not task:
        raise RuntimeError("Task cannot be empty.")
    prompt = f"<task>\n{task}\n</task>\n<code>\n"
    encoded = [stoi[ch] for ch in prompt if ch in stoi]
    if not encoded:
        raise RuntimeError("Prompt contains no known characters from the tokenizer vocabulary.")
    idx = torch.tensor([encoded], dtype=torch.long, device=device)
    max_new_tokens = max(80, min(int(request.get("max_new_tokens") or 900), 2000))
    temperature = max(0.2, min(float(request.get("temperature") or 0.75), 1.5))
    stop_sequence = [stoi[ch] for ch in "\n</code>" if ch in stoi]
    with torch.no_grad():
        generated = model.generate(
            idx,
            max_new_tokens=max_new_tokens,
            temperature=temperature,
            stop_sequence=stop_sequence if stop_sequence else None,
        )
    text = "".join(itos[int(token)] for token in generated[0].detach().cpu().tolist())
    code = extract_code(text)
    return {
        "task": task,
        "code": code,
        "truncated": "</code>" not in text,
        "checkpoint_path": str(checkpoint_path),
    }


def execute_code(request: dict[str, Any]) -> dict[str, Any]:
    timeout_seconds = max(1, min(int(request.get("timeout_seconds") or 12), 30))
    code = str(request.get("code") or "").strip()
    result = run_generated_code(code, timeout_seconds=timeout_seconds)
    return {"code": code, **result}


def status_payload() -> dict[str, Any]:
    checkpoints = sorted(CHECKPOINT_DIR.glob("*.pt"), key=lambda item: item.stat().st_mtime, reverse=True)
    torch_available = torch is not None
    cuda_available = bool(torch_available and torch.cuda.is_available())
    return {
        "ok": True,
        "python": sys.version.split()[0],
        "torch_available": torch_available,
        "torch_version": getattr(torch, "__version__", None) if torch_available else None,
        "cuda_available": cuda_available,
        "cuda_device": torch.cuda.get_device_name(0) if cuda_available else None,
        "device": "cuda" if cuda_available else "cpu",
        "working_dir": str(RUN_DIR),
        "active_job_id": ACTIVE_JOB_ID,
        "checkpoints": [str(item) for item in checkpoints[:10]],
    }
class TinyCodeGPTHandler(BaseHTTPRequestHandler):
    def log_message(self, format: str, *args: Any) -> None:
        print(f"[tinycodegpt-runner] {self.address_string()} - {format % args}")

    def do_OPTIONS(self) -> None:
        respond(self, 200, {"ok": True})

    def do_GET(self) -> None:
        parsed = urlparse(self.path)
        path = parsed.path.rstrip("/") or "/"
        if path in {"/", "/status", "/health"}:
            respond(self, 200, status_payload())
            return
        if path.startswith("/jobs/"):
            job_id = path.split("/")[-1]
            with LOCK:
                job = JOBS.get(job_id)
            if not job:
                respond(self, 404, {"message": "Job not found."})
                return
            respond(self, 200, job)
            return
        respond(self, 404, {"message": "Unknown route."})

    def do_POST(self) -> None:
        global ACTIVE_JOB_ID
        parsed = urlparse(self.path)
        path = parsed.path.rstrip("/") or "/"
        try:
            if path == "/train":
                body = read_body(self)
                with LOCK:
                    if ACTIVE_JOB_ID and JOBS.get(ACTIVE_JOB_ID, {}).get("status") in {"queued", "running"}:
                        respond(self, 409, {"message": f"Training job already running: {ACTIVE_JOB_ID}"})
                        return
                    job_id = uuid.uuid4().hex[:12]
                    job = {
                        "id": job_id,
                        "status": "queued",
                        "created_at": utc_now(),
                        "logs": [],
                        "metrics": [],
                    }
                    JOBS[job_id] = job
                    STOP_EVENTS[job_id] = threading.Event()
                    ACTIVE_JOB_ID = job_id
                thread = threading.Thread(target=run_training, args=(job_id, body), daemon=True)
                thread.start()
                respond(self, 200, job)
                return
            if path.startswith("/jobs/") and path.endswith("/stop"):
                parts = path.split("/")
                job_id = parts[2] if len(parts) >= 3 else ""
                if job_id in STOP_EVENTS:
                    STOP_EVENTS[job_id].set()
                with LOCK:
                    job = JOBS.get(job_id)
                if not job:
                    respond(self, 404, {"message": "Job not found."})
                    return
                respond(self, 200, job)
                return
            if path == "/generate":
                body = read_body(self)
                result = generate_code(body)
                respond(self, 200, result)
                return
            if path == "/execute":
                body = read_body(self)
                result = execute_code(body)
                respond(self, 200, result)
                return
            respond(self, 404, {"message": "Unknown route."})
        except Exception as error:
            respond(self, 500, {"message": str(error), "traceback": traceback.format_exc()})
def main() -> None:
    parser = argparse.ArgumentParser(description="Local TinyCodeGPT training runner for AlgoLab.")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=4877)
    args = parser.parse_args()
    server = ThreadingHTTPServer((args.host, args.port), TinyCodeGPTHandler)
    print(f"TinyCodeGPT local runner listening on http://{args.host}:{args.port}")
    print(f"Working directory: {RUN_DIR}")
    print("Press Ctrl+C to stop.")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nStopping TinyCodeGPT local runner.")
    finally:
        server.server_close()


if __name__ == "__main__":
    main()
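Source Deep Dive 1: runner.py Is Not a Script but a Local Training Pipeline
The previous six modules covered the basic concepts of Transformers, tokens, attention, and training. This section takes a different angle and reads tinycodegpt_local_runner.py as an engineering pipeline. After the user clicks a button on the web page, the actual data, state, and parameters all flow through the runner.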
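| Source region | State it maintains | Problem it solves | Common misunderstanding |
|---|---|---|---|
| DATASET_DIR / CHECKPOINT_DIR / JOBS | Dataset files, checkpoints, training job state | Lets the page query training results after a refresh and keeps model files on the local machine | The web page does not train the model; it is only a console |
| parse_jsonl_dataset | Uniform {category, task, code} samples | Normalizes teacher-generated, built-in sampled, and hand-pasted data into one training format | More JSONL lines do not mean higher quality; code style and category coverage matter just as much |
| format_sample | <task> and <code> boundary markers | Fixes the supervised task as "see the task, continue with code" | The markers are not decoration; they determine where generation starts and where it stops |
| build_vocab / encode | Character vocabulary and integer sequence | Turns Python text into a LongTensor that PyTorch can train on | A character-level model can run the whole pipeline, but it handles complex semantics less efficiently than BPE |
| TinyCodeGPT.forward | logits and loss | Projects each position's hidden vector into a distribution over the next character | forward has no idea whether the program actually runs |
| run_training | device, steps, optimizer, metrics | Runs real backpropagation and keeps writing logs and the loss curve | steps_per_epoch is derived from the token budget; it is not an epoch step count the user types in |
| generate_code / execute_code | Generated text, execution output, chart artifacts | Moves model quality from "looks like code" into a verification loop of "runs and produces results" | An execution failure is training feedback, not just a UI error |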
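To make the format_sample and build_vocab rows concrete, here is a minimal sketch. The runner's own implementations are not shown in this section, so both functions below are assumptions: the sample layout is inferred from the prompt `<task>\n{task}\n</task>\n<code>\n` and the stop sequence `\n</code>` that generate_code uses, and the vocabulary is a plain sorted character set.

```python
def format_sample(sample: dict) -> str:
    # Assumed layout, inferred from the prompt prefix and stop sequence
    # used at generation time; the real format_sample may differ.
    return f"<task>\n{sample['task']}\n</task>\n<code>\n{sample['code']}\n</code>\n"


def build_vocab(text: str):
    # Character-level vocabulary: one integer id per distinct character.
    chars = sorted(set(text))
    stoi = {ch: i for i, ch in enumerate(chars)}
    itos = {i: ch for ch, i in stoi.items()}
    return stoi, itos


# Hypothetical sample, for illustration only.
sample = {
    "category": "Math",
    "task": "calculate area of circle with radius 2",
    "code": "import math\nprint(math.pi * 2 ** 2)",
}
text = format_sample(sample)
stoi, itos = build_vocab(text)
ids = [stoi[ch] for ch in text]           # what the model trains on
decoded = "".join(itos[i] for i in ids)   # round trip back to text
```

Because every training sample ends the task block and opens the code block the same way, a prompt that stops right after `<code>\n` puts the model in the position it saw most often during training.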
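Source Deep Dive 2: What a Single Training Step Actually Estimates
The step/epoch numbers the user sees come from the token budget, not from JSONL line numbers. Each step randomly draws B windows of length T, and y is x shifted by one position, so y[t] is the character that follows x[t]. One backward pass therefore trains B×T next-character predictions at once.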
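The window sampling can be sketched without PyTorch. The runner's make_batch is not shown in this section, so the version below is an assumption that uses plain Python lists instead of tensors; only the shift-by-one relationship between x and y is taken from the text.

```python
import random


def make_batch(ids, block_size, batch_size, rng):
    """Draw batch_size random windows of length block_size; y is x shifted by one."""
    xs, ys = [], []
    for _ in range(batch_size):
        # The last valid start leaves room for the extra target character.
        start = rng.randrange(0, len(ids) - block_size)
        xs.append(ids[start : start + block_size])
        ys.append(ids[start + 1 : start + block_size + 1])
    return xs, ys


ids = list(range(100))  # stand-in for an encoded character sequence
x, y = make_batch(ids, block_size=8, batch_size=4, rng=random.Random(0))
# Each row of y is the matching row of x moved one position forward,
# so one step supervises 4 * 8 = 32 next-character predictions.
```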
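Why 5,000 Samples Give Only About 131 Steps per Epoch
Suppose 5,000 JSONL samples concatenate into about 1.45 million character tokens. With the runner's 92% training split, the training set holds about 1.34 million tokens. With batch_size=32 and context=320, each step consumes 32 × 320 = 10,240 token positions, so one epoch is about 1,340,000 / 10,240 ≈ 131 steps. This is expected and does not mean that only 131 samples were trained.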
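The arithmetic can be checked directly. The sketch below reuses the steps_per_epoch expression from run_training; the 1.45 million total is the illustrative figure from the paragraph above, not a measured value, and the integer split is a simplification of the runner's int(len(ids) * 0.92).

```python
import math


def steps_per_epoch(train_tokens: int, batch_size: int, block_size: int) -> int:
    # Same expression as in run_training: one step consumes batch_size * block_size positions.
    return max(1, math.ceil(train_tokens / (batch_size * block_size)))


total_tokens = 1_450_000                 # illustrative figure from the text
train_tokens = total_tokens * 92 // 100  # 92% split, in integer arithmetic
tokens_per_step = 32 * 320               # batch_size * context length
steps = steps_per_epoch(train_tokens, 32, 320)
print(train_tokens, tokens_per_step, steps)  # 1334000 10240 131
```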
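Why Fallback Steps Are No Longer Multiplied by Epochs
The runner's current semantics are: when Target Epochs is greater than 0, the total step count is derived from the dataset size and the epoch count; only when Target Epochs equals 0 is Fallback Steps used as a manual training budget. In either case the result is clamped to between 20 and 200,000 steps. This keeps the two upper limits, "epoch" and "max steps", from conflicting with each other.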
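The rule fits in a few lines. The function name plan_max_steps is hypothetical; its body mirrors the two expressions that compute epoch_steps and max_steps in run_training.

```python
import math


def plan_max_steps(steps_per_epoch: int, target_epochs: float, fallback_max_steps: int) -> int:
    # Target Epochs > 0: derive the budget from the data; otherwise use the manual fallback.
    if target_epochs > 0:
        epoch_steps = math.ceil(steps_per_epoch * target_epochs)
    else:
        epoch_steps = fallback_max_steps
    # Clamp to the runner's bounds of 20 and 200,000 steps.
    return max(20, min(epoch_steps, 200_000))


print(plan_max_steps(131, 3, 500))     # 393: three epochs, fallback ignored
print(plan_max_steps(131, 0, 500))     # 500: fallback used as the manual budget
print(plan_max_steps(131, 5000, 500))  # 200000: capped by the upper bound
```

Note that the fallback value never multiplies the epoch count; exactly one of the two inputs decides the budget.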
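For Better Accuracy, Which Quantities to Scale First
For this task the usual priority is: improve JSONL quality and coverage, then increase model capacity, then add training epochs. Simply pushing the epoch count very high makes the model more familiar with the existing samples, but it also makes the model more likely to memorize templates than to generalize.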
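Source Deep Dive 3: The Training Budget Chart and Which Parameters Multiply Compute
Source Deep Dive 4: Generation, Execution, and Continued Training as One Quality Loop
The runner's job does not end when training completes. A genuinely useful local model has to go through the loop "load checkpoint → generate code → execute code → inspect errors → add data or continue training". This loop is closer to real usability than looking at loss alone.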
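| Stage | Key function in runner.py | Input | Output | What it tells us |
|---|---|---|---|---|
| Load model | load_checkpoint | checkpoint path / latest checkpoint | model, config, stoi, itos | Generation must use the same vocabulary and architecture as training, otherwise the weights will not match |
| Build prompt | generate_code | User task text | <task>...<code> prefix | The prompt format must match the training sample format to activate the task→code ability |
| Sample | model.generate | max_new_tokens, temperature (clamped by the runner to 80 to 2000 and 0.2 to 1.5) | Code string | Too small a Max New Tokens truncates the code; too high a temperature increases syntax drift |
| Run locally | run_generated_code | Generated Python | stdout, stderr, PNG artifact | Judge chart tasks by the artifact, numeric tasks by stdout, and failing tasks by the traceback |
| Continue training | continue_from_checkpoint | Old checkpoint + new JSONL | Updated checkpoint | When errors cluster in one task category, adding data for that category and continuing training is more efficient than training from scratch |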
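The effect of temperature in the sampling row can be illustrated with a small pure-Python sketch. The internals of model.generate are not shown in this section, so this is the standard temperature-scaled softmax, assumed rather than confirmed to be what the runner does; the function names and logits are made up for illustration.

```python
import math
import random


def softmax_with_temperature(logits, temperature):
    # Dividing logits by the temperature sharpens (<1) or flattens (>1) the distribution.
    scaled = [value / temperature for value in logits]
    peak = max(scaled)  # subtract the max for numerical stability
    exps = [math.exp(value - peak) for value in scaled]
    total = sum(exps)
    return [value / total for value in exps]


def sample_next(logits, temperature, rng):
    probs = softmax_with_temperature(logits, temperature)
    return rng.choices(range(len(logits)), weights=probs, k=1)[0]


logits = [2.0, 1.0, 0.1]                       # hypothetical next-character scores
cold = softmax_with_temperature(logits, 0.2)   # lower bound the runner allows
hot = softmax_with_temperature(logits, 1.5)    # upper bound the runner allows
token = sample_next(logits, 0.75, random.Random(0))
```

At the low end almost all probability mass sits on the top-scoring character, which favors syntactically safe output; at the high end unlikely characters are drawn more often, which is where syntax drift comes from.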
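So the course should not stop at "loss went down". A sounder judgment is: validation loss decreases, generation is not visibly truncated, the execution success rate rises, and the chart or numeric results match the task. Only when all four signals improve together does TinyCodeGPT move from a toy demo toward a usable small task-to-code model.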
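Two of these signals, truncation and execution success, can be tallied automatically. The sketch below assumes each record merges the "truncated" field returned by generate_code with the "error" and "return_code" fields returned by run_generated_code; the function summarize_runs and the sample records are hypothetical.

```python
def summarize_runs(records):
    """Tally truncation and execution success over generate + execute results."""
    total = len(records)
    if total == 0:
        return {"total": 0, "truncated_rate": 0.0, "success_rate": 0.0}
    truncated = sum(1 for record in records if record.get("truncated"))
    succeeded = sum(
        1 for record in records
        if record.get("return_code") == 0 and not record.get("error")
    )
    return {
        "total": total,
        "truncated_rate": truncated / total,
        "success_rate": succeeded / total,
    }


# Hypothetical results for four tasks, for illustration only.
records = [
    {"task": "mean of a list", "truncated": False, "error": None, "return_code": 0},
    {"task": "plot y = x^2", "truncated": False, "error": None, "return_code": 0},
    {"task": "groupby mean", "truncated": True, "error": "SyntaxError", "return_code": 1},
    {"task": "linear regression", "truncated": False, "error": "NameError", "return_code": 1},
]
summary = summarize_runs(records)
print(summary)
```

Whether the output actually matches the task is not covered by this tally and still needs a per-task check of stdout or the PNG artifact.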