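Algorithm Visualization and Interactive Learning Platform

TinyCodeGPT: Train a Local Task-to-Code LLM from Scratch

This module takes the next-token principle from the earlier GPT lessons into a real engineering loop: design six categories of task→code tasks (Math, Python, NumPy, Pandas, Matplotlib, ML), distill training data from a large model, automatically validate and clean the samples, train TinyCodeGPT on your own machine with PyTorch, and watch loss, token probabilities, checkpoints, and the execution results of generated code in real time. The goal is for beginners to learn, end to end, how to train a small task-specific LLM themselves.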

LLM | Intermediate | Free | Kernel: GPU
1

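First, pin down what this module actually trains

This section moves beyond the toy TinyGPT. The goal is to train a small but real **TinyCodeGPT**:

natural-language task -> Python code

It is not ChatGPT and not a general-purpose chat model. It is a narrow-domain task model: given plot y = x^2 from -10 to 10, it generates NumPy + Matplotlib code that runs locally.

The complete engineering loop is:

task category design -> distill task-code data from a large model -> automatic execution check and cleaning -> tokenizer -> dataset x/y shift -> TinyGPT training -> local checkpoint -> code generation -> local execution check -> per-category evaluation

After finishing, you should be able to answer a very practical question: if I want to train my own small task-specific LLM, where does the data come from, how is the code written, how do I monitor training, and how do I verify the results?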

2

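Step 1: Define the six task categories TinyCodeGPT will learn

When training a small model, the most important thing is not to chase parameter count at the start but to define the task boundary clearly. This module targets task→code, so the dataset must consist of task descriptions paired with runnable Python code.

| Category | What the model learns | Typical samples |
| --- | --- | --- |
| Math | formulas, variables, basic numeric calculation | circle area, distance, mean, variance |
| Python | list, dict, for, if, function | sorting, filtering, word-frequency counting, string reversal |
| NumPy | arrays, matrices, vectorized computation | normalization, matrix multiplication, linspace |
| Pandas | tabular data analysis | filter, groupby, column mean |
| Matplotlib | plotting algorithm results | line, scatter, bar, hist |
| ML | small machine-learning procedures | MSE, linear regression, gradient descent |

These six categories deliberately exclude open-ended chat. The goal of the first stage is not a model that "knows everything" but one that genuinely learns to generate runnable code within a narrow scope.

Dataset design principle: small and accurate > large and dirty. Clear task categories -> clear training signal -> clear evaluation.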
3

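Data distillation goal: use a teacher model to generate task-code samples

The dataset D consists of tasks t_i, code c_i, and categories k_i. The teacher model generates code from the task and its category, and only samples that pass the automatic local execution check enter the training set.

- t_i: natural-language task description, for example plot y=x^2
- c_i: Python code generated by the teacher model
- k_i: task category (Math, Python, NumPy, Pandas, Matplotlib, ML)
- local execution check: whether the code actually runs on the local machine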
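One compact way to write this definition down is sketched below. The names $\mathrm{Teacher}$ and $\mathrm{run}$ are notation assumed here for illustration; the original only names $t_i$, $c_i$, and $k_i$.

```latex
c_i = \mathrm{Teacher}(t_i, k_i), \qquad
D = \bigl\{\, (t_i, c_i, k_i) \;:\; \mathrm{run}(c_i) \text{ passes the local execution check} \,\bigr\}
```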
4

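Step 2: Templated distillation instead of letting the large model write freely

Using an existing large model to generate data does not mean asking it 100,000 questions by hand, and it does not mean scraping all of GitHub. The sound approach is: people design the task templates first, a program samples the parameters, and the teacher model fills in the reference answer.

task template -> parameter sampling -> teacher model generates code -> local execution check -> JSONL training set

For example, the template calculate area of circle with radius {r} can be sampled with many radii. The teacher model is only responsible for producing reference answers that are readable, runnable, and consistent in style.

| Stage | Who does it | Output |
| --- | --- | --- |
| Template design | human | task types and parameter slots |
| Parameter sampling | program | many different tasks |
| Answer generation | large model | Python code |
| Validation and cleaning | local runner | only the samples that pass |
| Training | local PyTorch | TinyCodeGPT checkpoint |

The resulting data is small and clean, which suits a first training stage aimed at a small model that completes concrete tasks.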
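The first two stages of the pipeline (template design and parameter sampling) can be sketched in a few lines. This is a minimal illustration, not the platform's implementation: `TEMPLATES`, `sample_tasks`, and `to_jsonl` are hypothetical names, and the teacher-model call that would fill in the `code` field is deliberately left out.

```python
import json
import random

# Hypothetical templates for illustration; braces mark the parameter slots.
TEMPLATES = {
    "Math": "calculate area of circle with radius {r}",
    "Matplotlib": "plot y = {func} from {start} to {stop}",
}


def sample_tasks(category: str, count: int, seed: int = 0) -> list:
    """Fill one category's template with randomly sampled parameters."""
    rng = random.Random(seed)  # a fixed seed makes the sampled tasks reproducible
    tasks = []
    for _ in range(count):
        params = {
            "r": rng.randint(1, 80),
            "func": rng.choice(["x^2", "sin(x)"]),
            "start": -10,
            "stop": 10,
        }
        # str.format ignores unused keyword arguments, so one dict serves both templates.
        tasks.append({"category": category, "task": TEMPLATES[category].format(**params)})
    return tasks


def to_jsonl(samples: list) -> str:
    """Serialize samples as one JSON object per line."""
    return "\n".join(json.dumps(sample, ensure_ascii=False) for sample in samples)


tasks = sample_tasks("Math", 3, seed=7)
print(to_jsonl(tasks))
```

In a full pipeline each task would next be sent to the teacher model, and the returned code would be kept only if it passes the local execution check.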

Code

Data-generation prompt: have the teacher model output uniform JSONL

text
You are a careful Python teacher creating JSONL training data for a small task-to-code model.

Return only valid JSONL. Do not use markdown fences or prose.
Every line must be exactly one JSON object:
{"category":"Math|Python|NumPy|Pandas|Matplotlib|ML","task":"concrete natural-language task","code":"...python code with escaped \n..."}

Batch parameters:
Category: {category}
Sample count: {sample_count}

Generate exactly {sample_count} lines for Category = {category}.

For a larger local dataset, run this prompt repeatedly in batches. Example 5000-line balanced allocation:
- Math: 834
- Python: 833
- NumPy: 833
- Pandas: 833
- Matplotlib: 833
- ML: 834
Use 20-32 samples per batch to avoid truncated model output, then concatenate all JSONL lines.

Category rules:
- Math: formulas, variables, numeric calculation, distance, mean, variance.
- Python: list, dict, loop, if, function, string processing.
- NumPy: arrays, matrices, vectorized numerical computation.
- Pandas: DataFrame filtering, grouping, column calculation, tiny literal tables.
- Matplotlib: line, scatter, bar, histogram plots; include tiny literal data and plt.show().
- ML: sklearn-free examples such as MSE, linear regression with numpy, or gradient descent.

Task design rules:
1. Create varied concrete tasks with small literal values.
2. If a task needs data, include tiny literal data directly in code.
3. Keep each code answer under 25 lines.
4. Include print(...) for numerical, text, and table results.
5. For plots, include plt.show().
6. Do not use file, network, subprocess, shell, environment, or system operations.
7. Escape newlines inside the JSON string as \n.
8. Use only standard Python, numpy, pandas, matplotlib, or sklearn-free ML.

Valid output shape:
{"category":"Math","task":"calculate the area of a circle with radius 7","code":"import math\nr = 7\narea = math.pi * r ** 2\nprint(area)"}
{"category":"Python","task":"count word frequencies in a short list","code":"words = ['cat', 'dog', 'cat']\ncounts = {}\nfor word in words:\n    counts[word] = counts.get(word, 0) + 1\nprint(counts)"}

After you generate the JSONL batches, paste the raw lines into AlgoLab's Teacher JSONL dataset box, then start local training.
6

Step 3: How a single sample enters the training set

Generate a task from a template (step 1 of 4)
template = 'plot y = {func} from {start} to {stop}'
task = template.format(func='x^2', start=-10, stop=10)
Initial variables: category = Matplotlib, task = plot y = x^2 from -10 to 10
Variables after step 1: task = plot y = x^2 from -10 to 10
7

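Tokenizer and shifted training samples

The first version uses a character-level tokenizer: one token per character. During training, the input x and the target y differ by exactly one position, so the model learns to predict the next token at every position.

- token id at position t
- x: the context tokens the model sees (input)
- y: the targets, offset by one token (y[t] is the token that follows x[t])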
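A minimal sketch of the character-level tokenizer and the one-position offset between x and y follows. `build_vocab` mirrors the function of the same name in the runner source; the example text and `block_size = 4` are values chosen here purely for illustration.

```python
def build_vocab(text: str):
    """Sort the distinct characters of the text into a vocabulary."""
    chars = sorted(set(text))
    stoi = {ch: i for i, ch in enumerate(chars)}  # character -> id
    itos = {i: ch for ch, i in stoi.items()}      # id -> character
    return stoi, itos


text = "print(x)\n"
stoi, itos = build_vocab(text)
ids = [stoi[ch] for ch in text]  # one token id per character

block_size = 4                   # illustrative context length
x = ids[0:block_size]            # what the model sees
y = ids[1:block_size + 1]        # the same window, one token later

# At each position the model sees x[:pos + 1] and must predict y[pos].
for pos in range(block_size):
    context = "".join(itos[i] for i in x[: pos + 1])
    print(f"{context!r} -> {itos[y[pos]]!r}")
```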
8

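Step 4: Real training happens on your local machine

A web page should not carry long-running training, and it should not fake real training with an animation. This module uses a local-runner architecture: the AlgoLab page handles only control and visualization, while a local Python process does the actual computation.

AlgoLab Web UI -> http://127.0.0.1:4877 -> TinyCodeGPT local runner -> PyTorch training process -> local checkpoints / logs / generated code

This has three benefits: first, it uses no server GPU; second, data and checkpoints stay on your machine; third, training can run for a long time and be paused, saved, and reused.

| Component | Responsibility |
| --- | --- |
| Web module | shows formulas, code, and method explanations; sends training parameters; displays loss and logs |
| Local Runner | detects Python/PyTorch/CUDA, generates data, trains the model, saves checkpoints |
| PyTorch model | runs embedding, attention, FFN, loss, and backpropagation |
| Evaluator | calls the model to generate code and runs it locally to verify it |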
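To make the page-to-runner link concrete, here is a small sketch of how a client could check whether the local runner is reachable. The address and the `/status` path come from this page's launch instructions; the helper names `status_url` and `fetch_status` are hypothetical, and the shape of the returned JSON is not specified here, so the sketch simply returns whatever JSON the runner sends.

```python
import json
import urllib.error
import urllib.request
from typing import Optional

RUNNER_HOST = "127.0.0.1"
RUNNER_PORT = 4877  # default port shown in the architecture line above


def status_url(host: str = RUNNER_HOST, port: int = RUNNER_PORT) -> str:
    """Build the URL of the runner's status endpoint."""
    return f"http://{host}:{port}/status"


def fetch_status(url: str, timeout: float = 2.0) -> Optional[dict]:
    """Return the parsed JSON status, or None if the runner is not reachable."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return json.loads(response.read().decode("utf-8"))
    except (urllib.error.URLError, OSError, ValueError):
        # Connection refused, timeout, or a non-JSON body all count as "not connected".
        return None
```

Calling `fetch_status(status_url())` while the runner is stopped returns `None`, which a UI could treat as the "not connected" state.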
9

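TinyGPT architecture and training objective: from characters to executable code

This card compresses TinyCodeGPT's real computation path into a single line: character ids become vectors, causal attention looks only at the left context, the FFN applies a per-position nonlinear transformation, and finally every position predicts the next character. Training does not directly reward "the code runs"; it raises the probability of the character sequences that make up runnable code.

- B: batch size, the number of windows seen in one backward pass
- T: block_size / context length, the length of each training window
- head dimension: the size of each attention head, equal to n_embd / n_head
- causal mask: a mask whose upper triangle is negative infinity, so a position cannot peek at future tokens
- loss: the mean next-token cross entropy over all windows in the batch and all positions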
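These pieces fit together roughly as follows. This is the standard scaled dot-product form and agrees with the pseudocode later on this page, softmax(q @ k.T / sqrt(head_dim) + causal_mask); the symbols $d_h$ and $M$ are notation chosen here, not taken from the original card.

```latex
d_h = \frac{n_{\text{embd}}}{n_{\text{head}}}, \qquad
M_{ij} =
\begin{cases}
0 & j \le i \\
-\infty & j > i
\end{cases}

\mathrm{Attention}(Q, K, V) = \mathrm{softmax}\!\left(\frac{Q K^{\top}}{\sqrt{d_h}} + M\right) V
```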

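1. What a character-level tokenizer really means

`build_vocab(text)` in runner.py sorts the characters that appear in the training text into a vocabulary. It is simple, transparent, and well suited to teaching, but it also means the model is learning local regularities among characters such as newlines, spaces, colons, and brackets. If a prompt contains characters that are missing from the training vocabulary, the runner skips them, so the broader the dataset's coverage, the more stable inference is.

vocab = sorted(set(training_text))
stoi[ch] -> character id
itos[id] -> character

2. The causal mask turns "complete the code" into a training objective

Each position can see only what is to its left, so the code characters after `<task>... </task><code>` are filled in one character at a time, conditioned on the task description and the code prefix generated so far. What TinyCodeGPT learns is not chat intent but the task→code conditional distribution within a fixed format.

3. Model capacity is controlled mainly by four UI parameters

`Layers` adds sequential processing depth, `Heads` adds parallel attention patterns, `Embedding` widens each character's representation, and `Context` decides how long a task+code sequence the model can see at once. Increasing Embedding and Layers usually raises the ceiling on complex tasks more than simply adding epochs, but it also costs more GPU memory and time.

4. Falling loss does not guarantee correct code

Cross entropy optimizes character probabilities. It strongly rewards local patterns such as indentation, brackets, and API names, but it does not execute programs, inspect charts, or verify numbers. That is why this module later adds a "run generated code" button, which brings model quality into a runnability loop.

low loss -> closer to the training distribution
executable -> syntax, dependencies, and runtime behavior all pass
Code

Training core code: what happens inside one step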

python
# This code corresponds to the training kernel in tinycodegpt_local_runner.py.
# It is an excerpt: model, optimizer, evaluate, train_ids, val_ids, device, and the
# hyperparameters (batch_size, block_size, target_epochs, ...) come from the runner.
# The point is not "loop many times": every step estimates, given the left
# context x[:, :t+1], what the next character y[:, t] should be.
import math

import torch


def make_batch(data, block_size, batch_size, device):
    # data is the whole training text encoded as a sequence of character ids.
    # Each sample is a random window of length block_size.
    # Because windows are random, one epoch is not "scan every JSONL line in order";
    # it is a budget of about train_tokens / (batch_size * block_size) steps.
    starts = torch.randint(0, len(data) - block_size - 1, (batch_size,))

    # x: [B, T], the context the model can see.
    # y: [B, T], the same window one token later, the supervised answer per position.
    x = torch.stack([data[i:i + block_size] for i in starts]).to(device)
    y = torch.stack([data[i + 1:i + block_size + 1] for i in starts]).to(device)
    return x, y


steps_per_epoch = math.ceil(len(train_ids) / (batch_size * block_size))
planned_steps = math.ceil(steps_per_epoch * target_epochs) if target_epochs > 0 else fallback_max_steps

for step in range(1, planned_steps + 1):
    x, y = make_batch(train_ids, block_size, batch_size, device)

    # forward:
    # logits.shape == [B, T, vocab_size]
    # loss is the mean cross entropy over B*T next-character predictions.
    logits, loss = model(x, y)

    # backward:
    # PyTorch differentiates the loss back through logits -> blocks -> embeddings.
    optimizer.zero_grad(set_to_none=True)
    loss.backward()

    # Small models can also hit gradient spikes; clipping keeps one bad batch
    # from throwing the parameters far off.
    torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
    optimizer.step()

    # validate:
    # val_loss takes no part in the update; it only answers "can the model also
    # explain text windows it was not trained on directly?"
    if step == 1 or step % log_every == 0 or step == planned_steps:
        val_loss = evaluate(model, val_ids, block_size, min(batch_size, 32), device)
        epoch = step / steps_per_epoch
        print(f"step {step}/{planned_steps} | epoch {epoch:.2f}/{target_epochs} | loss {loss.item():.4f} | val {val_loss:.4f}")

# A checkpoint stores not only the model weights but also vocab/config/optimizer.
# That is why "continue training" must load the same vocabulary and architecture.
Loss

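Cross Entropy: what exactly the TinyCodeGPT loss penalizes

Cross entropy turns "predict the next character" into an optimizable penalty: the model first assigns a score (a logit) to every character in the vocabulary, softmax converts the scores into probabilities, and the loss then looks only at the probability of the true next character. The higher that probability, the lower the loss; if the model puts its probability on the wrong characters, the loss grows quickly.

- V: vocabulary size; in character-level TinyCodeGPT it equals the number of distinct characters in the training text
- logits: the unnormalized candidate-character scores for window b at position t
- softmax probability: the probability, after softmax, that character i is the next character
- target: the token id of the true next character, that is, x offset by one position
- loss: the mean cross entropy over the current batch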
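Written out, the quantities in this list relate as shown below. The symbols $z_{b,t,i}$ (logit), $p_{b,t,i}$ (softmax probability), $y_{b,t}$ (target id), and $\mathcal{L}$ (loss) are notation assumed here for illustration; $V$, $B$, and $T$ follow the usage elsewhere on this page.

```latex
p_{b,t,i} = \frac{\exp(z_{b,t,i})}{\sum_{j=1}^{V} \exp(z_{b,t,j})}, \qquad
\mathcal{L} = -\frac{1}{B\,T} \sum_{b=1}^{B} \sum_{t=1}^{T} \log p_{b,t,\,y_{b,t}}
```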

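1. Logits are not probabilities but comparable raw scores

The last layer of TinyGPT outputs V numbers at each position. They can be negative and need not sum to 1, so they are not probabilities. Softmax turns these scores into a probability distribution and amplifies the gap between the highest score and the rest.

logits = [2.0, 0.5, -1.0]
softmax(logits) ≈ [0.786, 0.175, 0.039]

2. Cross entropy only penalizes the probability of the true character

If the true next character at the current position is token id=0, the loss looks only at p[0]. The closer p[0] is to 1, the closer -log(p[0]) is to 0; the smaller p[0], the larger the loss. This is why the model is pushed to raise the probability of the true next character.

p(correct)=0.90 -> loss=-log(0.90)=0.105
p(correct)=0.50 -> loss=-log(0.50)=0.693
p(correct)=0.10 -> loss=-log(0.10)=2.303
p(correct)=0.01 -> loss=-log(0.01)=4.605

3. The random baseline ln(V) is the first yardstick for whether training learned anything

If the model guesses uniformly over V characters, each character has probability 1/V, so the true character also has probability 1/V and the loss is -log(1/V)=log(V). With V=39 the random loss is therefore about 3.664; only a training loss clearly below that shows the model has captured structure in the token order.

4. Why the loss is an average over B×T positions

One training step does not judge whether a whole code snippet is right. Every window in the batch, at every position, poses one next-token classification problem. With B=32 and T=320, a single optimizer.step averages the cross entropy of 10240 classification problems.

5. Between lower loss and runnable code lies an execution check

Cross entropy optimizes character probabilities and does not execute code. It makes indentation, brackets, API names, and common patterns look more like the training set, but whether the code really runs and whether its numbers are correct still has to be verified by the local execute_code.

low loss: better at predicting the next character
runnable: syntax, dependencies, runtime logic, and output all pass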
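The numbers quoted in this card can be reproduced with a few lines of standard-library Python. This is a teaching sketch of softmax and single-position cross entropy, not the runner's PyTorch code; the function names are chosen here.

```python
import math


def softmax(logits):
    """Convert raw scores into probabilities (subtracting the max for stability)."""
    peak = max(logits)
    exps = [math.exp(z - peak) for z in logits]
    total = sum(exps)
    return [e / total for e in exps]


def cross_entropy(logits, target):
    """Loss for one position: minus the log probability of the true token."""
    return -math.log(softmax(logits)[target])


probs = softmax([2.0, 0.5, -1.0])
print([round(p, 3) for p in probs])

# Loss as a function of the probability given to the correct character.
for p_correct in (0.90, 0.50, 0.10, 0.01):
    print(p_correct, round(-math.log(p_correct), 3))

# Uniform guessing over V characters gives the random baseline ln(V).
V = 39
uniform_loss = cross_entropy([0.0] * V, 0)
print(round(uniform_loss, 3))
```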
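Run

Local training console: generate data, train, view loss, call the model

This console only connects to the training service on your machine; the actual data generation, PyTorch training, checkpoint saving, and code generation all happen on the local computer.

- Math: learn formulas, variables, and basic numeric calculation. Examples: circle area / mean / variance
- Python: learn list, dict, for, if, and string processing. Examples: sort list / word count
- NumPy: learn arrays, matrices, and vectorized scientific computing. Examples: normalize array / matrix multiply
- Pandas: learn table filtering, grouped statistics, and column calculations. Examples: groupby mean / filter rows
- Matplotlib: learn to plot functions, point sets, and statistical results. Examples: plot x^2 / scatter chart
- ML: learn the minimal structure of machine-learning code. Examples: linear regression / gradient descent

Teacher JSONL dataset

Choose a data source: the built-in dataset samples from the 10000-line JSONL that ships with the app according to sample count, category weights, and seed; the online teacher model calls your personal LLM API to generate data on the fly. 24 lines are only enough to verify the pipeline. Start from 1000 lines, and use 5000 to 10000 lines for serious experiments.

Source: Built-in 10000
Current JSONL: 0

- Large datasets show only a head-and-tail preview by default; the full JSONL is kept in the background for download and local training, so that rendering 5000 or 10000 lines does not slow the page.
- Category weights are ratios, not counts: with all six set to 1 the split is even; raise ML to 3 and it gets about 3 times as many samples as each other category.
- Built-in route: click "Extract built-in JSONL" to get trainable data; the same seed, sample count, and weights produce the same sample.
- Local large-model route: copy the "Data-generation prompt" card, generate JSONL in per-category batches, then paste it into the box on the left.
- After you click "Start local training", the page sends the raw JSONL on the left directly to the local runner, which saves it and uses it to build the task→code training text.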
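The statement that weights are ratios rather than counts can be illustrated with a short allocation sketch. How the app actually rounds is not documented on this page; the largest-remainder rounding and the name `allocate_by_weight` below are assumptions made for illustration.

```python
CATEGORIES = ("Math", "Python", "NumPy", "Pandas", "Matplotlib", "ML")


def allocate_by_weight(total: int, weights: dict) -> dict:
    """Split `total` samples across categories in proportion to their weights."""
    weight_sum = sum(weights.values())
    if total < 0 or weight_sum <= 0:
        raise ValueError("total must be non-negative and weights must sum to a positive value")
    exact = {name: total * weight / weight_sum for name, weight in weights.items()}
    counts = {name: int(value) for name, value in exact.items()}
    # Hand the leftover samples to the categories with the largest fractional parts.
    leftover = total - sum(counts.values())
    by_remainder = sorted(weights, key=lambda name: exact[name] - counts[name], reverse=True)
    for name in by_remainder[:leftover]:
        counts[name] += 1
    return counts


even = allocate_by_weight(1000, {name: 1 for name in CATEGORIES})
ml_heavy = allocate_by_weight(1000, {**{name: 1 for name in CATEGORIES}, "ML": 3})
print(even)
print(ml_heavy)
```

With all weights at 1 the counts differ by at most one sample; with ML at 3 the total weight is 8, so ML receives three eighths of the samples and each other category one eighth.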

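Source deep dive: tracing a single JSONL sample

Pick one line from the raw JSONL and follow the real path in runner.py: JSON parsing, task-code wrapping, character vocabulary, token ids, x/y shifted supervision, the steps/epoch budget, and finally running the sample code to get stdout or a chart.

The raw JSONL is currently empty, so a built-in example demonstrates the full flow first. After you extract the built-in JSONL or paste data, this panel switches to the real dataset automatically.

Valid JSONL: 1 | Invalid Lines: 0 | Selected Line: demo | Category: Math | Task Chars: 61

1. From a raw JSONL line to a Python dict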
{"category":"Math","task":"calculate population variance of [112, 34, 64, 80, 19, 2, 31]","code":"values = [112, 34, 64, 80, 19, 2, 31]\nmean = sum(values) / len(values)\nvariance = sum((x - mean) ** 2 for x in values) / len(values)\nprint(round(variance, 4))"}
Raw Chars: 261 | Code Chars: 158 | Code Lines: 4
{
  "category": "Math",
  "task": "calculate population variance of [112, 34, 64, 80, 19, 2, 31]",
  "code": "values = [112, 34, 64, 80, 19, 2, 31]\nmean = sum(values) / len(values)\nvariance = sum((x - mean) ** 2 for x in values) / len(values)\nprint(round(variance, 4))"
}
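2. format_sample in runner.py

The runner does not train on JSON objects directly; it turns each one into plain text with fixed boundaries. Generation uses the same <task><code> boundaries to trigger task-to-code conditional completion.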

<task>
calculate population variance of [112, 34, 64, 80, 19, 2, 31]
</task>
<code>
values = [112, 34, 64, 80, 19, 2, 31]
mean = sum(values) / len(values)
variance = sum((x - mean) ** 2 for x in values) / len(values)
print(round(variance, 4))
</code>
Formatted Chars = Tokens: 251
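3. Interpretable numeric formulas in the sample

Explanation coverage: rule matched | Confidence: High
Number list: variance / standard-deviation calculation
This sample explicitly asks for variance or standardization, so the explanation covers the mean, the squared errors, and the population variance. Only if the code divided by n-1 would the result be the sample variance.

n: 7 | sum: 342 | mean: 48.8571 | sum sq err: 8812.8571 | pop variance: 1258.9796 | sample variance: 1468.8095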
values = [112, 34, 64, 80, 19, 2, 31]
mean = sum(values) / n = 342 / 7 = 48.857143
population variance = sum((x - mean) ** 2) / n
                    = 8812.857143 / 7
                    = 1258.979592
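The figures in this panel can be checked independently. The sketch below recomputes them by hand and cross-checks against Python's `statistics` module, where `pvariance` divides by n and `variance` divides by n-1.

```python
import statistics

values = [112, 34, 64, 80, 19, 2, 31]
n = len(values)
mean = sum(values) / n
sum_sq_err = sum((x - mean) ** 2 for x in values)

pop_var = sum_sq_err / n           # population variance: divide by n
sample_var = sum_sq_err / (n - 1)  # sample variance: divide by n - 1

print(n, sum(values), round(mean, 4), round(sum_sq_err, 4))
print(round(pop_var, 4), round(sample_var, 4))

# The standard library agrees with the manual computation.
print(round(statistics.pvariance(values), 4), round(statistics.variance(values), 4))
```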
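4. Character vocabulary, token ids, and batch supervision targets

How they relate: the vocab preview shows the first entries of the character dictionary, that is, id -> char. The selected sample token ids are the id sequence obtained by encoding the current sample text character by character. The two are not aligned by display position; instead, each token id is looked up in the vocab to find its character. The x/y table below is what aligns one-to-one along the sample sequence: the input character x at position pos must predict the target character y, which sits one position later.

Dataset Tokens: 251 | Train Tokens: 230 | Vocab Size: 39 | Steps / Epoch: 1 | Context T: 320 | B*T / Step: 10,240 | Window Used: 250 | Val Tokens: 21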
vocab preview:
\n | space | ( | ) | * | , | - | / | 0 | 1 | 2 | 3 | 4 | 6 | 8 | 9 | < | = | > | [ | ] | a | c | d | e | f | i | k | l | m | n | o | p | r | s | t | u | v | x
selected sample token ids:
16, 35, 21, 34, 27, 18, 0, 22, 21, 28, 22, 36, 28, 21, 35, 24, 1, 32, 31, 32, 36, 28, 21, 35, 26, 31, 30, 1, 37, 21, 33, 26, 21, 30, 22, 24, 1, 31, 25, 1, 19, 9, 9, 10, 5, 1, 11, 12, 5, 1, 13, 12, 5, 1, 14, 8, 5, 1, 9, 15, 5, 1, 10, 5, 1, 11, 9, 20, 0, 16, 7, 35
x/y shifted window: in each row the model uses x to predict y
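| pos | x char | x id | y char | y id |
| --- | --- | --- | --- | --- |
| 0 | < | 16 | t | 35 |
| 1 | t | 35 | a | 21 |
| 2 | a | 21 | s | 34 |
| 3 | s | 34 | k | 27 |
| 4 | k | 27 | > | 18 |
| 5 | > | 18 | \n | 0 |
| 6 | \n | 0 | c | 22 |
| 7 | c | 22 | a | 21 |
| 8 | a | 21 | l | 28 |
| 9 | l | 28 | c | 22 |
| 10 | c | 22 | u | 36 |
| 11 | u | 36 | l | 28 |
| 12 | l | 28 | a | 21 |
| 13 | a | 21 | t | 35 |
| 14 | t | 35 | e | 24 |
| 15 | e | 24 | space | 1 |
| 16 | space | 1 | p | 32 |
| 17 | p | 32 | o | 31 |
| 18 | o | 31 | p | 32 |
| 19 | p | 32 | u | 36 |
| 20 | u | 36 | l | 28 |
| 21 | l | 28 | a | 21 |
| 22 | a | 21 | t | 35 |
| 23 | t | 35 | i | 26 |
| 24 | i | 26 | o | 31 |
| 25 | o | 31 | n | 30 |
| 26 | n | 30 | space | 1 |
| 27 | space | 1 | v | 37 |
| 28 | v | 37 | a | 21 |
| 29 | a | 21 | r | 33 |
| 30 | r | 33 | i | 26 |
| 31 | i | 26 | a | 21 |
| 32 | a | 21 | n | 30 |
| 33 | n | 30 | c | 22 |
| 34 | c | 22 | e | 24 |
| 35 | e | 24 | space | 1 |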
steps_per_epoch = ceil(train_tokens / (batch_size * block_size))
                = ceil(230 / 10240)
                = 1

loss = mean cross_entropy(logits.reshape(-1, vocab), y.reshape(-1))
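The counts reported in panels 1, 2, and 4 (61 task characters, 158 code characters, 251 formatted tokens, a 39-character vocabulary, and the first token ids) can be reproduced from the demo line itself. `format_sample` below copies the one-line function from the runner source; the remaining variable names are illustrative.

```python
import json

# The demo JSONL line from panel 1, kept as a raw string so \n stays a JSON escape.
RAW_LINE = r'''{"category":"Math","task":"calculate population variance of [112, 34, 64, 80, 19, 2, 31]","code":"values = [112, 34, 64, 80, 19, 2, 31]\nmean = sum(values) / len(values)\nvariance = sum((x - mean) ** 2 for x in values) / len(values)\nprint(round(variance, 4))"}'''


def format_sample(sample):
    """Wrap a task-code pair in the fixed <task>/<code> boundaries."""
    return f"<task>\n{sample['task']}\n</task>\n<code>\n{sample['code']}\n</code>\n"


sample = json.loads(RAW_LINE)
text = format_sample(sample)

vocab = sorted(set(text))                     # character-level vocabulary
stoi = {ch: i for i, ch in enumerate(vocab)}  # character -> id
ids = [stoi[ch] for ch in text]               # one id per character

print(len(RAW_LINE), len(sample["task"]), len(sample["code"]))
print(len(text), len(vocab), len(ids) - 1)    # tokens, vocab size, x/y pairs
print(ids[:7])                                # ids of "<task>\n"
```

Because the vocabulary here is built from this single sample, the ids match the panel; with a larger dataset the vocabulary grows and the same characters receive different ids.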
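5. TinyGPT training core: what happens once this sample enters forward/backward

If this sample window is drawn by make_batch, it joins other random windows to form a real training batch. Here we continue tracing the deterministic tensor shapes and computational relationships; the real loss value depends on the current model weights and can only be computed by a PyTorch forward pass in the local runner.

x / y shape: [32, 320] | Sample Tokens: 251 | Sample x/y Pairs: 250 | Context Filled: 78.4% | Batch Windows: 32 | Loss Terms / Step: 10,240
Embedding d: 192 | Layers: 4 | Heads: 4 | Head dim: 48 | Vocab V: 39
QKV Scalars / Window: 184,320 | Attn Scores / Head: 102,400 | Causal Visible: 50.2% | Logit Scores / Step: 399,360
Random Loss ≈ ln(V): 3.664 | Random PPL ≈ V: 39 | Steps/Epoch: 1 | Planned Steps: 10

What the sample window means
After formatting, the current sample has 251 character tokens, so it yields at most 250 x[t] -> y[t] supervised pairs. The current context is 320, so this sample alone fills about 78.4% of the context.
What one optimizer.step updates
A batch has 32 random windows of 320 positions each, so every update averages 10,240 next-token decisions. It is not "one JSONL line, one update"; many windows vote together.
How to read the random baseline
The current vocabulary size is V=39. If the model guessed characters uniformly at random, cross entropy would be about ln(V)=3.664 and perplexity about 39. Only a training loss below this value shows the model has learned structure.
Concrete shapes of the key forward tensors

| Tensor | Shape | Meaning |
| --- | --- | --- |
| x, y | [32, 320] | integer token ids; x is the input, y is the answer one position later |
| token_emb(x) | [32, 320, 192] | each character id is looked up and becomes a d-dimensional vector |
| pos_emb | [320, 192] | adds position information for positions 0..T-1 |
| h0 | [32, 320, 192] | content vector + position vector |
| Q/K/V | [32, 4, 320, 48] | each head produces its own queries, keys, and values |
| attention scores | [32, 4, 320, 320] | each position scores all history positions |
| causal visible pairs | 51,360 | each head may look only at the left side and the current position, 50.2% of all pairs |
| logits | [32, 320, 39] | each position outputs V candidate-character scores |
| loss input | [10,240, 39] | the B*T positions are flattened before cross entropy |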
The concrete path from this sample to the loss
Current sample:
formatted_tokens = 251
supervised_pairs = formatted_tokens - 1 = 250

Current training settings:
B = 32
T = 320
V = 39
d = 192
heads = 4
head_dim = 48

One step:
loss_terms = B * T = 32 * 320 = 10240
logit_scores = B * T * V = 32 * 320 * 39 = 399360

Random-guess baseline:
loss_random = ln(V) = ln(39) = 3.663562
perplexity_random = exp(loss_random) ≈ 39
# When make_batch draws a window, the supervision is offset by one position
x = ids[i : i + 320]          # one window has shape [320]; 32 stacked windows give [32, 320]
y = ids[i + 1 : i + 321]      # same shape, one token later

# TinyCodeGPT.forward(idx=x, targets=y)
h0 = token_emb(x) + pos_emb(arange(320))       # [32, 320, 192]
q, k, v = Linear(h).split(192)       # each [32, 320, 192], then reshaped per head to [32, 4, 320, 48]
attention = softmax(q @ k.T / sqrt(head_dim) + causal_mask)   # k.T transposes the last two dims
h = TransformerBlock(... repeated 4 layers ...)
logits = lm_head(layer_norm(h))              # [32, 320, 39]
loss = cross_entropy(logits.reshape(10240, 39), y.reshape(10240))
# The backward pass does not "update this one JSONL line on its own";
# the B*T next-token decisions in the current batch update the parameters together.
optimizer.zero_grad(set_to_none=True)
loss.backward()
clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()

# Gradient path
loss
  -> logits / lm_head
  -> final layer norm
  -> Transformer blocks: attention + FFN
  -> token_emb / pos_emb

# The real loss value needs the current weights:
# a freshly initialized model, a resumed checkpoint, the learning rate, and past steps all affect it.
Relation to the single sample: this panel shows the first visible window of this sample. In real training, the runner draws many random windows from the whole training text, and this sample may be drawn many times at different steps and offsets. What the model learns is not "the answer to JSONL line 3" but p(next token | left context), shaped jointly by all windows.
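The size figures in this panel follow from B, T, V, d, and the head count alone. The sketch below recomputes them with plain arithmetic; the variable names are chosen here, and no model weights are involved.

```python
import math

B, T, V = 32, 320, 39          # batch windows, context length, vocabulary size
d, n_head = 192, 4             # embedding width and number of attention heads

head_dim = d // n_head                    # dimension per head
loss_terms = B * T                        # next-token decisions averaged per step
logit_scores = B * T * V                  # scalar logits produced per step
qkv_scalars_per_window = 3 * T * d        # Q, K and V values for one window
attn_scores_per_head = T * T              # full score matrix for one head
causal_visible = T * (T + 1) // 2         # pairs (i, j) with j <= i
visible_fraction = causal_visible / attn_scores_per_head

random_loss = math.log(V)                 # cross entropy of uniform guessing
random_ppl = math.exp(random_loss)        # perplexity of uniform guessing

print(head_dim, loss_terms, logit_scores)
print(qkv_scalars_per_window, attn_scores_per_head, causal_visible)
print(round(visible_fraction * 100, 1), round(random_loss, 3), round(random_ppl))
```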
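6. Run the sample code to verify what it actually computes

Training loss looks only at character probabilities; the execution result tells us what the sample code prints in Python, or which chart artifacts it produces.

After you click "Run sample code", the real stdout, error message, or Matplotlib image appears here. For interpretable samples, stdout should match the formula results above.
7. LLM deep explanation of the current sample (optional)

The rule-based explainer handles deterministic computation; the LLM analyzes only the current sample, and only when you click. This suits samples that were not recognized, samples with only a basic preview, or cases where you want a deeper account of the code semantics and the training flow.

No personal LLM API configuration is currently available. Log in and enable DeepSeek, Kimi, or Volcengine Ark in your personal LLM API settings; without it, the deterministic trace and local execution above still work.
If the runner is not connected, start it from the project root:
.venv\Scripts\python.exe scripts\tinycodegpt_local_runner.py
When Target Epochs is greater than 0, the runner estimates steps/epoch from the data size and trains to the target epoch; Fallback Steps is used as a fixed step count only when Target Epochs is 0.
Status: - | Dataset: - | Source: template | Model: 4L/4H/192D/320ctx | Planned Steps: 50,000 | Epoch: 10.00 | Params: - | Latest Loss: -
Training log
After training starts, step, loss, tokens/sec, and checkpoint path appear here.
Call the trained model
Selected: Latest checkpoint
After training finishes, view the Python code generated by the model here.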
12

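Step 5: Don't just watch the loss; check whether the code runs

Falling loss only shows that the model is getting better at predicting the next token in the training text. For a task→code model, what matters more is whether the generated code actually runs.

| Metric | Meaning |
| --- | --- |
| Train Loss | next-token cross entropy on the training set |
| Val Loss | next-token cross entropy on the validation set, used to watch for overfitting |
| Syntax Pass Rate | whether generated code parses as Python |
| Execution Pass Rate | whether generated code runs locally without errors |
| Category Pass Rate | per-category results for Math, Python, NumPy, Pandas, Matplotlib, ML |

For beginners, the most important review question is: when the model fails, is it because a category lacks data, the code style is inconsistent, there were too few training steps, or the model is too small? This is closer to real LLM engineering than simply chasing a lower loss.

Minimum bar for a successful training run:
1. loss goes down
2. validation loss does not blow up
3. generated code is structurally complete
4. at least some tasks run successfully on the local machine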
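The syntax and execution pass rates in the table can be approximated with the standard library. This is a sketch under stated assumptions, not the runner's evaluator: `syntax_ok`, `runs_ok`, and `pass_rates` are hypothetical names, "pass" is taken to mean exit code 0 within a timeout, and the three code strings stand in for model output.

```python
import ast
import subprocess
import sys


def syntax_ok(code: str) -> bool:
    """Syntax pass: the code parses as Python."""
    try:
        ast.parse(code)
        return True
    except SyntaxError:
        return False


def runs_ok(code: str, timeout: float = 10.0) -> bool:
    """Execution pass: a separate Python process exits with code 0 in time."""
    try:
        result = subprocess.run(
            [sys.executable, "-c", code],
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return False
    return result.returncode == 0


def pass_rates(samples: list) -> dict:
    """Fraction of samples that parse, and fraction that parse and run."""
    total = len(samples)
    if total == 0:
        return {"syntax_pass_rate": 0.0, "execution_pass_rate": 0.0}
    parsed = sum(syntax_ok(code) for code in samples)
    executed = sum(syntax_ok(code) and runs_ok(code) for code in samples)
    return {"syntax_pass_rate": parsed / total, "execution_pass_rate": executed / total}


generated = [
    "print(sum([1, 2, 3]))",       # parses and runs
    "print(undefined_name)",        # parses, fails at runtime with NameError
    "for x in range(3) print(x)",   # syntax error: missing colon
]
rates = pass_rates(generated)
print(rates)
```

Grouping the samples by category before calling `pass_rates` would give the per-category view from the last table row.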
Commands

Local launch command

powershell
# Run from the AlgoLab project root
.venv\Scripts\python.exe scripts\tinycodegpt_local_runner.py

# Default service address
# http://127.0.0.1:4877/status

# Checkpoints produced by training are saved in
# .tmp/tinycodegpt/checkpoints
Source

Full source of tinycodegpt_local_runner.py (copyable)

python
from __future__ import annotations

import argparse
import base64
import json
import math
import os
import random
import subprocess
import sys
import tempfile
import threading
import time
import traceback
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
from urllib.parse import urlparse

try:
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
except Exception:  # pragma: no cover - status endpoint reports this clearly.
    torch = None
    class _NNStub:
        Module = object

    nn = _NNStub()
    F = None


def torch_no_grad():
    if torch is not None:
        return torch.no_grad()

    def decorator(function):
        return function

    return decorator


ROOT_DIR = Path(__file__).resolve().parents[1]
RUN_DIR = ROOT_DIR / ".tmp" / "tinycodegpt"
CHECKPOINT_DIR = RUN_DIR / "checkpoints"
RUN_OUTPUT_DIR = RUN_DIR / "generated_runs"
DATASET_DIR = RUN_DIR / "datasets"
RUN_DIR.mkdir(parents=True, exist_ok=True)
CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True)
RUN_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
DATASET_DIR.mkdir(parents=True, exist_ok=True)

JOBS: dict[str, dict[str, Any]] = {}
STOP_EVENTS: dict[str, threading.Event] = {}
ACTIVE_JOB_ID: str | None = None
LOCK = threading.Lock()


CATEGORIES = ("Math", "Python", "NumPy", "Pandas", "Matplotlib", "ML")
FORBIDDEN_CODE_PATTERNS = (
    "subprocess",
    "os.system",
    "socket",
    "requests",
    "urllib",
    "shutil",
    "pickle",
    "eval(",
    "exec(",
    "open(",
    "Path(",
)


@dataclass
class ModelConfig:
    n_layer: int
    n_head: int
    n_embd: int
    block_size: int


PRESETS = {
    "tiny": ModelConfig(n_layer=2, n_head=2, n_embd=96, block_size=192),
    "small": ModelConfig(n_layer=4, n_head=4, n_embd=192, block_size=320),
    "medium": ModelConfig(n_layer=6, n_head=6, n_embd=384, block_size=512),
}


def clamp_int(value: Any, fallback: int, lower: int, upper: int) -> int:
    try:
        parsed = int(value)
    except (TypeError, ValueError):
        parsed = fallback
    return max(lower, min(parsed, upper))


def clamp_float(value: Any, fallback: float, lower: float, upper: float) -> float:
    try:
        parsed = float(value)
    except (TypeError, ValueError):
        parsed = fallback
    return max(lower, min(parsed, upper))


def resolve_model_config(request: dict[str, Any]) -> tuple[str, ModelConfig]:
    preset = str(request.get("preset") or "small")
    base_config = PRESETS.get(preset, PRESETS["small"])
    raw_config = request.get("model_config") if preset == "custom" else None
    if not isinstance(raw_config, dict):
        return preset if preset in PRESETS else "small", base_config

    config = ModelConfig(
        n_layer=clamp_int(raw_config.get("n_layer"), base_config.n_layer, 1, 12),
        n_head=clamp_int(raw_config.get("n_head"), base_config.n_head, 1, 12),
        n_embd=clamp_int(raw_config.get("n_embd"), base_config.n_embd, 32, 768),
        block_size=clamp_int(raw_config.get("block_size"), base_config.block_size, 64, 1024),
    )
    if config.n_embd % config.n_head != 0:
        raise ValueError("n_embd must be divisible by n_head for custom model config.")
    return "custom", config


def utc_now() -> str:
    return datetime.now(timezone.utc).isoformat()


def jsonable(value: Any) -> Any:
    if isinstance(value, Path):
        return str(value)
    return value


def respond(handler: BaseHTTPRequestHandler, status: int, payload: dict[str, Any]) -> None:
    raw = json.dumps(payload, ensure_ascii=False, default=jsonable).encode("utf-8")
    handler.send_response(status)
    handler.send_header("Content-Type", "application/json; charset=utf-8")
    handler.send_header("Content-Length", str(len(raw)))
    handler.send_header("Access-Control-Allow-Origin", "*")
    handler.send_header("Access-Control-Allow-Headers", "Content-Type")
    handler.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
    handler.end_headers()
    handler.wfile.write(raw)


def read_body(handler: BaseHTTPRequestHandler) -> dict[str, Any]:
    length = int(handler.headers.get("Content-Length") or "0")
    if length <= 0:
        return {}
    raw = handler.rfile.read(length).decode("utf-8")
    return json.loads(raw or "{}")


def update_job(job_id: str, **patch: Any) -> None:
    with LOCK:
        job = JOBS[job_id]
        job.update(patch)


def append_log(job_id: str, line: str) -> None:
    with LOCK:
        job = JOBS[job_id]
        logs = job.setdefault("logs", [])
        logs.append(line)
        if len(logs) > 240:
            del logs[: len(logs) - 240]


def append_metric(job_id: str, metric: dict[str, Any]) -> None:
    with LOCK:
        job = JOBS[job_id]
        metrics = job.setdefault("metrics", [])
        metrics.append(metric)
        if len(metrics) > 240:
            del metrics[: len(metrics) - 240]


def add_sample(samples: list[dict[str, str]], category: str, task: str, code: str) -> None:
    samples.append({"category": category, "task": task.strip(), "code": code.strip()})


def generate_dataset(samples_per_category: int, seed: int = 7) -> list[dict[str, str]]:
    rng = random.Random(seed)
    samples: list[dict[str, str]] = []

    for _ in range(samples_per_category):
        r = rng.randint(1, 80)
        add_sample(
            samples,
            "Math",
            f"calculate the area of a circle with radius {r}",
            f"import math\nr = {r}\narea = math.pi * r ** 2\nprint(area)",
        )
        w, h = rng.randint(2, 60), rng.randint(2, 60)
        add_sample(
            samples,
            "Math",
            f"calculate rectangle area with width {w} and height {h}",
            f"width = {w}\nheight = {h}\narea = width * height\nprint(area)",
        )
        numbers = [rng.randint(-20, 80) for _ in range(rng.randint(4, 8))]
        add_sample(
            samples,
            "Math",
            f"calculate mean and variance of {numbers}",
            "nums = "
            + repr(numbers)
            + "\nmean = sum(nums) / len(nums)\nvariance = sum((x - mean) ** 2 for x in nums) / len(nums)\nprint(mean, variance)",
        )
        x1, y1, x2, y2 = [rng.randint(-20, 20) for _ in range(4)]
        add_sample(
            samples,
            "Math",
            f"calculate distance between ({x1}, {y1}) and ({x2}, {y2})",
            f"import math\nx1, y1 = {x1}, {y1}\nx2, y2 = {x2}, {y2}\ndistance = math.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)\nprint(distance)",
        )

        py_nums = [rng.randint(-30, 90) for _ in range(rng.randint(5, 10))]
        add_sample(
            samples,
            "Python",
            f"sort the list {py_nums} in ascending order",
            f"nums = {py_nums}\nsorted_nums = sorted(nums)\nprint(sorted_nums)",
        )
        threshold = rng.randint(0, 40)
        add_sample(
            samples,
            "Python",
            f"filter numbers greater than {threshold} from {py_nums}",
            f"nums = {py_nums}\nfiltered = [x for x in nums if x > {threshold}]\nprint(filtered)",
        )
        words = rng.choices(["alpha", "beta", "gamma", "delta", "theta", "lambda"], k=8)
        add_sample(
            samples,
            "Python",
            f"count word frequencies in {' '.join(words)}",
            "text = "
            + repr(" ".join(words))
            + "\ncounts = {}\nfor word in text.split():\n    counts[word] = counts.get(word, 0) + 1\nprint(counts)",
        )

        arr = [rng.randint(1, 50) for _ in range(5)]
        add_sample(
            samples,
            "NumPy",
            f"normalize numpy array {arr} to range 0 to 1",
            "import numpy as np\narr = np.array("
            + repr(arr)
            + ", dtype=float)\nnormalized = (arr - arr.min()) / (arr.max() - arr.min())\nprint(normalized)",
        )
        a = [[rng.randint(1, 5), rng.randint(1, 5)], [rng.randint(1, 5), rng.randint(1, 5)]]
        b = [[rng.randint(1, 5), rng.randint(1, 5)], [rng.randint(1, 5), rng.randint(1, 5)]]
        add_sample(
            samples,
            "NumPy",
            f"multiply two numpy matrices {a} and {b}",
            f"import numpy as np\nA = np.array({a})\nB = np.array({b})\nC = A @ B\nprint(C)",
        )
        start, stop = rng.randint(-10, 0), rng.randint(5, 20)
        add_sample(
            samples,
            "NumPy",
            f"create {8} evenly spaced numpy values from {start} to {stop}",
            f"import numpy as np\nx = np.linspace({start}, {stop}, 8)\nprint(x)",
        )

        prices = [rng.randint(10, 100) for _ in range(6)]
        cities = rng.choices(["Beijing", "Shanghai", "Chengdu"], k=6)
        add_sample(
            samples,
            "Pandas",
            "calculate mean price by city in a pandas dataframe",
            "import pandas as pd\n"
            + f"df = pd.DataFrame({{'city': {cities}, 'price': {prices}}})\n"
            + "result = df.groupby('city')['price'].mean()\nprint(result)",
        )
        add_sample(
            samples,
            "Pandas",
            f"filter pandas rows where price is greater than {threshold}",
            "import pandas as pd\n"
            + f"df = pd.DataFrame({{'city': {cities}, 'price': {prices}}})\n"
            + f"filtered = df[df['price'] > {threshold}]\nprint(filtered)",
        )

        func = rng.choice(["x ** 2", "np.sin(x)", "np.cos(x)", "x ** 3"])
        add_sample(
            samples,
            "Matplotlib",
            f"plot y = {func} from -10 to 10",
            "import numpy as np\nimport matplotlib.pyplot as plt\nx = np.linspace(-10, 10, 100)\ny = "
            + func
            + "\nplt.plot(x, y)\nplt.title('function plot')\nplt.show()",
        )
        add_sample(
            samples,
            "Matplotlib",
            "draw a scatter plot for two small numeric lists",
            "import matplotlib.pyplot as plt\nx = [1, 2, 3, 4, 5]\ny = [2, 5, 4, 8, 7]\nplt.scatter(x, y)\nplt.xlabel('x')\nplt.ylabel('y')\nplt.show()",
        )

        slope = rng.uniform(0.5, 4.0)
        intercept = rng.uniform(-3.0, 3.0)
        xs = list(range(-5, 6))
        ys = [round(slope * x + intercept, 3) for x in xs]
        add_sample(
            samples,
            "ML",
            "fit linear regression with numpy least squares",
            "import numpy as np\n"
            + f"x = np.array({xs}, dtype=float)\ny = np.array({ys}, dtype=float)\n"
            + "X = np.c_[x, np.ones_like(x)]\ncoef, bias = np.linalg.lstsq(X, y, rcond=None)[0]\nprint(coef, bias)",
        )
        add_sample(
            samples,
            "ML",
            "run gradient descent for y = 2x + 1",
            "import numpy as np\nx = np.array([0, 1, 2, 3], dtype=float)\ny = 2 * x + 1\nw, b = 0.0, 0.0\nlr = 0.05\nfor step in range(200):\n    pred = w * x + b\n    loss = np.mean((pred - y) ** 2)\n    dw = np.mean(2 * (pred - y) * x)\n    db = np.mean(2 * (pred - y))\n    w -= lr * dw\n    b -= lr * db\nprint(round(w, 3), round(b, 3), round(loss, 6))",
        )

    rng.shuffle(samples)
    return samples


def strip_jsonl_fences(raw: str) -> str:
    text = raw.strip()
    if text.startswith("```"):
        lines = text.splitlines()
        if lines and lines[0].lstrip().startswith("```"):
            lines = lines[1:]
        if lines and lines[-1].strip() == "```":
            lines = lines[:-1]
        text = "\n".join(lines).strip()
    return text


def assert_safe_sample_code(code: str, line_number: int) -> None:
    lowered = code.lower()
    for pattern in FORBIDDEN_CODE_PATTERNS:
        if pattern.lower() in lowered:
            raise ValueError(f"line {line_number}: code contains forbidden operation {pattern!r}")


def normalize_dataset_sample(value: Any, line_number: int) -> dict[str, str]:
    if not isinstance(value, dict):
        raise ValueError(f"line {line_number}: expected a JSON object")
    category = str(value.get("category") or "").strip()
    task = str(value.get("task") or "").strip()
    code = str(value.get("code") or "").strip()
    if category not in CATEGORIES:
        raise ValueError(f"line {line_number}: category must be one of {', '.join(CATEGORIES)}")
    if not task:
        raise ValueError(f"line {line_number}: task is required")
    if not code:
        raise ValueError(f"line {line_number}: code is required")
    assert_safe_sample_code(code, line_number)
    return {"category": category, "task": task, "code": code}


def parse_jsonl_dataset(raw: str) -> list[dict[str, str]]:
    text = strip_jsonl_fences(raw)
    if not text:
        raise ValueError("dataset_jsonl is empty")
    if text.startswith("["):
        parsed = json.loads(text)
        if not isinstance(parsed, list):
            raise ValueError("dataset JSON array is invalid")
        samples = [normalize_dataset_sample(item, index + 1) for index, item in enumerate(parsed)]
    else:
        samples = []
        for index, line in enumerate(text.splitlines(), start=1):
            stripped = line.strip()
            if not stripped:
                continue
            samples.append(normalize_dataset_sample(json.loads(stripped), index))
    if not samples:
        raise ValueError("dataset_jsonl does not contain any samples")
    return samples


def save_jsonl_dataset(job_id: str, samples: list[dict[str, str]]) -> Path:
    dataset_path = DATASET_DIR / f"{job_id}.jsonl"
    payload = "\n".join(json.dumps(sample, ensure_ascii=False) for sample in samples) + "\n"
    dataset_path.write_text(payload, encoding="utf-8")
    return dataset_path


def load_training_samples(
    job_id: str,
    request: dict[str, Any],
    samples_per_category: int,
    seed: int,
) -> tuple[list[dict[str, str]], str, Path | None]:
    raw_jsonl = str(request.get("dataset_jsonl") or "").strip()
    if raw_jsonl:
        samples = parse_jsonl_dataset(raw_jsonl)
        dataset_path = save_jsonl_dataset(job_id, samples)
        return samples, "teacher_jsonl", dataset_path

    return generate_dataset(samples_per_category, seed=seed), "template_synthetic", None


def format_sample(sample: dict[str, str]) -> str:
    return f"<task>\n{sample['task']}\n</task>\n<code>\n{sample['code']}\n</code>\n"


def build_vocab(text: str) -> tuple[dict[str, int], dict[int, str]]:
    chars = sorted(set(text))
    stoi = {ch: i for i, ch in enumerate(chars)}
    itos = {i: ch for ch, i in stoi.items()}
    return stoi, itos


def count_parameters(model: nn.Module) -> int:
    return sum(parameter.numel() for parameter in model.parameters())


def normalize_itos(raw_itos: dict[Any, str]) -> dict[int, str]:
    if not raw_itos:
        return {}
    return {int(key): value for key, value in raw_itos.items()} if isinstance(next(iter(raw_itos.keys())), str) else raw_itos


def load_resume_checkpoint(path_value: str | None) -> tuple[Path, dict[str, Any]]:
    checkpoint_path = Path(path_value) if path_value else latest_checkpoint()
    if checkpoint_path is None or not checkpoint_path.exists():
        raise RuntimeError("No checkpoint found to continue training from.")
    return checkpoint_path, torch.load(checkpoint_path, map_location="cpu")


class CausalSelfAttention(nn.Module):
    def __init__(self, config: ModelConfig):
        super().__init__()
        if config.n_embd % config.n_head != 0:
            raise ValueError("n_embd must be divisible by n_head")
        self.n_head = config.n_head
        self.head_dim = config.n_embd // config.n_head
        self.qkv = nn.Linear(config.n_embd, 3 * config.n_embd)
        self.proj = nn.Linear(config.n_embd, config.n_embd)
        self.register_buffer("mask", torch.tril(torch.ones(config.block_size, config.block_size)).view(1, 1, config.block_size, config.block_size))

    def forward(self, x):
        batch, tokens, channels = x.shape
        qkv = self.qkv(x)
        q, k, v = qkv.split(channels, dim=2)
        q = q.view(batch, tokens, self.n_head, self.head_dim).transpose(1, 2)
        k = k.view(batch, tokens, self.n_head, self.head_dim).transpose(1, 2)
        v = v.view(batch, tokens, self.n_head, self.head_dim).transpose(1, 2)
        attention = (q @ k.transpose(-2, -1)) / math.sqrt(self.head_dim)
        attention = attention.masked_fill(self.mask[:, :, :tokens, :tokens] == 0, float("-inf"))
        attention = F.softmax(attention, dim=-1)
        y = attention @ v
        y = y.transpose(1, 2).contiguous().view(batch, tokens, channels)
        return self.proj(y)


class TransformerBlock(nn.Module):
    def __init__(self, config: ModelConfig):
        super().__init__()
        self.ln1 = nn.LayerNorm(config.n_embd)
        self.attn = CausalSelfAttention(config)
        self.ln2 = nn.LayerNorm(config.n_embd)
        self.ffn = nn.Sequential(
            nn.Linear(config.n_embd, 4 * config.n_embd),
            nn.GELU(),
            nn.Linear(4 * config.n_embd, config.n_embd),
        )

    def forward(self, x):
        x = x + self.attn(self.ln1(x))
        x = x + self.ffn(self.ln2(x))
        return x


class TinyCodeGPT(nn.Module):
    def __init__(self, vocab_size: int, config: ModelConfig):
        super().__init__()
        self.config = config
        self.token_emb = nn.Embedding(vocab_size, config.n_embd)
        self.pos_emb = nn.Embedding(config.block_size, config.n_embd)
        self.blocks = nn.Sequential(*[TransformerBlock(config) for _ in range(config.n_layer)])
        self.ln_f = nn.LayerNorm(config.n_embd)
        self.head = nn.Linear(config.n_embd, vocab_size, bias=False)

    def forward(self, idx, targets=None):
        _, tokens = idx.shape
        positions = torch.arange(tokens, device=idx.device)
        x = self.token_emb(idx) + self.pos_emb(positions)
        x = self.blocks(x)
        logits = self.head(self.ln_f(x))
        loss = None
        if targets is not None:
            loss = F.cross_entropy(logits.reshape(-1, logits.size(-1)), targets.reshape(-1))
        return logits, loss

    @torch_no_grad()
    def generate(
        self,
        idx,
        max_new_tokens: int,
        temperature: float = 0.8,
        top_k: int = 30,
        stop_sequence: list[int] | None = None,
    ):
        for _ in range(max_new_tokens):
            idx_cond = idx[:, -self.config.block_size :]
            logits, _ = self(idx_cond)
            logits = logits[:, -1, :] / max(temperature, 1e-4)
            if top_k:
                values, _ = torch.topk(logits, min(top_k, logits.size(-1)))
                logits[logits < values[:, [-1]]] = -float("inf")
            probs = F.softmax(logits, dim=-1)
            next_id = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, next_id), dim=1)
            if stop_sequence and idx.size(1) >= len(stop_sequence):
                tail = idx[0, -len(stop_sequence) :].detach().cpu().tolist()
                if tail == stop_sequence:
                    break
        return idx


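def make_batch(data: Any, block_size: int, batch_size: int, device: str):
    # Pick random window starts, leaving room for block_size inputs plus one target.
    starts = torch.randint(0, len(data) - block_size - 1, (batch_size,))
    x = torch.stack([data[i : i + block_size] for i in starts]).to(device)
    # y is x shifted one character ahead, so y[t] is the next-token target for x[t].
    y = torch.stack([data[i + 1 : i + block_size + 1] for i in starts]).to(device)
    return x, y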


def evaluate(model: nn.Module, data: Any, block_size: int, batch_size: int, device: str) -> float:
    model.eval()
    losses = []
    with torch.no_grad():
        for _ in range(6):
            x, y = make_batch(data, block_size, batch_size, device)
            _, loss = model(x, y)
            losses.append(float(loss.item()))
    model.train()
    return sum(losses) / len(losses)


def run_training(job_id: str, request: dict[str, Any]) -> None:
    global ACTIVE_JOB_ID
    if torch is None:
        update_job(job_id, status="failed", error="PyTorch is not installed in this Python environment.", finished_at=utc_now())
        return

    stop_event = STOP_EVENTS[job_id]
    try:
        continue_from_checkpoint = bool(request.get("continue_from_checkpoint"))
        resume_checkpoint_path = str(request.get("checkpoint_path") or "").strip() or None
        preset, config = resolve_model_config(request)
        samples_per_category = max(10, min(int(request.get("samples_per_category") or 80), 600))
        fallback_max_steps = max(20, min(int(request.get("max_steps") or 50_000), 200_000))
        target_epochs = clamp_float(request.get("target_epochs"), 3.0, 0.0, 100.0)
        batch_size = max(4, min(int(request.get("batch_size") or 32), 128))
        learning_rate = float(request.get("learning_rate") or 8e-4)
        seed = int(request.get("seed") or 7)

        random.seed(seed)
        torch.manual_seed(seed)
        device = "cuda" if torch.cuda.is_available() else "cpu"
        update_job(job_id, status="running", started_at=utc_now(), device=device)
        append_log(job_id, f"device = {device}")
        append_log(job_id, f"preset = {preset}, config = {config}")

        samples, dataset_source, dataset_path = load_training_samples(job_id, request, samples_per_category, seed)
        text = "\n".join(format_sample(sample) for sample in samples)
        resume_checkpoint: dict[str, Any] | None = None
        resumed_from_checkpoint: Path | None = None
        if continue_from_checkpoint:
            resumed_from_checkpoint, resume_checkpoint = load_resume_checkpoint(resume_checkpoint_path)
            config = ModelConfig(**resume_checkpoint["config"])
            preset = str(resume_checkpoint.get("metadata", {}).get("preset") or "continued")
            stoi = resume_checkpoint["stoi"]
            itos = normalize_itos(resume_checkpoint["itos"])
            missing_chars = sorted(set(text) - set(stoi))
            if missing_chars:
                preview = "".join(missing_chars[:20])
                raise RuntimeError(
                    "Cannot continue from this checkpoint because the new dataset contains characters "
                    f"that are not in the checkpoint vocabulary: {preview!r}"
                )
            append_log(job_id, f"continue_from_checkpoint = {resumed_from_checkpoint}")
            append_log(job_id, f"checkpoint config = {config}")
        else:
            stoi, itos = build_vocab(text)
        ids = torch.tensor([stoi[ch] for ch in text], dtype=torch.long)
        if len(ids) <= config.block_size + batch_size + 2:
            raise RuntimeError(
                "Dataset is too small for the selected preset and batch size. "
                "Generate more JSONL samples or choose the tiny preset."
            )
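        min_tokens = config.block_size + batch_size + 2
        split = max(min_tokens, int(len(ids) * 0.92))
        split = min(split, len(ids) - min_tokens)
        # If the text cannot hold two full splits, train and validate on the same ids
        # instead of leaving train_ids shorter than one context window.
        train_ids = ids[:split] if split >= min_tokens else ids
        val_ids = ids[split:]
        if split < min_tokens or len(val_ids) <= min_tokens:
            val_ids = train_ids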

        steps_per_epoch = max(1, math.ceil(len(train_ids) / (batch_size * config.block_size)))
        epoch_steps = math.ceil(steps_per_epoch * target_epochs) if target_epochs > 0 else fallback_max_steps
        max_steps = max(20, min(epoch_steps, 200_000))
        model = TinyCodeGPT(len(stoi), config).to(device)
        optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=0.1)
        if resume_checkpoint is not None:
            model.load_state_dict(resume_checkpoint["model_state_dict"])
            if resume_checkpoint.get("optimizer_state_dict"):
                optimizer.load_state_dict(resume_checkpoint["optimizer_state_dict"])
                for state in optimizer.state.values():
                    for key, value in state.items():
                        if hasattr(value, "to"):
                            state[key] = value.to(device)
        parameter_count = count_parameters(model)
        update_job(
            job_id,
            dataset_size=len(samples),
            dataset_source=dataset_source,
            dataset_path=str(dataset_path) if dataset_path else None,
            vocab_size=len(stoi),
            parameter_count=parameter_count,
            preset=preset,
            model_config=config.__dict__,
            max_steps=max_steps,
            fallback_max_steps=fallback_max_steps,
            target_epochs=target_epochs,
            steps_per_epoch=steps_per_epoch,
            resumed_from_checkpoint=str(resumed_from_checkpoint) if resumed_from_checkpoint else None,
        )
        append_log(job_id, f"dataset_size = {len(samples)} samples")
        append_log(job_id, f"dataset_source = {dataset_source}")
        if dataset_path:
            append_log(job_id, f"dataset saved: {dataset_path}")
        append_log(job_id, f"vocab_size = {len(stoi)} characters")
        append_log(job_id, f"parameters = {parameter_count:,}")
        append_log(
            job_id,
            f"target_epochs = {target_epochs:g}, steps_per_epoch ≈ {steps_per_epoch}, "
            f"planned_steps = {max_steps}, fallback_max_steps = {fallback_max_steps}",
        )

        log_every = max(10, max_steps // 18)
        start_time = time.time()
        for step in range(1, max_steps + 1):
            if stop_event.is_set():
                update_job(job_id, status="stopped", finished_at=utc_now())
                append_log(job_id, "training stopped by user")
                return
            x, y = make_batch(train_ids, config.block_size, batch_size, device)
            _, loss = model(x, y)
            optimizer.zero_grad(set_to_none=True)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()

            if step == 1 or step % log_every == 0 or step == max_steps:
                val_loss = evaluate(model, val_ids, config.block_size, min(batch_size, 32), device)
                elapsed = max(time.time() - start_time, 1e-6)
                tokens_per_second = int(step * batch_size * config.block_size / elapsed)
                metric = {
                    "step": step,
                    "train_loss": round(float(loss.item()), 6),
                    "val_loss": round(float(val_loss), 6),
                    "tokens_per_second": tokens_per_second,
                    "epoch": round(step / steps_per_epoch, 4),
                }
                append_metric(job_id, metric)
                append_log(
                    job_id,
                    f"step {step:5d}/{max_steps} | epoch {metric['epoch']:.2f}/{target_epochs:g} | train_loss {metric['train_loss']:.4f} | val_loss {metric['val_loss']:.4f} | {tokens_per_second:,} tok/s",
                )

        checkpoint_path = CHECKPOINT_DIR / f"{job_id}.pt"
        torch.save(
            {
                "model_state_dict": model.state_dict(),
                "optimizer_state_dict": optimizer.state_dict(),
                "config": config.__dict__,
                "stoi": stoi,
                "itos": itos,
                "samples": samples[:120],
                "metadata": {
                    "preset": preset,
                    "model_config": config.__dict__,
                    "samples_per_category": samples_per_category,
                    "max_steps": max_steps,
                    "fallback_max_steps": fallback_max_steps,
                    "target_epochs": target_epochs,
                    "steps_per_epoch": steps_per_epoch,
                    "batch_size": batch_size,
                    "learning_rate": learning_rate,
                    "dataset_source": dataset_source,
                    "dataset_path": str(dataset_path) if dataset_path else None,
                    "dataset_size": len(samples),
                    "resumed_from_checkpoint": str(resumed_from_checkpoint) if resumed_from_checkpoint else None,
                    "created_at": utc_now(),
                },
            },
            checkpoint_path,
        )
        update_job(job_id, status="completed", checkpoint_path=str(checkpoint_path), finished_at=utc_now())
        append_log(job_id, f"checkpoint saved: {checkpoint_path}")
    except Exception as error:
        update_job(job_id, status="failed", error=str(error), finished_at=utc_now())
        append_log(job_id, traceback.format_exc())
    finally:
        with LOCK:
            if ACTIVE_JOB_ID == job_id:
                ACTIVE_JOB_ID = None


def latest_checkpoint() -> Path | None:
    checkpoints = sorted(CHECKPOINT_DIR.glob("*.pt"), key=lambda item: item.stat().st_mtime, reverse=True)
    return checkpoints[0] if checkpoints else None


def load_checkpoint(path_value: str | None):
    if torch is None:
        raise RuntimeError("PyTorch is not installed in this Python environment.")
    checkpoint_path = Path(path_value) if path_value else latest_checkpoint()
    if checkpoint_path is None or not checkpoint_path.exists():
        raise RuntimeError("No TinyCodeGPT checkpoint found. Train a model first.")
    checkpoint = torch.load(checkpoint_path, map_location="cpu")
    config = ModelConfig(**checkpoint["config"])
    stoi = checkpoint["stoi"]
    itos = normalize_itos(checkpoint["itos"])
    model = TinyCodeGPT(len(stoi), config)
    model.load_state_dict(checkpoint["model_state_dict"])
    model.eval()
    return checkpoint_path, model, config, stoi, itos


def extract_code(text: str) -> str:
    marker = "<code>\n"
    start = text.find(marker)
    if start >= 0:
        text = text[start + len(marker) :]
    end = text.find("</code>")
    if end >= 0:
        text = text[:end]
    return text.strip()


def encode_png_artifact(path: Path) -> dict[str, str]:
    payload = base64.b64encode(path.read_bytes()).decode("ascii")
    return {
        "filename": path.name,
        "path": str(path),
        "mime_type": "image/png",
        "data_url": f"data:image/png;base64,{payload}",
    }


def run_generated_code(code: str, timeout_seconds: int = 12) -> dict[str, Any]:
    if not code.strip():
        return {"output": [], "error": "Model returned empty code.", "artifacts": []}
    try:
        assert_safe_sample_code(code, 1)
    except ValueError as error:
        return {"output": [], "error": str(error), "artifacts": []}
    run_id = uuid.uuid4().hex[:10]
    script_path = RUN_OUTPUT_DIR / f"generated_{run_id}.py"
    capture_header = """
# AlgoLab runs matplotlib with a non-interactive backend and captures figures
# after the generated code finishes. Ignore the expected plt.show() warning.
import warnings as _algolab_warnings
_algolab_warnings.filterwarnings(
    "ignore",
    message="FigureCanvasAgg is non-interactive.*",
    category=UserWarning,
)
"""
    capture_footer = f"""

# AlgoLab captures matplotlib figures after generated code runs.
try:
    import os as _algolab_os
    import sys as _algolab_sys
    try:
        import matplotlib.pyplot as _algolab_plt
    except Exception:
        _algolab_plt = None
    _algolab_plot_dir = r"{RUN_OUTPUT_DIR}"
    if _algolab_plt is not None:
        for _algolab_index, _algolab_num in enumerate(_algolab_plt.get_fignums(), start=1):
            _algolab_figure = _algolab_plt.figure(_algolab_num)
            _algolab_path = _algolab_os.path.join(_algolab_plot_dir, "{run_id}_plot_" + str(_algolab_index) + ".png")
            _algolab_figure.savefig(_algolab_path, bbox_inches="tight")
except Exception as _algolab_plot_error:
    print("[AlgoLab plot capture failed] " + str(_algolab_plot_error), file=_algolab_sys.stderr)
"""
    script_path.write_text(f"{capture_header}\n{code.rstrip()}\n{capture_footer}", encoding="utf-8")
    env = {
        **os.environ,
        "PYTHONUTF8": "1",
        "PYTHONIOENCODING": "utf-8",
        "MPLBACKEND": "Agg",
    }
    try:
        completed = subprocess.run(
            [sys.executable, "-X", "utf8", str(script_path)],
            cwd=str(RUN_OUTPUT_DIR),
            env=env,
            text=True,
            capture_output=True,
            timeout=timeout_seconds,
            check=False,
        )
    except subprocess.TimeoutExpired:
        return {"output": [], "error": f"Generated code timed out after {timeout_seconds} seconds.", "artifacts": []}
    output = (completed.stdout or "").strip().splitlines()
    stderr_lines = (completed.stderr or "").strip().splitlines()
    filtered_stderr_lines: list[str] = []
    skip_show_line = False
    for line in stderr_lines:
        if "FigureCanvasAgg is non-interactive" in line:
            skip_show_line = True
            continue
        if skip_show_line and line.strip().startswith("plt.show("):
            skip_show_line = False
            continue
        skip_show_line = False
        filtered_stderr_lines.append(line)
    stderr = "\n".join(filtered_stderr_lines).strip()
    error = stderr or None
    if completed.returncode != 0 and not error:
        error = f"Generated code exited with status {completed.returncode}."
    artifacts = [encode_png_artifact(path) for path in sorted(RUN_OUTPUT_DIR.glob(f"{run_id}_plot_*.png"))]
    return {
        "output": output[-80:],
        "error": error,
        "artifacts": artifacts,
        "return_code": completed.returncode,
        "script_path": str(script_path),
    }


def generate_code(request: dict[str, Any]) -> dict[str, Any]:
    checkpoint_path, model, config, stoi, itos = load_checkpoint(request.get("checkpoint_path"))
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model.to(device)
    task = str(request.get("task") or "").strip()
    if not task:
        raise RuntimeError("Task cannot be empty.")
    prompt = f"<task>\n{task}\n</task>\n<code>\n"
    encoded = [stoi[ch] for ch in prompt if ch in stoi]
    if not encoded:
        raise RuntimeError("Prompt contains no known characters from the tokenizer vocabulary.")
    idx = torch.tensor([encoded], dtype=torch.long, device=device)
    max_new_tokens = max(80, min(int(request.get("max_new_tokens") or 900), 2000))
    temperature = max(0.2, min(float(request.get("temperature") or 0.75), 1.5))
    stop_sequence = [stoi[ch] for ch in "\n</code>" if ch in stoi]
    with torch.no_grad():
        generated = model.generate(
            idx,
            max_new_tokens=max_new_tokens,
            temperature=temperature,
            stop_sequence=stop_sequence if stop_sequence else None,
        )
    text = "".join(itos[int(token)] for token in generated[0].detach().cpu().tolist())
    code = extract_code(text)
    return {
        "task": task,
        "code": code,
        "truncated": "</code>" not in text,
        "checkpoint_path": str(checkpoint_path),
    }


def execute_code(request: dict[str, Any]) -> dict[str, Any]:
    timeout_seconds = max(1, min(int(request.get("timeout_seconds") or 12), 30))
    code = str(request.get("code") or "").strip()
    result = run_generated_code(code, timeout_seconds=timeout_seconds)
    return {"code": code, **result}


def status_payload() -> dict[str, Any]:
    checkpoints = sorted(CHECKPOINT_DIR.glob("*.pt"), key=lambda item: item.stat().st_mtime, reverse=True)
    torch_available = torch is not None
    cuda_available = bool(torch_available and torch.cuda.is_available())
    return {
        "ok": True,
        "python": sys.version.split()[0],
        "torch_available": torch_available,
        "torch_version": getattr(torch, "__version__", None) if torch_available else None,
        "cuda_available": cuda_available,
        "cuda_device": torch.cuda.get_device_name(0) if cuda_available else None,
        "device": "cuda" if cuda_available else "cpu",
        "working_dir": str(RUN_DIR),
        "active_job_id": ACTIVE_JOB_ID,
        "checkpoints": [str(item) for item in checkpoints[:10]],
    }


class TinyCodeGPTHandler(BaseHTTPRequestHandler):
    def log_message(self, format: str, *args: Any) -> None:
        print(f"[tinycodegpt-runner] {self.address_string()} - {format % args}")

    def do_OPTIONS(self) -> None:
        respond(self, 200, {"ok": True})

    def do_GET(self) -> None:
        parsed = urlparse(self.path)
        path = parsed.path.rstrip("/") or "/"
        if path in {"/", "/status", "/health"}:
            respond(self, 200, status_payload())
            return
        if path.startswith("/jobs/"):
            job_id = path.split("/")[-1]
            with LOCK:
                job = JOBS.get(job_id)
            if not job:
                respond(self, 404, {"message": "Job not found."})
                return
            respond(self, 200, job)
            return
        respond(self, 404, {"message": "Unknown route."})

    def do_POST(self) -> None:
        global ACTIVE_JOB_ID
        parsed = urlparse(self.path)
        path = parsed.path.rstrip("/") or "/"
        try:
            if path == "/train":
                body = read_body(self)
                with LOCK:
                    if ACTIVE_JOB_ID and JOBS.get(ACTIVE_JOB_ID, {}).get("status") in {"queued", "running"}:
                        respond(self, 409, {"message": f"Training job already running: {ACTIVE_JOB_ID}"})
                        return
                    job_id = uuid.uuid4().hex[:12]
                    job = {
                        "id": job_id,
                        "status": "queued",
                        "created_at": utc_now(),
                        "logs": [],
                        "metrics": [],
                    }
                    JOBS[job_id] = job
                    STOP_EVENTS[job_id] = threading.Event()
                    ACTIVE_JOB_ID = job_id
                thread = threading.Thread(target=run_training, args=(job_id, body), daemon=True)
                thread.start()
                respond(self, 200, job)
                return

            if path.startswith("/jobs/") and path.endswith("/stop"):
                parts = path.split("/")
                job_id = parts[2] if len(parts) >= 3 else ""
                if job_id in STOP_EVENTS:
                    STOP_EVENTS[job_id].set()
                with LOCK:
                    job = JOBS.get(job_id)
                if not job:
                    respond(self, 404, {"message": "Job not found."})
                    return
                respond(self, 200, job)
                return

            if path == "/generate":
                body = read_body(self)
                result = generate_code(body)
                respond(self, 200, result)
                return

            if path == "/execute":
                body = read_body(self)
                result = execute_code(body)
                respond(self, 200, result)
                return

            respond(self, 404, {"message": "Unknown route."})
        except Exception as error:
            respond(self, 500, {"message": str(error), "traceback": traceback.format_exc()})


def main() -> None:
    parser = argparse.ArgumentParser(description="Local TinyCodeGPT training runner for AlgoLab.")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=4877)
    args = parser.parse_args()
    server = ThreadingHTTPServer((args.host, args.port), TinyCodeGPTHandler)
    print(f"TinyCodeGPT local runner listening on http://{args.host}:{args.port}")
    print(f"Working directory: {RUN_DIR}")
    print("Press Ctrl+C to stop.")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nStopping TinyCodeGPT local runner.")
    finally:
        server.server_close()


if __name__ == "__main__":
    main()
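Deep Dive 1

Source Deep Dive 1: runner.py is not a script but a local training pipeline

The first six modules covered the basics of Transformers, tokens, attention, and training. This section takes a different view: read tinycodegpt_local_runner.py as an engineering pipeline. After the user clicks a button on the web page, the actual data, state, and parameters all flow through the runner.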

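| Source region | State it maintains | Problem it solves | Common misunderstanding |
| --- | --- | --- | --- |
| DATASET_DIR / CHECKPOINT_DIR / JOBS | Dataset files, checkpoints, training job state | Lets the page query training results after a refresh and keeps model files on the local machine | The web page is not where training happens; it is only a console |
| parse_jsonl_dataset | Uniform {category, task, code} samples | Normalizes teacher-generated, built-in sampled, and hand-pasted data into one training format | More JSONL lines does not mean higher quality; code style and category coverage matter too |
| format_sample | `<task>` / `<code>` boundary markers | Fixes the supervised task as "see the task, continue with code" | The markers are not decoration; they decide where generation starts and where it stops |
| build_vocab / encode | Character vocabulary and integer sequence | Turns Python text into a LongTensor that PyTorch can train on | A character-level model can run the whole pipeline, but it represents complex semantics less efficiently than BPE |
| TinyCodeGPT.forward | logits and loss | Projects the hidden vector at each position into next-character logits (probabilities after softmax) | forward has no idea whether the program can run |
| run_training | device, steps, optimizer, metrics | Runs real backpropagation and keeps writing logs and the loss curve | steps_per_epoch is computed from the token budget; it is not a per-epoch step count the user types in |
| generate_code / execute_code | Generated text, execution output, plot artifacts | Moves model quality from "looks like code" to a verification loop of "runs and produces results" | An execution failure is training feedback, not just a UI error |

Web UI parameters -> POST /train -> parse/save JSONL -> concatenate task-code text -> character vocabulary + token ids -> random-window batches -> forward / loss / backward / optimizer.step -> checkpoint -> generate_code -> execute_code + stdout / image artifacts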
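
The middle of this pipeline (frame the sample, build a character vocabulary, encode to ids) can be sketched with the standard library alone. `format_sample` and `build_vocab` below mirror the runner's functions; the `encode` and `decode` helpers and the sample record are illustrative assumptions, not code from the runner.

```python
def format_sample(sample: dict[str, str]) -> str:
    # Same <task>/<code> framing the runner applies to every training sample.
    return f"<task>\n{sample['task']}\n</task>\n<code>\n{sample['code']}\n</code>\n"


def build_vocab(text: str) -> tuple[dict[str, int], dict[int, str]]:
    # One id per distinct character, in sorted order.
    chars = sorted(set(text))
    stoi = {ch: i for i, ch in enumerate(chars)}
    itos = {i: ch for ch, i in stoi.items()}
    return stoi, itos


def encode(text: str, stoi: dict[str, int]) -> list[int]:
    # Hypothetical helper: characters -> integer ids.
    return [stoi[ch] for ch in text]


def decode(ids: list[int], itos: dict[int, str]) -> str:
    # Hypothetical helper: integer ids -> characters.
    return "".join(itos[i] for i in ids)


# Illustrative sample, not taken from a real dataset.
sample = {
    "category": "Math",
    "task": "calculate area of circle with radius 3",
    "code": "import math\nprint(math.pi * 3 ** 2)",
}
text = format_sample(sample)
stoi, itos = build_vocab(text)
ids = encode(text, stoi)
print(len(text), "characters ->", len(stoi), "vocabulary entries")
```

Because the vocabulary is built from the training text itself, any character that never appears in the dataset has no id, which is why the runner refuses to continue from a checkpoint when the new dataset introduces unseen characters.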
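Formula

Source Deep Dive 2: what a single training step actually estimates

The step/epoch values shown to the user come from the token budget, not from JSONL line counts. Each step samples B random windows of length T, and y is x shifted by one position, so y[t] is the character that follows x[t]. One backward pass therefore trains B×T next-character predictions at once.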
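
The window sampling can be illustrated without PyTorch. This is a minimal sketch that assumes plain Python lists in place of tensors; it follows the same indexing as the runner's `make_batch`, but the `rng` parameter and the toy `ids` sequence are illustrative.

```python
import random


def make_batch(ids: list[int], block_size: int, batch_size: int, rng: random.Random):
    # Valid starts leave room for block_size inputs plus one extra target token.
    starts = [rng.randrange(0, len(ids) - block_size - 1) for _ in range(batch_size)]
    x = [ids[s : s + block_size] for s in starts]
    # Targets are the same window moved one position ahead.
    y = [ids[s + 1 : s + block_size + 1] for s in starts]
    return x, y


ids = list(range(100))  # toy token ids, so the shift is easy to see
x, y = make_batch(ids, block_size=8, batch_size=4, rng=random.Random(7))
for row_x, row_y in zip(x, y):
    print(row_x, "->", row_y)
```

Each row contributes T prediction targets, so a batch of B rows yields B×T loss terms from a single forward and backward pass.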

Encoded character-token count: the number of character tokens after all task-code text is encoded
Steps per epoch: the number of steps needed for roughly one pass over the training tokens
Target Epochs: determines the total step count when greater than 0
Fallback Steps: the step budget used directly only when Target Epochs = 0
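
Written out, the relationship between these quantities follows the arithmetic in `run_training`. The symbols are labels chosen here for illustration: $N_{\text{train}}$ is the length of the training split, $B$ the batch size, $T$ the context length (`block_size`), $E$ the Target Epochs value, and $S_{\text{fallback}}$ the Fallback Steps value.

```latex
S_{\text{epoch}} = \max\!\left(1,\ \left\lceil \frac{N_{\text{train}}}{B \cdot T} \right\rceil\right)
\qquad
S_{\text{total}} = \max\!\bigl(20,\ \min(S_{\text{raw}},\ 200000)\bigr),
\quad
S_{\text{raw}} =
\begin{cases}
\left\lceil S_{\text{epoch}} \cdot E \right\rceil & E > 0 \\
S_{\text{fallback}} & E = 0
\end{cases}
```

The bounds 20 and 200000 are the clamps the runner applies to the planned step count.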

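Why 5,000 samples give only about 131 steps per epoch

If 5,000 JSONL samples concatenate to about 1.45 million character tokens, the training split (92% in the runner) is about 1.34 million tokens. With batch_size=32 and context=320, each step consumes 32 × 320 = 10,240 token positions, so one epoch is about 1,340,000 / 10,240 ≈ 131 steps. This is expected and does not mean that only 131 samples were trained on.

1,340,000 / (32 * 320) ≈ 131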
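
The same arithmetic as a small function, following the `steps_per_epoch` expression in `run_training`. The token count is the rounded figure from the paragraph above, not a measured value.

```python
import math


def steps_per_epoch(train_tokens: int, batch_size: int, block_size: int) -> int:
    # One step consumes batch_size * block_size token positions.
    return max(1, math.ceil(train_tokens / (batch_size * block_size)))


# Rounded example: about 1.34 million training tokens, batch 32, context 320.
print(steps_per_epoch(1_340_000, batch_size=32, block_size=320))  # 131
```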

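Why Fallback Steps is no longer multiplied by Epoch

The current runner works as follows: when Target Epochs is greater than 0, the total step count is derived from the dataset size and the epoch count; only when Target Epochs equals 0 is Fallback Steps used as a manual training budget. This keeps the two limits, "epoch" and "max steps", from conflicting with each other.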
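
A compact sketch of that rule, assuming the same clamps as `run_training` (at least 20 steps, at most 200,000). The function name `planned_steps` is illustrative; the runner computes this inline.

```python
import math


def planned_steps(steps_per_epoch: int, target_epochs: float, fallback_max_steps: int) -> int:
    if target_epochs > 0:
        # Epoch-driven budget: dataset size and epoch count decide the total.
        raw = math.ceil(steps_per_epoch * target_epochs)
    else:
        # Manual budget: used only when Target Epochs is 0.
        raw = fallback_max_steps
    return max(20, min(raw, 200_000))


print(planned_steps(131, 3.0, 50_000))  # epoch-driven
print(planned_steps(131, 0.0, 50_000))  # fallback-driven
```

With Target Epochs above 0 the fallback value has no effect on the result, which is the point of keeping the two budgets separate.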

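To improve accuracy, which quantities to scale first

For this task, the usual priority is: improve JSONL quality and coverage first, then increase model capacity, then add training epochs. Simply pushing the epoch count very high makes the model more familiar with the existing samples, but also makes it more likely to memorize templates than to generalize.

quality/coverage > model capacity > reasonable epochs > blindly adding steps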

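Source Deep Dive 3: the training budget diagram and which parameters amplify compute

This diagram puts dataset size, batch, context, epochs, model capacity, and checkpoint quality into one budget view. Read it as follows: the left side determines how many tokens the model sees, the middle determines how large each update is, and the right side determines whether the model has enough capacity to absorb these patterns.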
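
For the capacity side of the budget, the parameter count can be estimated directly from the layer shapes in `TinyCodeGPT`. This is a sketch derived from the module definitions in the listing (bias-free output head, causal mask stored as a buffer rather than a parameter); the configuration values in the example are hypothetical and not one of the runner's presets.

```python
def estimate_parameters(vocab_size: int, block_size: int, n_embd: int, n_layer: int) -> int:
    # Token and position embedding tables.
    embeddings = vocab_size * n_embd + block_size * n_embd
    # qkv projection plus output projection, each with a bias.
    attention = (n_embd * 3 * n_embd + 3 * n_embd) + (n_embd * n_embd + n_embd)
    # Two linear layers with a 4x hidden width, each with a bias.
    ffn = (n_embd * 4 * n_embd + 4 * n_embd) + (4 * n_embd * n_embd + n_embd)
    # Two LayerNorms per block, each with a weight and a bias vector.
    layer_norms = 2 * (2 * n_embd)
    per_block = attention + ffn + layer_norms
    # Final LayerNorm plus the bias-free vocabulary head.
    final = 2 * n_embd + n_embd * vocab_size
    return embeddings + n_layer * per_block + final


# Hypothetical configuration for illustration only.
print(f"{estimate_parameters(vocab_size=100, block_size=128, n_embd=64, n_layer=2):,}")
```

Each block costs 12·n_embd² + 13·n_embd parameters, so widening the embedding grows the model quadratically while adding layers grows it linearly. The number of heads does not change the count because the heads split the same projection matrices.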
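Deep Dive 4

Source Deep Dive 4: why generation, execution, and continued training form one quality loop

The runner's job does not end when training completes. A local model that is actually useful has to go through the cycle "load checkpoint → generate code → execute code → inspect errors → add data or continue training". This cycle is closer to real usability than the loss alone.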

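| Stage | Key function in runner.py | Input | Output | What it tells us |
| --- | --- | --- | --- | --- |
| Load model | load_checkpoint | checkpoint path / latest checkpoint | model, config, stoi, itos | Generation must use the same vocabulary and architecture as training, otherwise the weights do not line up |
| Build prompt | generate_code | User task text | `<task>...<code>` prefix | The prompt format must match the training sample format to trigger the task→code behavior |
| Sample | model.generate | max_new_tokens, temperature | Code string | Too small a Max New Tokens truncates the code; too high a temperature increases syntax drift |
| Run locally | run_generated_code | Generated Python | stdout, stderr, PNG artifact | Check the artifact for plotting tasks, stdout for numeric tasks, and the traceback for failing tasks |
| Continue training | continue_from_checkpoint | Old checkpoint + new JSONL | Updated checkpoint | When errors cluster in one category, adding data for that category and continuing is more efficient than retraining from scratch |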
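
To see why temperature and top-k matter in the sampling stage, here is a minimal pure-Python sketch of one sampling step. It follows the same order as `TinyCodeGPT.generate` (divide by temperature, keep the top k logits, softmax, draw one id) but uses lists and `random.Random` instead of tensors, so it is an illustration rather than the runner's code.

```python
import math
import random


def sample_next(logits: list[float], temperature: float, top_k: int, rng: random.Random) -> int:
    # Lower temperature sharpens the distribution; higher temperature flattens it.
    scaled = [value / max(temperature, 1e-4) for value in logits]
    if top_k:
        k = min(top_k, len(scaled))
        threshold = sorted(scaled, reverse=True)[k - 1]
        # Everything below the k-th largest logit gets zero probability.
        scaled = [value if value >= threshold else float("-inf") for value in scaled]
    peak = max(scaled)
    # Unnormalized softmax weights; exp(-inf) is 0.0, so masked ids are never drawn.
    weights = [math.exp(value - peak) for value in scaled]
    return rng.choices(range(len(weights)), weights=weights, k=1)[0]


logits = [0.1, 2.0, -1.0, 1.5]  # toy logits for a four-character vocabulary
rng = random.Random(0)
print([sample_next(logits, temperature=0.75, top_k=2, rng=rng) for _ in range(10)])
```

With `top_k=1` the step becomes greedy decoding, and with `top_k=2` only the two highest-scoring ids (1 and 3 here) can ever appear.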

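So the course should not stop at "the loss went down". A sounder judgment uses four signals: validation loss decreases, generations are not visibly truncated, the execution success rate rises, and the plots or numeric results match the task. Only when all four improve together does TinyCodeGPT move from a toy demo toward a usable small task-to-code model.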
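
Two of these signals, truncation and execution success, can be tracked per category with a few lines of bookkeeping. The sketch below assumes each record combines the `truncated` flag returned by `generate_code` with the `error` field returned by `run_generated_code`, plus a `category` label you attach yourself. The record layout and the sample data are hypothetical.

```python
from collections import defaultdict


def success_rate_by_category(results: list[dict]) -> dict[str, float]:
    totals: dict[str, int] = defaultdict(int)
    passed: dict[str, int] = defaultdict(int)
    for result in results:
        totals[result["category"]] += 1
        # Count a sample as passed only if it ran cleanly and was not cut off.
        if result["error"] is None and not result["truncated"]:
            passed[result["category"]] += 1
    return {category: passed[category] / totals[category] for category in totals}


# Made-up evaluation records, for illustration only.
results = [
    {"category": "Math", "error": None, "truncated": False},
    {"category": "Matplotlib", "error": None, "truncated": False},
    {"category": "Matplotlib", "error": "NameError: name 'plt' is not defined", "truncated": False},
]
rates = success_rate_by_category(results)
print(rates)
```

Checking whether a plot or a printed number is actually correct still needs a per-task comparison; this tally only covers "ran without error and finished".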

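Inspecting a failed sample

1. Was the generated code truncated by Max New Tokens?
2. Is the error about syntax, dependencies, API usage, data shape, or result logic?
3. Are the failures concentrated in Matplotlib / Pandas / ML?
4. Should more JSONL of the same category be added before continuing training from the current checkpoint?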
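
The first two questions can be roughed out automatically by looking at the `truncated` flag and the exception name in the captured stderr. The keyword rules below are a heuristic assumption for illustration, not part of the runner, and they cannot detect result-logic errors, which produce no traceback at all.

```python
def triage(result: dict) -> str:
    # Hypothetical record: "truncated" from generate_code, "error" from run_generated_code.
    if result.get("truncated"):
        return "truncated"
    error = result.get("error") or ""
    if not error:
        return "ok"
    if "SyntaxError" in error or "IndentationError" in error:
        return "syntax"
    if "ModuleNotFoundError" in error or "ImportError" in error:
        return "dependency"
    if "AttributeError" in error or "NameError" in error or "TypeError" in error:
        return "api"
    if "shape" in error or "broadcast" in error or "KeyError" in error:
        return "data-shape"
    return "other"


print(triage({"truncated": False, "error": "ModuleNotFoundError: No module named 'seaborn'"}))
```

Grouping the labels by task category then answers the third question: a cluster of `api` failures in Pandas samples, for example, suggests adding more Pandas JSONL before continuing training.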