The vLLM Startup Flow


Notes on issues I found while reading the source code

GPUModelRunner.py
“FIXME(woosuk): Fix warmup for LoRA.”
It seems this model runner does not yet perform graph capture for LoRA adapters.

1. Initializing LLMEngine

Using /root/autodl-tmp/vllm/examples/basic/offline_inference/basic.py as the entry point, let's look at how vLLM starts up and initializes:

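```python
from vllm import LLM, SamplingParams

prompts = ["Hello, my name is", "The capital of France is"]
sampling_params = SamplingParams(temperature=0.8, top_p=0.95)


def main():
    # Constructing LLM also builds the LLMEngine and its engine-core client
    llm = LLM(model="google/gemma-3-1b-it", enable_prefix_caching=True, max_model_len=8192)

    outputs = llm.generate(prompts, sampling_params)
    ...
```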

Inside the LLM class, the constructor creates a key member, llm_engine, which is the engine that drives model inference. Here we mainly look at the from_engine_args() method.

```python
engine_args = EngineArgs(
    ...
)

log_non_default_args(engine_args)

self.llm_engine = LLMEngine.from_engine_args(
    engine_args=engine_args, usage_context=UsageContext.LLM_CLASS
)
```

```python
@classmethod
def from_engine_args(
    cls,
    engine_args: EngineArgs,
    usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
    stat_loggers: list[StatLoggerFactory] | None = None,
    enable_multiprocessing: bool = False,
) -> "LLMEngine":
    """Creates an LLM engine from the engine arguments."""

    # Create the engine configs.
    vllm_config = engine_args.create_engine_config(usage_context)
    executor_class = Executor.get_class(vllm_config)

    if envs.VLLM_ENABLE_V1_MULTIPROCESSING:
        logger.debug("Enabling multiprocessing for LLMEngine.")
        enable_multiprocessing = True

    # Create the LLMEngine.
    return cls(
        vllm_config=vllm_config,
        executor_class=executor_class,
        log_stats=not engine_args.disable_log_stats,
        usage_context=usage_context,
        stat_loggers=stat_loggers,
        multiprocess_mode=enable_multiprocessing,
    )
```

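At this point llm_engine has only been set up at a high level: from_engine_args() builds the configs, picks executor_class, and calls the constructor, and no weight-loading code is visible here. That is because vLLM uses a client-server architecture: LLMEngine's constructor creates an engine-core client (via EngineCoreClient.make_client(), see section 2.1), and the compute backend, weight loading included, is brought up behind that client. Note also that LLMEngine receives an executor_class; the executor is later initialized according to this setting.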
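The shape of from_engine_args() is: turn user-facing args into a config, pick an executor class from that config, let an environment flag force multiprocessing on, then call the constructor. Below is a minimal sketch of that shape. Every class and function in it (ToyEngineArgs, ToyConfig, ToyEngine, pick_executor_class, the two executor classes) is hypothetical, the executor-selection rule is made up for illustration, and the env lookup is a simplification that does not reproduce vLLM's envs module or its default value.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass
class ToyEngineArgs:
    # Hypothetical stand-in for EngineArgs (user-facing options only).
    model: str
    tensor_parallel_size: int = 1
    disable_log_stats: bool = False


@dataclass
class ToyConfig:
    # Hypothetical stand-in for VllmConfig, derived from the args.
    model: str
    world_size: int


class ToyInProcExecutor:  # hypothetical executor choices for this sketch
    pass


class ToyMultiProcExecutor:
    pass


def pick_executor_class(config: ToyConfig) -> type:
    # Assumed rule, for illustration only: one worker -> in-process executor.
    return ToyInProcExecutor if config.world_size == 1 else ToyMultiProcExecutor


class ToyEngine:
    def __init__(self, config: ToyConfig, executor_class: type,
                 log_stats: bool, multiprocess_mode: bool) -> None:
        self.config = config
        self.executor_class = executor_class
        self.log_stats = log_stats
        self.multiprocess_mode = multiprocess_mode

    @classmethod
    def from_engine_args(cls, args: ToyEngineArgs,
                         enable_multiprocessing: bool = False,
                         env: Optional[Mapping[str, str]] = None) -> "ToyEngine":
        env = os.environ if env is None else env
        # 1) args -> config, 2) config -> executor class
        config = ToyConfig(model=args.model, world_size=args.tensor_parallel_size)
        executor_class = pick_executor_class(config)
        # 3) an env flag can force multiprocessing on ("0" default is arbitrary here)
        if env.get("VLLM_ENABLE_V1_MULTIPROCESSING", "0") == "1":
            enable_multiprocessing = True
        # 4) hand everything to the constructor
        return cls(config, executor_class, not args.disable_log_stats,
                   enable_multiprocessing)


engine = ToyEngine.from_engine_args(ToyEngineArgs(model="google/gemma-3-1b-it"), env={})
```

Passing env explicitly keeps the sketch testable; the real method reads the flag from vLLM's environment module instead.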

2. Walking through generate()

2.1 MPClient (Multi-Process Client)

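The compute backend is not initialized inside from_engine_args() itself; it is brought up when LLMEngine's constructor creates the engine-core client, which section 2.2 follows. First, let's trace the request path, starting from generate():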

```python
def generate(
    self,
    prompts: PromptType | Sequence[PromptType],
    sampling_params: SamplingParams | Sequence[SamplingParams] | None = None,
    *,
    use_tqdm: bool | Callable[..., tqdm] = True,
    lora_request: Sequence[LoRARequest] | LoRARequest | None = None,
    priority: list[int] | None = None,
    tokenization_kwargs: dict[str, Any] | None = None,
) -> list[RequestOutput]:

    runner_type = self.model_config.runner_type
    if runner_type != "generate":
        raise ValueError(
            "LLM.generate() is only supported for generative models. "
            "Try passing `--runner generate` to use the model as a "
            "generative model."
        )

    if sampling_params is None:
        sampling_params = self.get_default_sampling_params()

    return self._run_completion(
        prompts=prompts,
        params=sampling_params,
        output_type=RequestOutput,
        use_tqdm=use_tqdm,
        lora_request=lora_request,
        tokenization_kwargs=tokenization_kwargs,
        priority=priority,
    )

def _run_completion(
    self,
    prompts: PromptType | Sequence[PromptType],
    params: SamplingParams
    | PoolingParams
    | Sequence[SamplingParams | PoolingParams],
    output_type: type[_O],
    *,
    use_tqdm: bool | Callable[..., tqdm] = True,
    lora_request: Sequence[LoRARequest] | LoRARequest | None = None,
    priority: list[int] | None = None,
    tokenization_kwargs: dict[str, Any] | None = None,
):
    self._add_completion_requests(
        prompts=prompts,
        params=params,
        use_tqdm=use_tqdm,
        lora_request=lora_request,
        priority=priority,
        tokenization_kwargs=tokenization_kwargs,
    )
    return self._run_engine(use_tqdm=use_tqdm, output_type=output_type)
```

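In _run_completion(), _add_completion_requests() normalizes prompts into a Sequence and adds each prompt as a request; LLM keeps a list of the resulting request IDs. Following the call chain down, we reach LLM._add_request(), which in turn calls LLMEngine.add_request():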
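The normalization step accepts either one prompt or a sequence, with either one params object shared by all prompts or one per prompt, and registers each pair as a request with an increasing string ID. A minimal sketch of that idea, assuming plain strings as prompts and a dict as a stand-in for SamplingParams; ToyLLM and add_completion_requests are hypothetical names, not vLLM's code.

```python
import itertools
from typing import Any, Dict, List, Sequence, Tuple, Union

Prompt = str
Params = Dict[str, Any]  # hypothetical stand-in for SamplingParams


class ToyLLM:
    def __init__(self) -> None:
        # Monotonically increasing counter, like request_counter in the excerpt.
        self.request_counter = itertools.count()
        self.added: List[Tuple[str, Prompt, Params]] = []

    def _add_request(self, prompt: Prompt, params: Params) -> str:
        request_id = str(next(self.request_counter))
        self.added.append((request_id, prompt, params))
        return request_id

    def add_completion_requests(
        self,
        prompts: Union[Prompt, Sequence[Prompt]],
        params: Union[Params, Sequence[Params]],
    ) -> List[str]:
        # A single prompt becomes a one-element list.
        if isinstance(prompts, str):
            prompts = [prompts]
        # A single params object is shared by every prompt.
        if isinstance(params, dict):
            params = [params] * len(prompts)
        if len(params) != len(prompts):
            raise ValueError("prompts and params must have the same length")
        return [self._add_request(p, sp) for p, sp in zip(prompts, params)]
```

Returning the IDs in prompt order is what lets the caller match outputs back to inputs later.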

```python
def _add_request(
    self,
    prompt: EngineInput,
    params: SamplingParams | PoolingParams,
    lora_request: LoRARequest | None = None,
    priority: int = 0,
) -> str:
    if isinstance(params, SamplingParams):
        # We only care about the final output
        params.output_kind = RequestOutputKind.FINAL_ONLY

    request_id = str(next(self.request_counter))

    return self.llm_engine.add_request(
        request_id,
        prompt,
        params,
        lora_request=lora_request,
        priority=priority,
    )

def add_request(
    self,
    request_id: str,
    prompt: EngineCoreRequest | PromptType | EngineInput,
    params: SamplingParams | PoolingParams,
    arrival_time: float | None = None,
    lora_request: LoRARequest | None = None,
    tokenization_kwargs: dict[str, Any] | None = None,
    trace_headers: Mapping[str, str] | None = None,
    priority: int = 0,
    prompt_text: str | None = None,
) -> str:

    if isinstance(prompt, EngineCoreRequest):
        ...
    else:
        request = self.input_processor.process_inputs(
            request_id,
            prompt,
            params,
            supported_tasks=self.get_supported_tasks(),
            arrival_time=arrival_time,
            lora_request=lora_request,
            tokenization_kwargs=tokenization_kwargs,
            trace_headers=trace_headers,
            priority=priority,
        )
        prompt_text, _, _ = extract_prompt_components(self.model_config, prompt)

    self.input_processor.assign_request_id(request)

    req_id = request.request_id

    # Use cloned params that may have been updated in process_inputs()
    params = request.params

    n = params.n if isinstance(params, SamplingParams) else 1

    if n == 1:
        # Make a new RequestState and queue.
        self.output_processor.add_request(request, prompt_text, None, 0)
        # Add the request to EngineCore.
        self.engine_core.add_request(request)
        return req_id

    # Fan out child requests (for n>1).
    parent_req = ParentRequest(request)
    for idx in range(n):
        request_id, child_params = parent_req.get_child_info(idx)
        child_request = request if idx == n - 1 else copy(request)
        child_request.request_id = request_id
        child_request.sampling_params = child_params

        # Make a new RequestState and queue.
        self.output_processor.add_request(
            child_request, prompt_text, parent_req, idx
        )
        # Add the request to EngineCore.
        self.engine_core.add_request(child_request)

    return req_id
```

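This is not the complete code. What matters here is that the request is first preprocessed, then assigned an ID inside LLMEngine that is returned to the caller, and finally passed further down via self.engine_core.add_request(request).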
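The n > 1 branch fans one request out into n child requests that share a ParentRequest: the last child reuses the original object and the others are shallow copies. A minimal sketch of that fan-out. ToyRequest, ToyParent and fan_out are hypothetical, and the child-ID format f"{idx}_{parent_id}" is an assumption made for illustration, not necessarily what vLLM's get_child_info() produces.

```python
from copy import copy
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class ToyRequest:
    request_id: str
    prompt: str
    n: int = 1


class ToyParent:
    def __init__(self, request: ToyRequest) -> None:
        self.request_id = request.request_id

    def get_child_info(self, idx: int) -> Tuple[str, int]:
        # Hypothetical ID scheme; every child samples a single sequence (n=1).
        return f"{idx}_{self.request_id}", 1


def fan_out(request: ToyRequest) -> List[ToyRequest]:
    n = request.n
    if n == 1:
        return [request]
    parent = ToyParent(request)
    children = []
    for idx in range(n):
        child_id, child_n = parent.get_child_info(idx)
        # Reuse the original object for the last child; copy it for the others.
        child = request if idx == n - 1 else copy(request)
        child.request_id = child_id
        child.n = child_n
        children.append(child)
    return children
```

Reusing the original for the last child saves one copy, which is safe only because every earlier copy was taken before the original was mutated.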

The EngineCoreClient class hierarchy:

```text
EngineCoreClient (ABC)
├── InprocClient
└── MPClient
    ├── SyncMPClient
    └── AsyncMPClient
        └── DPAsyncMPClient
            └── DPLBAsyncMPClient
```

InprocClient is the simplest variant: it holds the EngineCore object directly, so add_request is a plain method call with no communication overhead.

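MPClient is the multi-process variant. It communicates with the engine core over ZeroMQ sockets and is the variant most used in production, so on a first read of the source it is worth focusing on its subclasses.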

AsyncMPClient and SyncMPClient are the asynchronous and synchronous subclasses of MPClient.

DPAsyncMPClient: data parallel with external load balancing.

DPLBAsyncMPClient: data parallel with internal load balancing.

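EngineCoreClient is the communication abstraction layer of the vLLM V1 engine architecture. It carries requests and outputs between the upper-level engine (LLMEngine / AsyncLLM) and the underlying EngineCore, which performs the actual inference. It defines a uniform interface (add_request, get_output, abort_requests, and so on) that hides the communication details of the different deployment modes.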
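The value of this layer is that the engine above only ever talks to one interface. A minimal sketch of that idea: an abstract base with two methods and an in-process implementation that calls the core directly, roughly as InprocClient does. ToyEngineCore (which "runs" a request by uppercasing its prompt), EngineCoreClientBase and ToyInprocClient are all hypothetical.

```python
from abc import ABC, abstractmethod
from typing import List, Tuple


class ToyEngineCore:
    """Hypothetical core: 'runs' a request by uppercasing its prompt."""

    def __init__(self) -> None:
        self.waiting: List[Tuple[str, str]] = []

    def add_request(self, request_id: str, prompt: str) -> None:
        self.waiting.append((request_id, prompt))

    def step(self) -> List[Tuple[str, str]]:
        outputs = [(rid, prompt.upper()) for rid, prompt in self.waiting]
        self.waiting.clear()
        return outputs


class EngineCoreClientBase(ABC):
    """The interface the engine above relies on, whatever the transport."""

    @abstractmethod
    def add_request(self, request_id: str, prompt: str) -> None: ...

    @abstractmethod
    def get_output(self) -> List[Tuple[str, str]]: ...


class ToyInprocClient(EngineCoreClientBase):
    """Holds the core object directly: every call is a plain method call."""

    def __init__(self) -> None:
        self.engine_core = ToyEngineCore()

    def add_request(self, request_id: str, prompt: str) -> None:
        self.engine_core.add_request(request_id, prompt)

    def get_output(self) -> List[Tuple[str, str]]:
        return self.engine_core.step()
```

A multi-process client would implement the same two methods by serializing requests onto a socket and reading outputs back, which is why the engine above does not need to know which mode it runs in.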


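How the make_client factory method chooses a class:

| multiprocess_mode | asyncio_mode | data_parallel_size | external_lb | Selected class |
| --- | --- | --- | --- | --- |
| False | False | any | - | InprocClient |
| True | False | any | - | SyncMPClient |
| True | True | 1 | - | AsyncMPClient |
| True | True | >1 | True | DPAsyncMPClient |
| True | True | >1 | False | DPLBAsyncMPClient |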
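The table is a small decision function, so it can be written out and checked row by row. A sketch that returns the class name for each configuration; select_client_class is a hypothetical helper, not vLLM's make_client(). The table has no row for asyncio_mode=True with multiprocess_mode=False, so this sketch treats that combination as unsupported, which is an assumption here.

```python
def select_client_class(multiprocess_mode: bool, asyncio_mode: bool,
                        data_parallel_size: int = 1,
                        external_lb: bool = False) -> str:
    """Return the client class name the table picks for this configuration."""
    if asyncio_mode and not multiprocess_mode:
        # Not covered by the table; treated as unsupported in this sketch.
        raise NotImplementedError("asyncio mode requires multiprocess mode")
    if not multiprocess_mode:
        return "InprocClient"
    if not asyncio_mode:
        return "SyncMPClient"
    if data_parallel_size > 1:
        # External LB: one client per DP rank; internal LB: the client balances.
        return "DPAsyncMPClient" if external_lb else "DPLBAsyncMPClient"
    return "AsyncMPClient"
```

Note that data_parallel_size and external_lb only matter on the async multi-process path, matching the "any" and "-" cells above.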

2.2 launch_core_engines

In MPClient's constructor, launch_core_engines starts the actual EngineCore instances; the Ray data-parallel backend takes a slightly different path from the other backends.
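launch_core_engines is a generator-based context manager, which is what allows the Ray branch to yield to the caller's with block and then simply return, while the other branch yields from inside an open handshake-socket context and resumes (to wait for engine startup) only after the caller's with body has finished. A minimal sketch of that control flow; launch, handshake_socket and make_client_log are hypothetical names, and nothing here starts real processes or sockets.

```python
import contextlib
from typing import Iterator, List


@contextlib.contextmanager
def handshake_socket(log: List[str]) -> Iterator[None]:
    # Stands in for zmq_socket_ctx(..., bind=True).
    log.append("bind handshake socket")
    try:
        yield
    finally:
        log.append("close handshake socket")


@contextlib.contextmanager
def launch(backend: str, log: List[str]) -> Iterator[str]:
    if backend == "ray":
        log.append("start ray actors")
        yield "actor_manager"
        return  # early exit: no handshake wait on this path
    with handshake_socket(log):
        log.append("start local procs")
        yield "proc_manager"
        # Resumes only after the caller's with-body exits.
        log.append("wait for handshake")


def make_client_log(backend: str) -> List[str]:
    log: List[str] = []
    with launch(backend, log) as manager:
        log.append(f"client stores {manager}")
    return log
```

The event log makes the ordering visible: on the non-Ray path the client stores its resources first, and the startup wait runs on the way out of the with statement.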

```python
class MPClient:
    def __init__(
        self,
        asyncio_mode: bool,
        vllm_config: VllmConfig,
        executor_class: type[Executor],
        log_stats: bool,
        client_addresses: dict[str, str] | None = None,
    ):
        # ... ZMQ communication setup (elided)
        with launch_core_engines(
            vllm_config, executor_class, log_stats, addresses
        ) as (engine_manager, coordinator, addresses, tensor_queue):
            self.resources.coordinator = coordinator
            self.resources.engine_manager = engine_manager
            # ... other setup (elided)

@contextlib.contextmanager
def launch_core_engines(
    vllm_config: VllmConfig,
    executor_class: type[Executor],
    log_stats: bool,
    addresses: EngineZmqAddresses,
    num_api_servers: int = 1,
) -> Iterator[
    tuple[
        CoreEngineProcManager | CoreEngineActorManager | None,
        DPCoordinator | None,
        EngineZmqAddresses,
        Queue | None,
    ]
]:
    # ... (elided) parallel settings and DP coordinator setup

    if parallel_config.data_parallel_backend == "ray":
        logger.info("Starting ray-based data parallel backend")

        engine_actor_manager = CoreEngineActorManager(
            vllm_config=vllm_config,
            addresses=addresses,
            executor_class=executor_class,
            log_stats=log_stats,
        )

        yield engine_actor_manager, coordinator, addresses, tensor_queue
        return

    if offline_mode:
        assert local_engine_count == 1
        engines_to_handshake = [CoreEngine(index=dp_rank, local=True)]
    elif dp_rank == 0:
        # Rank 0 holds Coordinator, so it handshakes with all Cores
        # in both external dplb and internal dplb mode.
        # Note this also covers the case where we have zero local engines
        # and rank 0 is headless.
        engines_to_handshake = [
            CoreEngine(index=i, local=(i < local_engine_count)) for i in range(dp_size)
        ]
    else:
        # Rank > 0 handshakes with just the local cores it is managing.
        assert local_engines_only, (
            "Attempting to launch core_engines from dp_rank > 0, but "
            "found internal DPLB, which is incompatible."
        )
        engines_to_handshake = [
            CoreEngine(index=i, local=True)
            for i in range(dp_rank, dp_rank + local_engine_count)
        ]

    # Whether the started engines will handshake only with co-located
    # front-end processes. In external_dp_lb mode, ranks > 0 handshake with
    # their co-located frontend and also the rank 0 front-end, and hence this
    # will be False.
    handshake_local_only = offline_mode or local_engine_count == dp_size

    # NOTE(yongji): handling scaling from intra-node to inter-node
    if parallel_config.enable_elastic_ep:
        handshake_local_only = False

    handshake_address = get_engine_client_zmq_addr(
        handshake_local_only, host, parallel_config.data_parallel_rpc_port
    )

    if local_engines_only and dp_rank > 0:
        assert not handshake_local_only
        local_handshake_address = get_open_zmq_ipc_path()
        client_handshake_address = local_handshake_address
    else:
        local_handshake_address = handshake_address
        client_handshake_address = None

    with zmq_socket_ctx(
        local_handshake_address, zmq.ROUTER, bind=True
    ) as handshake_socket:
        # Start local engines.
        if local_engine_count:
            local_engine_manager = CoreEngineProcManager(
                vllm_config=vllm_config,
                executor_class=executor_class,
                log_stats=log_stats,
                handshake_address=handshake_address,
                client_handshake_address=client_handshake_address,
                local_client=True,
                local_engine_count=local_engine_count,
                start_index=dp_rank,
                local_start_index=local_start_index or 0,
                tensor_queue=tensor_queue,
            )
        else:
            local_engine_manager = None

        yield local_engine_manager, coordinator, addresses, tensor_queue
        # ... wait for the engines to finish starting up (elided)
```

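Note that with the Ray backend, launch_core_engines yields control to the caller's with statement early and then returns (presumably because Ray has its own way of scheduling work), so this split happens right at the top. On the other path, the function first decides which engine cores it must handshake with, then starts the local EngineCores; CoreEngineProcManager launches the local processes directly in its constructor: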
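The three-way branch that decides which engines to handshake with, and whether the handshake address can stay local, is pure logic, so it can be pulled out and checked in isolation. A sketch that transcribes those branches from the excerpt into two standalone functions; ToyCoreEngine is a simplified stand-in for CoreEngine, and the function names are hypothetical.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class ToyCoreEngine:
    # Simplified stand-in for CoreEngine: a DP rank and whether it runs on this node.
    index: int
    local: bool


def engines_to_handshake(offline_mode: bool, dp_rank: int, dp_size: int,
                         local_engine_count: int,
                         local_engines_only: bool) -> List[ToyCoreEngine]:
    if offline_mode:
        # Offline mode: exactly one local engine, as asserted in the excerpt.
        assert local_engine_count == 1
        return [ToyCoreEngine(index=dp_rank, local=True)]
    if dp_rank == 0:
        # Rank 0 hosts the coordinator, so it handshakes with every engine.
        return [ToyCoreEngine(index=i, local=(i < local_engine_count))
                for i in range(dp_size)]
    # Ranks > 0 only handshake with the local engines they manage.
    assert local_engines_only, "dp_rank > 0 is incompatible with internal DPLB"
    return [ToyCoreEngine(index=i, local=True)
            for i in range(dp_rank, dp_rank + local_engine_count)]


def handshake_local_only(offline_mode: bool, local_engine_count: int,
                         dp_size: int, enable_elastic_ep: bool = False) -> bool:
    # Elastic EP may scale across nodes, so it never uses a local-only address.
    if enable_elastic_ep:
        return False
    return offline_mode or local_engine_count == dp_size
```

For example, with dp_size=4 and two engines on the rank-0 node, rank 0 handshakes with engines 0 to 3, of which only 0 and 1 are local, and the handshake address cannot be local-only.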

```python
if local_engine_count:
    local_engine_manager = CoreEngineProcManager(
        vllm_config=vllm_config,
        executor_class=executor_class,
        log_stats=log_stats,
        handshake_address=handshake_address,
        client_handshake_address=client_handshake_address,
        local_client=True,
        local_engine_count=local_engine_count,
        start_index=dp_rank,
        local_start_index=local_start_index or 0,
        tensor_queue=tensor_queue,
    )
```

2.3 Client

Back to the clients from section 2.1, taking SyncMPClient as the example:

```python
def _send_input(self, request_type: EngineCoreRequestType, request: Any):
    self.ensure_alive()
    self.free_pending_messages()
    # (Identity, RequestType, SerializedRequest)
    msg = (self.core_engine, request_type.value, *self.encoder.encode(request))

    if len(msg) <= 3:
        # No auxiliary buffers => no tensor backing buffers in request.
        self.input_socket.send_multipart(msg, copy=False)
        return

    tracker = self.input_socket.send_multipart(msg, copy=False, track=True)
    self.add_pending_message(tracker, request)

def add_request(self, request: EngineCoreRequest) -> None:
    if self.is_dp:
        self.engines_running = True
    self._send_input(EngineCoreRequestType.ADD, request)
```

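add_request() simply sends the request to the backend EngineCore, where the Scheduler decides how it is actually scheduled (the Scheduler is covered later). When the encoded request carries extra tensor buffers (len(msg) > 3), the send is tracked and the tracker is kept with the request as a pending message, so those buffers stay alive until zmq has finished with them. So when are the queued requests actually processed? For that we go back to launch_core_engines and its CoreEngineProcManager: its constructor starts a set of processes.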
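Two details of _send_input are easy to miss: the multipart message is (identity, request type, *encoded frames), and only when the encoder produced frames beyond the main payload is the send tracked and the request kept referenced until zmq reports it is done. A sketch of that bookkeeping with a fake socket and tracker in place of pyzmq, so it runs without zmq installed. FakeSocket, FakeTracker and ToyClient are hypothetical, and the request-type bytes b"\x00" is a placeholder, not necessarily vLLM's encoding of ADD.

```python
from typing import Any, List, Optional, Sequence, Tuple


class FakeTracker:
    """Stands in for zmq.MessageTracker: says whether zmq is done with the buffers."""

    def __init__(self) -> None:
        self.done = False


class FakeSocket:
    def __init__(self) -> None:
        self.sent: List[Tuple[bytes, ...]] = []

    def send_multipart(self, msg: Sequence[bytes], copy: bool = True,
                       track: bool = False) -> Optional[FakeTracker]:
        self.sent.append(tuple(msg))
        return FakeTracker() if track else None


class ToyClient:
    def __init__(self, identity: bytes) -> None:
        self.core_engine = identity
        self.input_socket = FakeSocket()
        self.pending: List[Tuple[FakeTracker, Any]] = []

    def free_pending_messages(self) -> None:
        # Drop references once zmq no longer needs the zero-copy buffers.
        self.pending = [(t, r) for t, r in self.pending if not t.done]

    def send_input(self, request_type: bytes, frames: List[bytes], request: Any) -> None:
        self.free_pending_messages()
        msg = (self.core_engine, request_type, *frames)
        if len(msg) <= 3:
            # Identity + type + one payload frame: nothing extra to keep alive.
            self.input_socket.send_multipart(msg, copy=False)
            return
        tracker = self.input_socket.send_multipart(msg, copy=False, track=True)
        self.pending.append((tracker, request))
```

With copy=False, zmq may still be reading the buffers after send_multipart returns, which is why the request must outlive the call until its tracker reports done.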

```python
for index in range(local_engine_count):
    local_index = local_start_index + index
    global_index = start_index + index

    # Start EngineCore in background process.
    local_dp_ranks.append(local_index)
    self.processes.append(
        context.Process(
            target=EngineCoreProc.run_engine_core,
            name=f"EngineCore_DP{global_index}" if is_dp else "EngineCore",
            kwargs=common_kwargs
            | {"dp_rank": global_index, "local_dp_rank": local_index},
        )
    )
```

The process target is EngineCoreProc.run_engine_core. It contains a call to run_busy_loop, which, as the name suggests, is the loop that takes requests off the queue.
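The loop above builds one process object per local engine, naming it by global DP rank and merging per-rank arguments into shared kwargs with the dict | operator (Python 3.9+). A sketch of the same construction with the standard multiprocessing module; run_engine_core here is a hypothetical stand-in for EngineCoreProc.run_engine_core, and choosing the "spawn" start method is this sketch's own choice, not a claim about vLLM's default.

```python
import multiprocessing as mp
from typing import Any, Dict, List, Tuple


def run_engine_core(*args: Any, dp_rank: int = 0, local_dp_rank: int = 0,
                    **kwargs: Any) -> None:
    # Hypothetical stand-in for EngineCoreProc.run_engine_core.
    print(f"EngineCore dp_rank={dp_rank} local_dp_rank={local_dp_rank}")


def build_engine_processes(local_engine_count: int, start_index: int,
                           local_start_index: int, is_dp: bool,
                           common_kwargs: Dict[str, Any]
                           ) -> Tuple[List[mp.Process], List[int]]:
    context = mp.get_context("spawn")
    processes: List[mp.Process] = []
    local_dp_ranks: List[int] = []
    for index in range(local_engine_count):
        local_index = local_start_index + index
        global_index = start_index + index
        local_dp_ranks.append(local_index)
        processes.append(
            context.Process(
                target=run_engine_core,
                name=f"EngineCore_DP{global_index}" if is_dp else "EngineCore",
                # Per-rank values are merged over the shared kwargs.
                kwargs=common_kwargs
                | {"dp_rank": global_index, "local_dp_rank": local_index},
            )
        )
    return processes, local_dp_ranks
```

The objects are only constructed here; to run them, call start() on each under an if __name__ == "__main__": guard, since the spawn method re-imports the main module in every child.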

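Interestingly, run_engine_core intercepts the exit signals (SIGTERM and SIGINT). signal_handler() updates the engine's local state directly by setting shutdown_state to REQUESTED. Updating the local state alone is not enough, though: the engine can block on input_queue.get(), so with an empty queue it would never notice the change and would stay stuck in the busy loop. The handler therefore also arranges for a WAKEUP request to be put on the queue. As the code comments note, that put is not done directly inside the handler, because the handler may interrupt the main thread while it holds input_queue's non-reentrant mutex; it is triggered through SignalCallback instead.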
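The reason for the WAKEUP item can be shown without real signals: a loop blocked in queue.get() only re-checks its flag when some item arrives. A sketch that uses a worker thread in place of the engine process and a plain function in place of the signal path; ToyEngineCore, ShutdownState, request_shutdown and demo are hypothetical, and the string tags "WAKEUP" and "ADD" stand in for EngineCoreRequestType values.

```python
import queue
import threading
from enum import Enum
from typing import Any, List, Tuple


class ShutdownState(Enum):
    RUNNING = 0
    REQUESTED = 1


WAKEUP = "WAKEUP"  # hypothetical request-type tags
ADD = "ADD"


class ToyEngineCore:
    def __init__(self) -> None:
        self.input_queue: "queue.Queue[Tuple[str, Any]]" = queue.Queue()
        self.shutdown_state = ShutdownState.RUNNING
        self.handled: List[Any] = []

    def run_busy_loop(self) -> None:
        while self.shutdown_state is ShutdownState.RUNNING:
            # Blocks while the queue is empty: a flag change alone cannot end this wait.
            req_type, payload = self.input_queue.get()
            if req_type == WAKEUP:
                continue  # nothing to do; the loop condition re-reads shutdown_state
            self.handled.append(payload)


def request_shutdown(engine: ToyEngineCore) -> None:
    # What the signal path amounts to: flip the local flag, then enqueue a
    # WAKEUP item so a thread blocked in input_queue.get() returns.
    engine.shutdown_state = ShutdownState.REQUESTED
    engine.input_queue.put_nowait((WAKEUP, None))


def demo() -> bool:
    engine = ToyEngineCore()
    worker = threading.Thread(target=engine.run_busy_loop, daemon=True)
    worker.start()
    engine.input_queue.put((ADD, "r1"))
    request_shutdown(engine)  # in vLLM this path is driven by SIGTERM/SIGINT
    worker.join(timeout=5)
    return not worker.is_alive()
```

Here request_shutdown runs on an ordinary thread, so calling put_nowait directly is safe; inside a real signal handler that is exactly the mutex hazard the vLLM comment warns about.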

```python
@staticmethod
def run_engine_core(*args, dp_rank: int = 0, local_dp_rank: int = 0, **kwargs):
    # ... checks and initialization (elided)
    def wakeup_engine():
        # Wakes up idle engine via input_queue when shutdown is requested
        # Not safe in a signal handler - we may interrupt the main thread
        # while it is holding the non-reentrant input_queue.mutex
        engine_core.input_queue.put_nowait((EngineCoreRequestType.WAKEUP, None))

    signal_callback = SignalCallback(wakeup_engine)

    def signal_handler(signum, frame):
        engine_core.shutdown_state = EngineShutdownState.REQUESTED
        signal_callback.trigger()

    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    engine_core.run_busy_loop()
    # ... cleanup (elided)
```

Each process's run_busy_loop does two things: it handles requests coming from the client, and it steps the engine to produce model outputs.

```python
def run_busy_loop(self):
    """Core busy loop of the EngineCore."""
    while self._handle_shutdown():
        # 1) Poll the input queue until there is work to do.
        self._process_input_queue()
        # 2) Step the engine core and return the outputs.
        self._process_engine_step()
```
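The two-phase loop can be modeled with a toy engine in which each request needs a number of steps: intake fills the running set, and each step advances every running request by one token. A minimal sketch; ToyEngineCore, its "ADD"/"SHUTDOWN" tags, and the finish-remaining-work-then-exit shutdown rule are simplifications invented here, not how vLLM's _handle_shutdown() behaves.

```python
import queue
from typing import Any, Dict, List, Tuple


class ToyEngineCore:
    def __init__(self) -> None:
        self.input_queue: "queue.Queue[Tuple[str, Any]]" = queue.Queue()
        self.running: Dict[str, int] = {}  # request_id -> tokens still to generate
        self.finished: List[str] = []
        self.stopped = False

    def has_work(self) -> bool:
        return bool(self.running)

    def _handle_client_request(self, req_type: str, payload: Any) -> None:
        if req_type == "ADD":
            request_id, num_tokens = payload
            self.running[request_id] = num_tokens
        elif req_type == "SHUTDOWN":
            self.stopped = True

    def _process_input_queue(self) -> None:
        # Block only while idle, then drain whatever else is queued.
        while not self.has_work() and not self.stopped:
            self._handle_client_request(*self.input_queue.get())
        while not self.input_queue.empty():
            self._handle_client_request(*self.input_queue.get_nowait())

    def _process_engine_step(self) -> None:
        # One "forward pass": every running request produces one token.
        for rid in list(self.running):
            self.running[rid] -= 1
            if self.running[rid] == 0:
                del self.running[rid]
                self.finished.append(rid)

    def run_busy_loop(self) -> None:
        # Simplified shutdown rule: finish in-flight work, then exit.
        while not (self.stopped and not self.has_work()):
            self._process_input_queue()
            self._process_engine_step()
```

Because the loop only blocks when the running set is empty, in-flight requests keep being stepped even while no new input arrives.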

The _process_input_queue code:

```python
def _process_input_queue(self):
    """Exits when an engine step needs to be performed."""

    waited = False
    while not self.has_work() and self.is_running():
        # Notify callbacks waiting for engine to become idle.
        self._notify_idle_state_callbacks()
        if self.input_queue.empty():
            # Drain aborts queue; all aborts are also processed via input_queue.
            with self.aborts_queue.mutex:
                self.aborts_queue.queue.clear()
            if logger.isEnabledFor(DEBUG):
                logger.debug("EngineCore waiting for work.")
                waited = True
        block = self.process_input_queue_block
        try:
            req = self.input_queue.get(block=block)
            self._handle_client_request(*req)
        except queue.Empty:
            break
        if not block:
            break

    if waited:
        logger.debug("EngineCore loop active.")

    # Handle any more client requests.
    while not self.input_queue.empty():
        req = self.input_queue.get_nowait()
        self._handle_client_request(*req)
```

How the two loops work:

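```text
First while loop (wait for work):
  Condition: not has_work() and is_running()
  -> While the scheduler holds no requests at all, wait for a new one on input_queue
     (a blocking get when process_input_queue_block is True; otherwise a single
     non-blocking attempt, after which the loop exits)
  -> Each request taken off the queue is handled by _handle_client_request
  -> has_work() is re-checked after each request; once there is work, the loop exits
Second while loop (non-blocking drain):
  Condition: not input_queue.empty()
  -> There is already work to do, but more requests may still be queued
  -> Handle all remaining queued requests without blocking
  -> Then return so that _process_engine_step can run inference
```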
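The block / non-block distinction is easiest to see with the intake logic pulled out as a standalone function over an injected queue and callbacks. A sketch of the two loops described above; process_input_queue is a hypothetical helper that leaves out is_running(), the idle-state callbacks and the aborts queue, and it returns the waited flag (which the real code only uses for logging) so the behavior can be checked.

```python
import queue
from typing import Any, Callable


def process_input_queue(input_queue: "queue.Queue[Any]",
                        has_work: Callable[[], bool],
                        handle: Callable[[Any], None],
                        block: bool = True) -> bool:
    """Wait for work while idle, then drain the queue without blocking."""
    waited = False
    # Phase 1: idle -> take requests (blocking if `block`) until there is work.
    while not has_work():
        waited = True
        try:
            req = input_queue.get(block=block)
        except queue.Empty:
            break  # only reachable when block=False
        handle(req)
        if not block:
            break  # non-blocking mode makes a single attempt
    # Phase 2: there is work (or we gave up waiting); drain the backlog.
    while not input_queue.empty():
        handle(input_queue.get_nowait())
    return waited
```

In blocking mode with an empty queue and no producer, phase 1 waits indefinitely, which is exactly why the shutdown path must enqueue a WAKEUP item.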

_process_engine_step is where inference actually runs. The rough flow is shown below; the details are left for a later analysis.

```text
_process_engine_step()
├─ step_fn() → EngineCore.step()
│   ├─ 1. scheduler.schedule() → produces a SchedulerOutput
│   ├─ 2. model_executor.execute_model() → submits the GPU forward pass (non-blocking)
│   ├─ 3. scheduler.get_grammar_bitmask() → grammar bitmask for structured output
│   ├─ 4. future.result() → blocks until the GPU work finishes
│   └─ 5. scheduler.update_from_output() → processes model output, produces EngineCoreOutputs
├─ output_queue.put_nowait(output) → puts the result on the output queue
└─ post_step(model_executed) → updates speculative decoding draft tokens
```
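Steps 2 to 4 follow a submit-then-wait pattern: the forward pass is handed off without blocking, other CPU work can overlap with it, and the step blocks only when it needs the result. A sketch of that shape using concurrent.futures; ToyScheduler, SchedulerOutput and fake_forward are hypothetical, the "model" samples token 42 for every request, and whether vLLM's executor returns a concurrent.futures.Future is not asserted here (the tree above only shows future.result()).

```python
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class SchedulerOutput:
    # Hypothetical: which requests run this step, one token each.
    num_scheduled_tokens: Dict[str, int]


class ToyScheduler:
    def __init__(self, remaining: Dict[str, int]) -> None:
        self.remaining = dict(remaining)  # request_id -> tokens still to generate
        self.generated: Dict[str, List[int]] = {}

    def schedule(self) -> SchedulerOutput:
        return SchedulerOutput({rid: 1 for rid, left in self.remaining.items() if left > 0})

    def update_from_output(self, out: SchedulerOutput,
                           sampled: Dict[str, int]) -> List[str]:
        finished = []
        for rid in out.num_scheduled_tokens:
            self.generated.setdefault(rid, []).append(sampled[rid])
            self.remaining[rid] -= 1
            if self.remaining[rid] == 0:
                finished.append(rid)
        return finished


def fake_forward(out: SchedulerOutput) -> Dict[str, int]:
    # Stand-in for the GPU forward pass: "sample" token 42 for every request.
    return {rid: 42 for rid in out.num_scheduled_tokens}


def step(scheduler: ToyScheduler, pool: ThreadPoolExecutor) -> List[str]:
    sched_out = scheduler.schedule()                         # 1. schedule
    future: Future = pool.submit(fake_forward, sched_out)    # 2. submit, no wait
    # 3. CPU-side work (e.g. a grammar bitmask) could overlap with the forward here.
    sampled = future.result()                                # 4. block for the result
    return scheduler.update_from_output(sched_out, sampled)  # 5. update state
```

Submitting before waiting is what creates the overlap window; calling fake_forward directly would serialize step 3 behind the forward pass.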

(Figure: a DeepWiki-generated diagram of the code relationships from the interface layer down to CoreEngine.)


http://example.com/2026/06/01/1_vllm启动流程/
Author: Soya
Published: June 1, 2026