The vLLM scheduler


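From the earlier walk through the LLM startup source code, we know that the actual token processing is scheduled request by request in EngineCore's scheduler. This post looks at the concrete logic of the Scheduler.schedule method.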

Scheduler

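process_engine_step() calls the scheduler's schedule() method internally, and schedule() is where the actual requests are handled one by one. schedule() gives priority to requests in the RUNNING state. In online inference, serving requests that are already running first also makes full use of the KV Cache they already hold.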

Before going into schedule(), it helps to pin down what a few variables mean:

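num_tokens_with_spec -> total number of tokens of the request, including the speculative (draft) tokens still to be verified
num_tokens -> number of confirmed tokens (prompt + output tokens, including the draft tokens accepted by speculative decoding)
num_computed_tokens -> number of tokens already computed, i.e. tokens that already have KV Cache
(For example, with the prompt [1, 11, 3], the decode stage has output three tokens [4, 5, 6]. Here 4 and 5 are draft tokens accepted by speculative decoding, and 6 is the token sampled in this round. Only tokens that already have KV Cache are counted, so num_computed_tokens = 5 (1, 11, 3, 4, 5), while num_tokens = 6.)
request.num_output_placeholders -> placeholders used in asynchronous scheduling (always 0 in synchronous scheduling)
> In asynchronous scheduling, the scheduler schedules the next step without waiting for the previous one to finish. The placeholders stand in for the not-yet-known output of that previous step.
> Q: How is the size of the previous step's output determined?

num_tokens_with_spec = len(prompt_token_ids) + len(output_token_ids) + len(spec_token_ids)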
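To make these definitions concrete, here is a toy calculation for the example above. It uses a hypothetical `ToyRequest` dataclass, not vLLM's `Request`. It assumes, as the example says, that 4 and 5 were accepted draft tokens that now have KV cache, while 6 was sampled in the last step and has none yet. The drafts `[7, 8]` for the next step are made-up values.

```python
from dataclasses import dataclass, field


@dataclass
class ToyRequest:
    """Hypothetical stand-in for vLLM's Request; only the fields used here."""
    prompt_token_ids: list[int]
    output_token_ids: list[int]
    # Draft tokens proposed for the next step (hypothetical values below).
    spec_token_ids: list[int] = field(default_factory=list)
    # Tokens whose KV cache has already been written.
    num_computed_tokens: int = 0

    @property
    def num_tokens(self) -> int:
        # Confirmed tokens: prompt + accepted outputs.
        return len(self.prompt_token_ids) + len(self.output_token_ids)

    @property
    def num_tokens_with_spec(self) -> int:
        # Confirmed tokens plus the draft tokens still to be verified.
        return self.num_tokens + len(self.spec_token_ids)


req = ToyRequest(
    prompt_token_ids=[1, 11, 3],
    output_token_ids=[4, 5, 6],  # 4, 5: accepted drafts; 6: sampled last step
    spec_token_ids=[7, 8],       # hypothetical drafts for the next step
    num_computed_tokens=5,       # 1, 11, 3, 4, 5 have KV cache; 6 does not
)

print(req.num_tokens)            # 6
print(req.num_tokens_with_spec)  # 8
# Tokens the next step has to feed to the model: 6 plus the two drafts.
print(req.num_tokens_with_spec - req.num_computed_tokens)  # 3
```

The last line is the `num_new_tokens` expression from the schedule() excerpt below, with `num_output_placeholders = 0`, as in synchronous scheduling.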

Below is an excerpt of the schedule() source code that schedules the RUNNING requests:

def schedule(self) -> SchedulerOutput:
    # ... some variable setup (omitted)
    # First, schedule the RUNNING requests.
    req_index = 0
    while req_index < len(self.running) and token_budget > 0:
        request = self.running[req_index]

        if (
            request.num_output_placeholders > 0
            and request.num_computed_tokens + 2 - request.num_output_placeholders
            >= request.num_prompt_tokens + request.max_tokens
        ):
            req_index += 1
            continue

        num_new_tokens = (
            request.num_tokens_with_spec
            + request.num_output_placeholders
            - request.num_computed_tokens
        )
        if 0 < self.scheduler_config.long_prefill_token_threshold < num_new_tokens:
            num_new_tokens = self.scheduler_config.long_prefill_token_threshold
        num_new_tokens = min(num_new_tokens, token_budget)

        # Make sure the input position does not exceed the max model len.
        # This is necessary when using spec decoding.
        num_new_tokens = min(
            num_new_tokens, self.max_model_len - 1 - request.num_computed_tokens
        )

        # Schedule encoder inputs.
        encoder_inputs_to_schedule = None
        external_load_encoder_input: list[int] = []
        new_encoder_compute_budget = encoder_compute_budget
        if request.has_encoder_inputs:
            (
                encoder_inputs_to_schedule,
                num_new_tokens,
                new_encoder_compute_budget,
                external_load_encoder_input,
            ) = self._try_schedule_encoder_inputs(
                request,
                request.num_computed_tokens,
                num_new_tokens,
                encoder_compute_budget,
                shift_computed_tokens=1 if self.use_eagle else 0,
            )

        if self.need_mamba_block_aligned_split:
            num_new_tokens = self._mamba_block_aligned_split(
                request, num_new_tokens
            )

        if num_new_tokens == 0:
            req_index += 1
            continue

        with record_function_or_nullcontext("schedule: allocate_slots"):
            while True:
                new_blocks = self.kv_cache_manager.allocate_slots(
                    request,
                    num_new_tokens,
                    num_lookahead_tokens=self.num_lookahead_tokens,
                )

                if new_blocks is not None:
                    # The request can be scheduled.
                    break

                # The request cannot be scheduled.
                # Preempt the lowest-priority request.
                if self.policy == SchedulingPolicy.PRIORITY:
                    preempted_req = max(
                        self.running,
                        key=lambda r: (r.priority, r.arrival_time),
                    )
                    self.running.remove(preempted_req)
                    if preempted_req in scheduled_running_reqs:
                        preempted_req_id = preempted_req.request_id
                        scheduled_running_reqs.remove(preempted_req)
                        token_budget += num_scheduled_tokens.pop(preempted_req_id)
                        req_to_new_blocks.pop(preempted_req_id)
                        scheduled_spec_decode_tokens.pop(preempted_req_id, None)
                        preempted_encoder_inputs = scheduled_encoder_inputs.pop(
                            preempted_req_id, None
                        )
                        if preempted_encoder_inputs:
                            # Restore encoder compute budget if the preempted
                            # request had encoder inputs scheduled in this step.
                            num_embeds_to_restore = sum(
                                preempted_req.get_num_encoder_embeds(i)
                                for i in preempted_encoder_inputs
                            )
                            encoder_compute_budget += num_embeds_to_restore
                        req_index -= 1
                else:
                    preempted_req = self.running.pop()

                self._preempt_request(preempted_req, scheduled_timestamp)
                preempted_reqs.append(preempted_req)
                if preempted_req == request:
                    # No more request to preempt. Cannot schedule this request.
                    break

        if new_blocks is None:
            # Cannot schedule this request.
            break

        # Schedule the request.
        scheduled_running_reqs.append(request)
        request_id = request.request_id
        req_to_new_blocks[request_id] = new_blocks
        num_scheduled_tokens[request_id] = num_new_tokens
        token_budget -= num_new_tokens
        req_index += 1

        # Speculative decode related.
        if request.spec_token_ids:
            num_scheduled_spec_tokens = (
                num_new_tokens
                + request.num_computed_tokens
                - request.num_tokens
                - request.num_output_placeholders
            )
            if num_scheduled_spec_tokens > 0:
                spec_token_ids = request.spec_token_ids
                if len(spec_token_ids) > num_scheduled_spec_tokens:
                    spec_token_ids = spec_token_ids[:num_scheduled_spec_tokens]
                scheduled_spec_decode_tokens[request.request_id] = spec_token_ids

            # New spec tokens will be set in `update_draft_token_ids` before the
            # next step when applicable.
            request.spec_token_ids = []

        # Encoder-related.
        if encoder_inputs_to_schedule:
            scheduled_encoder_inputs[request_id] = encoder_inputs_to_schedule
            # Allocate the encoder cache.
            for i in encoder_inputs_to_schedule:
                self.encoder_cache_manager.allocate(request, i)
                if self.ec_connector is not None:
                    self.ec_connector.update_state_after_alloc(request, i)
            encoder_compute_budget = new_encoder_compute_budget
        if external_load_encoder_input:
            for i in external_load_encoder_input:
                self.encoder_cache_manager.allocate(request, i)
                if self.ec_connector is not None:
                    self.ec_connector.update_state_after_alloc(request, i)
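The token arithmetic at the top of this loop can be pulled out into two small pure functions. This sketch mirrors the excerpt's clamping order: the long-prefill threshold first, then the token budget, then `max_model_len`. It also mirrors how many draft tokens end up scheduled. The function names and arguments are hypothetical, and encoder inputs, Mamba alignment and KV cache allocation are left out.

```python
def clamp_num_new_tokens(
    num_tokens_with_spec: int,
    num_output_placeholders: int,
    num_computed_tokens: int,
    long_prefill_token_threshold: int,
    token_budget: int,
    max_model_len: int,
) -> int:
    """Hypothetical helper mirroring how schedule() sizes a RUNNING request."""
    num_new_tokens = (
        num_tokens_with_spec + num_output_placeholders - num_computed_tokens
    )
    # A threshold of 0 disables chunking of long prefills.
    if 0 < long_prefill_token_threshold < num_new_tokens:
        num_new_tokens = long_prefill_token_threshold
    num_new_tokens = min(num_new_tokens, token_budget)
    # Keep the last input position below max_model_len.
    return min(num_new_tokens, max_model_len - 1 - num_computed_tokens)


def num_spec_tokens_scheduled(
    num_new_tokens: int,
    num_computed_tokens: int,
    num_tokens: int,
    num_output_placeholders: int,
) -> int:
    """How many draft tokens fit into the scheduled tokens (may be <= 0)."""
    return (
        num_new_tokens + num_computed_tokens - num_tokens - num_output_placeholders
    )


# A 10,000-token prompt, chunked by a 2048-token threshold:
print(clamp_num_new_tokens(10_000, 0, 0, 2048, 8192, 32_768))  # 2048
# Decode step for the [1, 11, 3] + [4, 5, 6] example with 2 drafts:
n = clamp_num_new_tokens(8, 0, 5, 2048, 8192, 32_768)
print(n, num_spec_tokens_scheduled(n, 5, 6, 0))  # 3 2
# Same request with only 2 tokens of budget left: one draft is dropped.
n = clamp_num_new_tokens(8, 0, 5, 2048, 2, 32_768)
print(n, num_spec_tokens_scheduled(n, 5, 6, 0))  # 2 1
```

In the last case the excerpt would truncate `spec_token_ids` to its first element, because the budget leaves room for only one draft token.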

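In asynchronous scheduling, the GPU should not sit idle. While the GPU is still computing one step (request_1), the scheduler keeps accepting and scheduling the next ones (request_2…). An output placeholder is therefore needed to reserve room for the output of a step that has not finished yet. The first condition in the loop asks a worst-case question: do the computed tokens (which already include the placeholders) plus the guaranteed output reach the user-specified limit num_prompt_tokens + max_tokens? If they do, the request needs no further step and is skipped. Written out, the check is: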

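if (
    request.num_output_placeholders > 0
    and (request.num_computed_tokens + 1) - (request.num_output_placeholders - 1)
    >= request.num_prompt_tokens + request.max_tokens
):
    # Same as the "+ 2 -" form in the source: no further step is needed.
    req_index += 1
    continue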
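  1. For the step already running: in the worst case all draft tokens are rejected, but the model still produces at least one valid token. Only the draft placeholders, num_output_placeholders - 1, are therefore subtracted.
  2. For the step being scheduled now: if it were scheduled once more, it would generate at least one token, hence num_computed_tokens + 1.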
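A quick numeric check of this condition, using made-up numbers: a request with a 10-token prompt and `max_tokens = 5` may hold at most 15 tokens. The sketch assumes, as the condition implies, that `num_computed_tokens` already counts the in-flight placeholder tokens. `can_skip_async_step` is a hypothetical name.

```python
def can_skip_async_step(
    num_computed_tokens: int,
    num_output_placeholders: int,
    num_prompt_tokens: int,
    max_tokens: int,
) -> bool:
    """Hypothetical helper: True if no further step is needed even in the worst case."""
    if num_output_placeholders <= 0:
        return False
    # +1: at least one token is produced; -(placeholders - 1): drafts may all be rejected.
    worst_case = (num_computed_tokens + 1) - (num_output_placeholders - 1)
    return worst_case >= num_prompt_tokens + max_tokens


# 1 placeholder (no drafts): 14 + 1 - 0 = 15 >= 15, the limit is surely reached.
print(can_skip_async_step(14, 1, 10, 5))  # True
# 3 placeholders (1 token + 2 drafts): 14 + 1 - 2 = 13 < 15, another step may be needed.
print(can_skip_async_step(14, 3, 10, 5))  # False
```

The worst case `(c + 1) - (p - 1)` is algebraically the same as `c + 2 - p` in the source excerpt.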

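Next, schedule() calls allocate_slots to allocate KV Cache for the request. If the allocation succeeds, the request is scheduled right away. If it fails, the scheduler preempts a running request and retries:

- Under the PRIORITY policy, the victim is the running request with the largest (priority, arrival_time), i.e. the lowest-priority request that arrived most recently.
- Otherwise, the last request in the running queue is popped.
- If the victim turns out to be the current request itself, nothing else can be freed, and the request is not scheduled in this step.

The overall preemption flow is roughly as follows: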
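The preemption loop from the excerpt can be simulated in isolation. This sketch simplifies heavily and rests on several assumptions:

- The requests are hypothetical `ToyReq` objects.
- The KV cache is a plain counter of free blocks.
- Each request asks for a fixed number of blocks.
- Preempting a request frees exactly the blocks it holds.

As in the excerpt, a lower `priority` value means higher priority. Under the PRIORITY policy the victim is therefore `max(running, key=(priority, arrival_time))`; otherwise the tail of the running queue is popped.

```python
from dataclasses import dataclass


@dataclass(eq=False)
class ToyReq:
    """Hypothetical request: only the fields the preemption logic looks at."""
    req_id: str
    priority: int          # lower value = higher priority
    arrival_time: float
    held_blocks: int       # blocks currently owned in the toy KV cache
    blocks_needed: int     # blocks this step wants to allocate


def schedule_with_preemption(
    request: ToyReq, running: list[ToyReq], free_blocks: int, priority_policy: bool
) -> tuple[bool, list[str], int]:
    """Returns (scheduled?, preempted request ids, free blocks left).

    Assumes `request` is itself in `running`, as in the real loop.
    """
    preempted: list[str] = []
    while True:
        if request.blocks_needed <= free_blocks:      # allocation succeeded
            return True, preempted, free_blocks - request.blocks_needed
        if priority_policy:
            victim = max(running, key=lambda r: (r.priority, r.arrival_time))
            running.remove(victim)
        else:
            victim = running.pop()                    # evict the tail of running
        free_blocks += victim.held_blocks             # toy stand-in for freeing KV
        victim.held_blocks = 0
        preempted.append(victim.req_id)
        if victim is request:                         # nothing else left to evict
            return False, preempted, free_blocks


a = ToyReq("a", priority=0, arrival_time=1.0, held_blocks=4, blocks_needed=2)
b = ToyReq("b", priority=5, arrival_time=2.0, held_blocks=3, blocks_needed=1)
c = ToyReq("c", priority=1, arrival_time=3.0, held_blocks=2, blocks_needed=1)
running = [a, b, c]
# a needs 2 blocks but only 1 is free: the lowest-priority request b is evicted.
print(schedule_with_preemption(a, running, free_blocks=1, priority_policy=True))
# (True, ['b'], 2)
```

With `priority_policy=False`, the function keeps popping the tail of the running queue. If that tail eventually reaches the current request, it returns `False`, which corresponds to the `new_blocks is None` branch in the excerpt.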

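The scheduler then tries to schedule requests from the waiting queue, but only if no preemption happened in this step.

If a preemption did happen, the earlier KV Cache allocation has already run out of space, so the waiting queue is not scheduled in this step.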

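Preemption happened in this step
→ the KV cache is full
→ scheduling the waiting queue is bound to fail (allocate_slots returns None)
→ skip it and schedule the waiting requests in a later step, once the KV cache has space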

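while (waiting or skipped_waiting is non-empty) and token_budget > 0:
├── running queue full → break
├── pick a queue (waiting or skipped_waiting)
├── take the request at the head of the queue
├── check constraints (blocked status, LoRA limit, KV cache, etc.)
│ └── not satisfied → skip it and put it into step_skipped_waiting
├── compute prefix cache hits (local + external)
├── compute num_new_tokens (number of tokens to schedule)
├── try to allocate KV cache
│ └── failure → break (later requests are not tried)
└── success → move it to the running queue and update its state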
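The tree above can be turned into a runnable toy loop. This is only a sketch of the control flow. `ToyWaitingReq`, the `blocked` flag, the `max_running` cap and the free-block counter are simplifying assumptions, not vLLM's data structures. The sketch also makes the following simplifications:

- It uses a single waiting queue instead of waiting plus skipped_waiting.
- It treats the local prefix-cache hit as block-aligned.
- It ignores LoRA limits and external hits.
- It only reports the skipped requests instead of putting them back for the next step.

```python
import math
from collections import deque
from dataclasses import dataclass


@dataclass
class ToyWaitingReq:
    """Hypothetical waiting request (not vLLM's Request)."""
    req_id: str
    num_prompt_tokens: int
    num_cached_tokens: int = 0  # local prefix-cache hit, assumed block-aligned
    blocked: bool = False       # e.g. still waiting for remote KVs


def schedule_waiting(
    waiting: deque,
    running: list,
    token_budget: int,
    free_blocks: int,
    block_size: int,
    max_running: int,
) -> tuple[list[str], list[str], int]:
    """Returns (scheduled ids, skipped ids, remaining token budget)."""
    scheduled: list[str] = []
    step_skipped_waiting: list[str] = []
    while waiting and token_budget > 0:
        if len(running) >= max_running:  # running queue is full
            break
        request = waiting[0]  # look at the head of the queue
        if request.blocked:  # constraint not met: skip it for this step
            step_skipped_waiting.append(waiting.popleft().req_id)
            continue
        # Tokens not covered by the prefix cache, capped by the budget.
        num_new_tokens = min(
            request.num_prompt_tokens - request.num_cached_tokens, token_budget
        )
        blocks_needed = math.ceil(num_new_tokens / block_size)
        if blocks_needed > free_blocks:  # allocation failed: stop here
            break
        free_blocks -= blocks_needed
        token_budget -= num_new_tokens
        running.append(waiting.popleft())
        scheduled.append(request.req_id)
    return scheduled, step_skipped_waiting, token_budget


waiting = deque([
    ToyWaitingReq("r1", num_prompt_tokens=40, num_cached_tokens=32),
    ToyWaitingReq("r2", num_prompt_tokens=20, blocked=True),
    ToyWaitingReq("r3", num_prompt_tokens=64),
    ToyWaitingReq("r4", num_prompt_tokens=16),
])
running: list = []
print(schedule_waiting(waiting, running, 100, free_blocks=4, block_size=16,
                       max_running=8))
# (['r1'], ['r2'], 92)
```

Here r3 needs 4 blocks but only 3 are left, so the loop breaks without trying r4, even though r4 would have fit. This matches the "failure → break" branch in the tree.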

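Next, the scheduler computes the longest common prefix, counted in KV cache blocks, shared by all requests in the running queue: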

# Get the longest common prefix among all requests in the running queue.
# This can be potentially used for cascade attention.
num_common_prefix_blocks = [0] * len(self.kv_cache_config.kv_cache_groups)
with record_function_or_nullcontext("schedule: get_num_common_prefix_blocks"):
    if self.running:
        any_request_id = self.running[0].request_id
        num_common_prefix_blocks = (
            self.kv_cache_manager.get_num_common_prefix_blocks(any_request_id)
        )

# Definition of get_num_common_prefix_blocks(self, request_id). The result does
# not depend on which request_id is passed: every request's block list starts
# with the shared blocks.
# self.req_to_blocks is a hash table mapping request_id -> list[blocks].
def get_num_common_prefix_blocks(self, running_request_id: str) -> int:
    blocks = self.req_to_blocks[running_request_id]  # block list of one request
    num_common_blocks = 0
    for block in blocks:
        if block.ref_cnt == len(self.req_to_blocks):  # referenced by every request
            num_common_blocks += 1
        else:
            break
    return num_common_blocks
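The ref-count trick can be reproduced with plain Python objects. In this sketch, the `ToyBlock` class and the block ids are made up. A block shared by every request has `ref_cnt == len(req_to_blocks)`. Walking any one request's block list from the front and stopping at the first non-shared block therefore gives the same answer, whichever request id is chosen.

```python
from dataclasses import dataclass


@dataclass
class ToyBlock:
    block_id: int
    ref_cnt: int = 0


def get_num_common_prefix_blocks(
    req_to_blocks: dict[str, list[ToyBlock]], running_request_id: str
) -> int:
    """Count leading blocks referenced by every request (same loop as above)."""
    num_common_blocks = 0
    for block in req_to_blocks[running_request_id]:
        if block.ref_cnt == len(req_to_blocks):
            num_common_blocks += 1
        else:
            break
    return num_common_blocks


# A shared system prompt fills blocks 0 and 1; block 2 is shared by r1 and r2 only.
blocks = {i: ToyBlock(i) for i in range(7)}
req_to_blocks = {
    "r1": [blocks[0], blocks[1], blocks[2], blocks[3]],
    "r2": [blocks[0], blocks[1], blocks[2], blocks[4]],
    "r3": [blocks[0], blocks[1], blocks[5], blocks[6]],
}
for block_list in req_to_blocks.values():
    for b in block_list:
        b.ref_cnt += 1  # each request holding a block bumps its ref count

print([get_num_common_prefix_blocks(req_to_blocks, r) for r in req_to_blocks])
# [2, 2, 2]
```

Block 2 is shared by two of the three requests, so it breaks the common prefix for everyone, including r1 and r2.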

Finally, the Scheduler builds a SchedulerOutput and hands it to EngineCore's model_executor, which sends it via RPC to the actual GPU Workers that run model inference.

The model's forward pass keeps no state between calls. So how does the scheduler know request metadata such as num_computed_tokens?

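The answer: _update_after_schedule is called after every schedule step. The scheduler maintains its own hash table, [req_id, …], that records each request's metadata, and it updates this metadata incrementally.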
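A minimal sketch of that bookkeeping, assuming a scheduler-side dict from request id to a mutable record. After each step, every scheduled request's `num_computed_tokens` is advanced by the number of tokens scheduled for it. `ReqState`, `ToyScheduler` and `update_after_schedule` are hypothetical names. This only illustrates the `num_computed_tokens` part and is not vLLM's actual `_update_after_schedule`.

```python
from dataclasses import dataclass


@dataclass
class ReqState:
    """Hypothetical per-request metadata kept by the scheduler."""
    num_prompt_tokens: int
    num_computed_tokens: int = 0


class ToyScheduler:
    def __init__(self) -> None:
        self.requests: dict[str, ReqState] = {}  # req_id -> metadata

    def add_request(self, req_id: str, num_prompt_tokens: int) -> None:
        self.requests[req_id] = ReqState(num_prompt_tokens)

    def update_after_schedule(self, num_scheduled_tokens: dict[str, int]) -> None:
        # The forward pass keeps no state, so the scheduler advances its own
        # counters right after deciding what runs in this step.
        for req_id, n in num_scheduled_tokens.items():
            self.requests[req_id].num_computed_tokens += n


s = ToyScheduler()
s.add_request("r1", num_prompt_tokens=5000)
s.update_after_schedule({"r1": 2048})  # first prefill chunk
s.update_after_schedule({"r1": 2048})  # second prefill chunk
print(s.requests["r1"].num_computed_tokens)  # 4096
```

In the next step, `num_new_tokens` for r1 would be computed from this stored value. The prefill would thus resume at token 4096 without the model having to remember anything.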

How the Prefix Cache works when inference runs on multiple GPU Workers (TP):

Issue: 2026/4/17

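This issue, https://github.com/vllm-project/vllm/issues/40004, points out a limitation of the current scheduler. It only tries to preempt lower-priority requests from the running queue when KV Cache blocks cannot be allocated. If the running queue has already reached max_num_seqs, a high-priority request can only wait in the waiting queue.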
A PR already exists: https://github.com/vllm-project/vllm/pull/40087. It remains to be seen whether it gets merged.
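Below is a toy illustration of the gap the issue describes. It is not the PR's implementation, which is not reproduced here. With `preempt_on_full=False`, a high-priority request at the head of the waiting queue simply waits when the running queue is at its cap. The hypothetical `preempt_on_full=True` branch instead evicts the lowest-priority running request. `Req`, `admit`, `max_running` and the flag are all made-up names.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(eq=False)
class Req:
    """Hypothetical request; lower priority value = higher priority."""
    req_id: str
    priority: int


def admit(
    head: Req, running: list[Req], max_running: int, preempt_on_full: bool
) -> tuple[bool, str | None]:
    """Toy admission of the waiting-queue head: (admitted?, evicted id)."""
    if len(running) < max_running:
        running.append(head)
        return True, None
    if not preempt_on_full:
        return False, None  # behavior described in the issue: the head waits
    victim = max(running, key=lambda r: r.priority)
    if victim.priority <= head.priority:
        return False, None  # no running request has lower priority than head
    running.remove(victim)
    running.append(head)
    return True, victim.req_id


running = [Req("low", 10), Req("mid", 5)]
urgent = Req("urgent", 0)
print(admit(urgent, running, max_running=2, preempt_on_full=False))  # (False, None)
print(admit(urgent, running, max_running=2, preempt_on_full=True))   # (True, 'low')
print([r.req_id for r in running])  # ['mid', 'urgent']
```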

issue: #35780
Remove the block-level error handling in vLLM's current PD (prefill/decode) disaggregation mode.

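def get_num_new_matched_tokens(
    self,
    request: Request,
    num_computed_tokens: int,  # number of tokens already hit in the local prefix cache
) -> tuple[int | None, bool]:
    # Returns: (number of extra tokens that can be loaded externally, whether to load them asynchronously)
    ...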
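To see how the return value can be used, here is a toy connector implementing the same signature. The `ToyConnector` class, its `remote_cached_tokens` table and the `total_computed_tokens` helper are assumptions made for illustration. The sketch models three behaviors:

- The external hit is counted on top of the local prefix-cache hit.
- `None` means the connector cannot answer yet, so the request stays in the waiting queue.
- The boolean says whether the KV should be loaded asynchronously.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ToyRequest:
    """Hypothetical minimal request (not vLLM's Request class)."""
    request_id: str
    num_prompt_tokens: int


class ToyConnector:
    """Hypothetical connector with the same method signature as above."""

    def __init__(self, remote_cached_tokens: dict[str, int | None]) -> None:
        # req_id -> tokens available remotely; None = lookup still pending.
        self.remote_cached_tokens = remote_cached_tokens

    def get_num_new_matched_tokens(
        self, request: ToyRequest, num_computed_tokens: int
    ) -> tuple[int | None, bool]:
        remote = self.remote_cached_tokens.get(request.request_id, 0)
        if remote is None:
            return None, False
        # Only tokens beyond the local hit are "new"; load them asynchronously.
        extra = max(0, remote - num_computed_tokens)
        return extra, extra > 0


def total_computed_tokens(
    connector: ToyConnector, request: ToyRequest, num_local_hit: int
) -> int | None:
    """Hypothetical helper: local hit + external hit, or None if not ready."""
    ext, _load_async = connector.get_num_new_matched_tokens(request, num_local_hit)
    if ext is None:
        return None  # keep the request in the waiting queue for this step
    return num_local_hit + ext


conn = ToyConnector({"a": 96, "b": None})
print(total_computed_tokens(conn, ToyRequest("a", 128), 64))  # 96
print(total_computed_tokens(conn, ToyRequest("b", 128), 64))  # None
print(total_computed_tokens(conn, ToyRequest("c", 128), 64))  # 64
```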

http://example.com/2026/06/01/2_vllm的scheduler/
Author
Soya
Published on
June 1, 2026