# Make sure the input position does not exceed the max model len.
# This is necessary when using spec decoding.
# The cap is (max_model_len - 1) minus the tokens already computed for this
# request, so num_new_tokens can only shrink here, never grow.
num_new_tokens = min(
    num_new_tokens,
    self.max_model_len - 1 - request.num_computed_tokens,
)
if new_blocks isnotNone: # The request can be scheduled. break
# The request cannot be scheduled.
# Preempt the lowest-priority request.
if self.policy == SchedulingPolicy.PRIORITY:
    # `max` over (priority, arrival_time) selects the request with the
    # largest priority value, breaking ties by the latest arrival time.
    preempted_req = max(
        self.running,
        key=lambda r: (r.priority, r.arrival_time),
    )
    self.running.remove(preempted_req)
    if preempted_req in scheduled_running_reqs:
        # The victim was already scheduled in this step: undo all of its
        # per-step bookkeeping and hand its token budget back.
        preempted_req_id = preempted_req.request_id
        scheduled_running_reqs.remove(preempted_req)
        token_budget += num_scheduled_tokens.pop(preempted_req_id)
        req_to_new_blocks.pop(preempted_req_id)
        scheduled_spec_decode_tokens.pop(preempted_req_id, None)
        preempted_encoder_inputs = scheduled_encoder_inputs.pop(
            preempted_req_id, None
        )
        if preempted_encoder_inputs:
            # Restore encoder compute budget if the preempted
            # request had encoder inputs scheduled in this step.
            num_embeds_to_restore = sum(
                preempted_req.get_num_encoder_embeds(i)
                for i in preempted_encoder_inputs
            )
            encoder_compute_budget += num_embeds_to_restore
        # NOTE(review): presumably compensates for the removal above
        # shifting the position of the current request in self.running;
        # confirm that this decrement belongs under the
        # `in scheduled_running_reqs` branch of the enclosing loop.
        req_index -= 1
else:
    # Any other policy: take the request at the tail of self.running.
    preempted_req = self.running.pop()
self._preempt_request(preempted_req, scheduled_timestamp) preempted_reqs.append(preempted_req) if preempted_req == request: # No more request to preempt. Cannot schedule this request. break
if new_blocks isNone: # Cannot schedule this request. break
# New spec tokens will be set in `update_draft_token_ids` before the
# next step when applicable.
# Until then the request carries an empty list of speculative tokens.
request.spec_token_ids = []
# Encoder-related.
if encoder_inputs_to_schedule:
    # Record which encoder inputs are scheduled for this request.
    scheduled_encoder_inputs[request_id] = encoder_inputs_to_schedule
    # Allocate the encoder cache.
    for i in encoder_inputs_to_schedule:
        self.encoder_cache_manager.allocate(request, i)
        if self.ec_connector is not None:
            self.ec_connector.update_state_after_alloc(request, i)
    # Commit the budget value computed outside this fragment.
    # NOTE(review): assumed to apply once after the loop, only when
    # encoder inputs were scheduled - confirm against the full method.
    encoder_compute_budget = new_encoder_compute_budget
if external_load_encoder_input:
    # Same cache allocation and connector notification as above, but these
    # inputs are not recorded in scheduled_encoder_inputs and leave
    # encoder_compute_budget untouched.
    for i in external_load_encoder_input:
        self.encoder_cache_manager.allocate(request, i)
        if self.ec_connector is not None:
            self.ec_connector.update_state_after_alloc(request, i)
# Get the longest common prefix among all requests in the running queue.
# This can be potentially used for cascade attention.
# Default: one zero per KV cache group, kept when nothing is running.
num_common_prefix_blocks = [0] * len(self.kv_cache_config.kv_cache_groups)
with record_function_or_nullcontext("schedule: get_num_common_prefix_blocks"):
    if self.running:
        # The first running request's id is passed to the KV cache manager
        # as the lookup key.
        any_request_id = self.running[0].request_id
        num_common_prefix_blocks = (
            self.kv_cache_manager.get_num_common_prefix_blocks(any_request_id)
        )