From 786daa52b51459a30b7e1e38759ef70ebcb771df Mon Sep 17 00:00:00 2001 From: kevin Date: Tue, 5 Aug 2025 14:09:03 +0800 Subject: [PATCH 1/7] optimize guided decoding performance --- .../xgrammar_apply_token_bitmask.py | 118 ++++++++++ .../guided_decoding/xgrammar_backend.py | 30 +-- .../model_executor/layers/sample/sampler.py | 216 +++++++++++------- 3 files changed, 268 insertions(+), 96 deletions(-) create mode 100644 fastdeploy/model_executor/guided_decoding/kernels/fastdeploy/model_executor/xgrammar_apply_token_bitmask.py diff --git a/fastdeploy/model_executor/guided_decoding/kernels/fastdeploy/model_executor/xgrammar_apply_token_bitmask.py b/fastdeploy/model_executor/guided_decoding/kernels/fastdeploy/model_executor/xgrammar_apply_token_bitmask.py new file mode 100644 index 0000000000..f0ba737a6e --- /dev/null +++ b/fastdeploy/model_executor/guided_decoding/kernels/fastdeploy/model_executor/xgrammar_apply_token_bitmask.py @@ -0,0 +1,118 @@ +""" +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License" +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" + +# refer to https://github.com/mlc-ai/xgrammar/blob/main/python/xgrammar/kernels/apply_token_bitmask_inplace_triton.py + +from typing import List, Optional + +import paddle + +try: + import triton + import triton.language as tl +except ImportError as err: + raise ImportError("Triton is not installed") from err + + +@triton.jit +def apply_token_bitmask_inplace_kernel( + logits_ptr, + bitmask_ptr, + indices_ptr, + num_rows, + vocab_size, + logits_strides, + bitmask_strides, + NUM_SMS: tl.constexpr, + BLOCK_SIZE: tl.constexpr, +): + """Triton kernel for in-place logits masking using bitwise compression. + + Processes logits tensor in blocks, applying bitmask to restrict vocabulary access. + Masked positions are set to -inf to ensure zero probability during sampling. 
+ + Note: + - Bitmask uses 32:1 compression (1 bit per vocabulary token) + - Optimized for GPU parallel processing with configurable block size + """ + pid = tl.program_id(0) + num_blocks = tl.cdiv(vocab_size, BLOCK_SIZE) + for work_id in tl.range(pid, num_rows * num_blocks, NUM_SMS): + row_id = work_id // num_blocks + block_offset = (work_id % num_blocks) * BLOCK_SIZE + batch_id = row_id if indices_ptr is None else tl.load(indices_ptr + row_id) + offsets = block_offset + tl.arange(0, BLOCK_SIZE) + bitmask_offsets = block_offset // 32 + tl.arange(0, BLOCK_SIZE // 32) + vocab_mask = offsets < vocab_size + packed_bitmask_mask = bitmask_offsets < bitmask_strides + packed_bitmask = tl.load(bitmask_ptr + batch_id * bitmask_strides + bitmask_offsets, packed_bitmask_mask) + bitmask = ((packed_bitmask[:, None] >> (tl.arange(0, 32)[None, :])) & 1) == 0 + bitmask = bitmask.reshape(BLOCK_SIZE) + + tl.store(logits_ptr + batch_id * logits_strides + offsets, -float("inf"), vocab_mask & bitmask) + + +def apply_token_bitmask_inplace_triton( + logits: paddle.Tensor, + bitmask: paddle.Tensor, + vocab_size: Optional[int] = None, + indices: Optional[List[int]] = None, +): + """Applies vocabulary mask to logits tensor using Triton GPU kernel. + + Args: + logits: Input logits tensor of shape [batch_size, vocab_size] + bitmask: Compressed mask tensor (int32) where each bit represents a token + vocab_size: Optional explicit vocabulary size (defaults to auto-detected) + indices: Optional list of batch indices to apply mask to + + Note: + Requires CUDA GPU with Triton support + Bitmask must be int32 tensor with shape [batch_size, ceil(vocab_size/32)] + """ + NUM_SMS = paddle.device.cuda.get_device_properties().multi_processor_count + BLOCK_SIZE = 4096 + + assert bitmask.dtype == paddle.int32, "bitmask must be of type int32" + + detected_vocab_size = min(logits.shape[-1], bitmask.shape[-1] * 32) + if vocab_size is None: + vocab_size = detected_vocab_size + else: + assert ( + vocab_size <= detected_vocab_size + ), f"vocab_size {vocab_size} is larger than the detected vocab_size {detected_vocab_size}" + + num_rows = len(indices) if indices is not None else logits.shape[0] if logits.ndim == 2 else 1 + + if indices is not None: + indices = paddle.to_tensor(indices, dtype=paddle.int32, place=logits.place) + + grid = (NUM_SMS,) + + apply_token_bitmask_inplace_kernel[grid]( + logits, + bitmask, + indices, + num_rows, + vocab_size, + logits.shape[-1], + bitmask.shape[-1], + NUM_SMS, + BLOCK_SIZE, + num_warps=BLOCK_SIZE // 32 // (16 // logits.element_size()), + num_stages=3, + ) diff --git a/fastdeploy/model_executor/guided_decoding/xgrammar_backend.py b/fastdeploy/model_executor/guided_decoding/xgrammar_backend.py index f702a1085e..0660713254 100644 --- a/fastdeploy/model_executor/guided_decoding/xgrammar_backend.py +++ b/fastdeploy/model_executor/guided_decoding/xgrammar_backend.py @@ -39,8 +39,9 @@ StructuralTagItem, TokenizerInfo, allocate_token_bitmask, - apply_token_bitmask_inplace, ) + + from .kernels.xgrammar_apply_token_bitmask import apply_token_bitmask_inplace_triton except Exception as e: raise Exception(f"import XGrammar failed, please check your environment:\n\t {e}") @@ -113,7 +114,7 @@ def fill_token_bitmask(self, token_bitmask: torch.Tensor, idx: int) -> None: def apply_token_mask( self, logits: paddle.Tensor, - token_bitmask: torch.Tensor, + token_bitmask: paddle.Tensor, indices: Optional[List[int]] = None, ) -> paddle.Tensor: """ @@ -121,28 +122,21 @@ def apply_token_mask( Args: logits (paddle.Tensor): The 
logits tensor to modify - token_bitmask (torch.Tensor): The token bitmask indicating allowed tokens + token_bitmask (paddle.Tensor): The token bitmask indicating allowed tokens indices (Optional[List[int]]): Optional list of batch indices to apply mask to Returns: paddle.Tensor: The modified logits tensor """ - origin_place = logits.place - origin_dtype = logits.dtype - logits = torch.from_numpy(logits.numpy()) - - logits = logits.float() # cpu - apply_token_bitmask_inplace( - logits=logits, - bitmask=token_bitmask.to(logits.device, non_blocking=True), - indices=indices, - ) + if token_bitmask.place != logits.place: + token_bitmask = token_bitmask.to(device=logits.place) - return paddle.to_tensor( - logits.numpy(), - dtype=origin_dtype, - place=origin_place, - ) + if logits.place.is_gpu_place(): + apply_token_bitmask_inplace_triton(logits, token_bitmask, self.vocab_size, indices) + else: + llm_logger.error(f"Unsupported device {logits.place}, skip guided decoding.") + + return logits def reset(self) -> None: """ diff --git a/fastdeploy/model_executor/layers/sample/sampler.py b/fastdeploy/model_executor/layers/sample/sampler.py index 412a7eda7f..23e6af8cf0 100644 --- a/fastdeploy/model_executor/layers/sample/sampler.py +++ b/fastdeploy/model_executor/layers/sample/sampler.py @@ -15,6 +15,7 @@ """ import threading +from abc import abstractmethod from concurrent.futures import ThreadPoolExecutor from typing import Any, Dict, List, Optional @@ -41,30 +42,46 @@ class SamplerProcessor: - """ - SamplingProcessor for guided decoding. + """Handles guided decoding with thread-safe logits processing and token masking. + + Manages asynchronous operations for efficient sampling with: + - Logits processors for constrained decoding + - Vocabulary masking + - Thread-safe state updates """ def __init__(self): self.async_step = None self.token_bitmask = None + self.insert_processor = False self.logits_processor: Dict[int, Optional[Any]] = dict() self.executor = ThreadPoolExecutor() self.logits_lock = threading.Lock() def add_logits_processor( - self, - ids: int, - future: Optional[Any] = None, - prefill_tokens: List[int] = [], + self, ids: int, future: Optional[Any] = None, prefill_tokens: List[int] = [], skip: bool = False ): - """add logits processor to SamplerProcessor""" + """Registers a logits processor for guided decoding. + + Args: + ids: Unique sequence identifier + future: Logits processor instance or Future containing one + prefill_tokens: Initial tokens to pre-load + skip: Whether to skip processor insertion + """ + if self.async_step is not None: + self.async_step.result() + self.async_step = None + with self.logits_lock: if future is None: if ids in self.logits_processor: del self.logits_processor[ids] return + if not skip: + self.insert_processor = True + if isinstance(future, LogitsProcessorBase): self.logits_processor[ids] = future for token in prefill_tokens: @@ -76,15 +93,26 @@ def add_logits_processor( else: self.logits_processor[ids] = [future, prefill_tokens] - def update_vocab_mask(self, skip_idx_list: List[int] = []): - """update vocab mask. 
(cpu-heavy operation)""" + def get_available_processors(self): + """ + get available logits processor + """ + available_processors = None + with self.logits_lock: + for processor in self.logits_processor.values(): + if processor.is_terminated(): + continue + available_processors = processor + return available_processors + + def update_logits_processor(self): + """update logits processor""" if len(self.logits_processor) == 0: return with self.logits_lock: for idx, processor in self.logits_processor.items(): if processor is None: - del self.logits_processor[idx] continue if not isinstance(processor, LogitsProcessorBase): @@ -93,16 +121,26 @@ def update_vocab_mask(self, skip_idx_list: List[int] = []): for token in prefill_tokens: self.logits_processor[idx].accept_token(token) - available_processors = None - for processor in self.logits_processor.values(): - if processor.is_terminated(): - continue - available_processors = processor - if available_processors is None: - return + def update_vocab_mask(self, skip_idx_list: List[int] = []): + """Updates vocabulary mask based on active constraints. + + Note: This is a CPU-intensive operation that: + 1. Processes pending logits processors + 2. Allocates and fills token bitmask + + Args: + skip_idx_list: Sequence IDs to exclude from masking + """ + if len(self.logits_processor) == 0: + return + + self.update_logits_processor() + available_processors = self.get_available_processors() + if available_processors is None: + return # allocate token bitmask - self.token_bitmask = available_processors.allocate_token_bitmask() + token_bitmask = available_processors.allocate_token_bitmask() with self.logits_lock: # fill token bitmask @@ -110,21 +148,34 @@ def update_vocab_mask(self, skip_idx_list: List[int] = []): if processor.is_terminated() or idx in skip_idx_list: continue - processor.fill_token_bitmask(self.token_bitmask, idx) + processor.fill_token_bitmask(token_bitmask, idx) + self.token_bitmask = paddle.to_tensor(token_bitmask.numpy()) def apply_token_mask(self, logits: paddle.Tensor, skip_idx_list: List[int] = []): - """apply token mask to logits""" - if len(self.logits_processor) == 0 or self.token_bitmask is None: + """Applies vocabulary mask to restrict token sampling. + + Args: + logits: Input logits tensor + skip_idx_list: Sequence IDs to exclude from masking + + Returns: + Masked logits tensor + """ + if len(self.logits_processor) == 0: return logits - # self.async_step.result() - available_processors = None - with self.logits_lock: - for processor in self.logits_processor.values(): - if processor.is_terminated(): - continue - available_processors = processor - if available_processors is None: + if self.async_step is not None: + self.async_step.result() + self.async_step = None + + self.update_logits_processor() + if self.insert_processor: + self.update_vocab_mask(skip_idx_list) + with self.logits_lock: + self.insert_processor = False + + available_processors = self.get_available_processors() + if available_processors is None or self.token_bitmask is None: return logits indices = list(self.logits_processor.keys()) @@ -141,31 +192,60 @@ def _accept_token(self, idx: int, token: int): self.logits_processor[idx].accept_token(token) - def update_output_tokens(self, next_tokens: paddle.Tensor, skip_idx_list: List[int] = []): - """update output tokens""" + def update_output_tokens( + self, next_tokens: paddle.Tensor, skip_idx_list: List[int] = [], skip_list_next: List[int] = [] + ): + """Updates processors with newly generated tokens asynchronously. 
+ + Args: + next_tokens: Newly sampled tokens + skip_idx_list: Current step IDs to skip + skip_list_next: Next step IDs to skip + """ if len(self.logits_processor) == 0: return - token_ids = next_tokens.numpy().tolist() - with self.logits_lock: - for idx in self.logits_processor.keys(): - token = token_ids[idx][0] - if token < 0 or self.logits_processor[idx] is None or idx in skip_idx_list: - continue + # create async operation for guided decoding + def async_update(next_tokens, skip_idx_list, skip_list_next): + token_ids = next_tokens.numpy().tolist() + with self.logits_lock: + for idx in self.logits_processor.keys(): + token = token_ids[idx][0] + if token < 0 or self.logits_processor[idx] is None or idx in skip_idx_list: + continue + self._accept_token(idx, token) - self._accept_token(idx, token) + self.update_vocab_mask(skip_list_next) + + self.async_step = self.executor.submit(async_update, next_tokens, skip_idx_list, skip_list_next) - def pre_process(self, skip_idx_list: List[int] = []): - """pre process before running""" - # create async operation for guided decoding - # TODO: support async - self.update_vocab_mask(skip_idx_list) - # self.async_step = self.executor.submit(self.update_vocab_mask) +class SamplerBase(nn.Layer): + def __init__(self): + """Base class for sampler""" + super().__init__() + + @abstractmethod + def forward_cuda(self, *args, **kwargs) -> SamplerOutput: + pass + + def apply_logits_processor(self, *args, **kwargs): + """apply logits processor to sampler""" + pass + + def async_post_process(self, *args, **kwargs): + """async accept token from outside, update vocab mask for inside""" + pass -class Sampler(nn.Layer): + +class Sampler(SamplerBase): """ - Sampler for normal generation. + Normal generation sampler with guided decoding support. + + Features: + - Top-p (nucleus) sampling + - Repetition/frequency penalties + - Integration with guided decoding processors """ def __init__(self, fd_config: FDConfig = None): @@ -198,13 +278,19 @@ def apply_logits_processor( ids: int, future: Optional[Any] = None, prefill_tokens: List[int] = [], + skip: bool = False, ): """apply logits processor to sampler""" - self.processor.add_logits_processor(ids, future, prefill_tokens) + self.processor.add_logits_processor(ids, future=future, prefill_tokens=prefill_tokens, skip=skip) - def pre_process(self, skip_idx_list: List[int] = []): - """pre process before running""" - self.processor.pre_process(skip_idx_list) + def async_post_process( + self, + next_tokens: paddle.Tensor, + skip_idx_list: List[int] = [], + skip_list_next: List[int] = [], + ): + """async accept token from outside, update vocab mask for inside""" + self.processor.update_output_tokens(next_tokens, skip_idx_list, skip_list_next) def compute_logprobs(self, logits: paddle.Tensor) -> paddle.Tensor: """ """ @@ -305,7 +391,7 @@ def forward_cuda( return sampler_output -class SpeculativeSampler(nn.Layer): +class SpeculativeSampler(SamplerBase): """ Sampler for speculative generation. 
""" @@ -321,19 +407,6 @@ def __init__(self, fd_config: FDConfig): self.speculative_max_candidate_len = fd_config.speculative_config.max_candidate_len self.speculative_benchmark_mode = fd_config.speculative_config.benchmark_mode - def pre_process(self, skip_idx_list: List[int] = []): - """pre process before running""" - pass - - def apply_logits_processor( - self, - ids: int, - future: Optional[Any] = None, - prefill_tokens: List[int] = [], - ): - """apply logits processor to sampler""" - pass - def forward_cuda( self, logits: paddle.Tensor, @@ -401,7 +474,7 @@ def forward_cuda( return None -class MTPSampler(nn.Layer): +class MTPSampler(SamplerBase): """ """ def __init__(self, fd_config: FDConfig): @@ -412,19 +485,6 @@ def __init__(self, fd_config: FDConfig): else: raise NotImplementedError - def pre_process(self, skip_idx_list: List[int] = []): - """pre process before running""" - pass - - def apply_logits_processor( - self, - ids: int, - future: Optional[Any] = None, - prefill_tokens: List[int] = [], - ): - """apply logits processor to sampler""" - pass - def forward_cuda( self, logits: paddle.Tensor, From 1fe01e7a8110e07260b3dba17b689ff37bb69b58 Mon Sep 17 00:00:00 2001 From: kevin Date: Fri, 15 Aug 2025 10:22:13 +0800 Subject: [PATCH 2/7] add error traceback info --- fastdeploy/cache_manager/cache_messager.py | 3 +- .../cache_manager/cache_transfer_manager.py | 3 +- .../cache_manager/prefix_cache_manager.py | 13 +++++---- fastdeploy/engine/engine.py | 8 ++--- fastdeploy/engine/expert_service.py | 4 +-- .../engine/sched/resource_manager_v1.py | 5 ++-- fastdeploy/entrypoints/api_server.py | 3 +- fastdeploy/entrypoints/engine_client.py | 5 ++-- fastdeploy/entrypoints/llm.py | 2 +- fastdeploy/entrypoints/openai/api_server.py | 5 ++-- fastdeploy/entrypoints/openai/serving_chat.py | 26 ++++++++++++----- .../entrypoints/openai/serving_completion.py | 29 +++++++++++++++---- .../tool_parsers/ernie_x1_tool_parser.py | 19 ++++++++---- fastdeploy/input/ernie_vl_processor.py | 4 ++- .../inter_communicator/engine_cache_queue.py | 3 +- fastdeploy/inter_communicator/zmq_client.py | 9 +++--- .../guided_decoding/base_guided_decoding.py | 3 +- .../guided_decoding/xgrammar_backend.py | 9 +++--- fastdeploy/output/token_processor.py | 4 +-- fastdeploy/platforms/cuda.py | 5 +++- fastdeploy/platforms/dcu.py | 5 +++- fastdeploy/platforms/gcu.py | 5 +++- fastdeploy/platforms/maca.py | 4 ++- fastdeploy/platforms/xpu.py | 5 +++- fastdeploy/scheduler/global_scheduler.py | 2 +- fastdeploy/scheduler/splitwise_scheduler.py | 24 ++++++++------- fastdeploy/splitwise/splitwise_connector.py | 7 +++-- fastdeploy/worker/utils.py | 3 +- test/ce/accuracy_cases/gsm8k.py | 3 +- test/ce/deploy/deploy.py | 27 ++++++++++++----- 30 files changed, 164 insertions(+), 83 deletions(-) diff --git a/fastdeploy/cache_manager/cache_messager.py b/fastdeploy/cache_manager/cache_messager.py index 456ba1c342..409941f7d8 100644 --- a/fastdeploy/cache_manager/cache_messager.py +++ b/fastdeploy/cache_manager/cache_messager.py @@ -17,6 +17,7 @@ import math import threading import time +import traceback import numpy as np import paddle @@ -309,4 +310,4 @@ def _prefill_layerwise_send_cache_thread(self): self.last_layer_idx = prefilled_layer_idx except Exception as e: - logger.error(f"prefill layerwise send cache thread has exception: {e}") + logger.error(f"prefill layerwise send cache thread has exception: {e}, {str(traceback.format_exc())}") diff --git a/fastdeploy/cache_manager/cache_transfer_manager.py 
b/fastdeploy/cache_manager/cache_transfer_manager.py index 34ccf144ca..5078a513dd 100644 --- a/fastdeploy/cache_manager/cache_transfer_manager.py +++ b/fastdeploy/cache_manager/cache_transfer_manager.py @@ -19,6 +19,7 @@ import json import queue import time +import traceback import numpy as np import paddle @@ -342,7 +343,7 @@ def do_data_transfer(self): if self.rank == 0: self.cache_task_queue.barrier3.reset() except Exception as e: - logger.info(f"do_data_transfer: error: {e}") + logger.info(f"do_data_transfer: error: {e}, {str(traceback.format_exc())}") def _transfer_data( self, diff --git a/fastdeploy/cache_manager/prefix_cache_manager.py b/fastdeploy/cache_manager/prefix_cache_manager.py index f033a565c9..e57f0f43b8 100644 --- a/fastdeploy/cache_manager/prefix_cache_manager.py +++ b/fastdeploy/cache_manager/prefix_cache_manager.py @@ -20,6 +20,7 @@ import sys import threading import time +import traceback import uuid from collections import defaultdict from concurrent.futures import ThreadPoolExecutor @@ -469,7 +470,7 @@ def update_cache_blocks(self, task, block_size): self.leaf_req_map[leaf_node].add(req_id) self.cache_info[req_id] = (leaf_node, input_ids) except Exception as e: - logger.error(f"update_cache_blocks, error: {type(e)} {e}") + logger.error(f"update_cache_blocks, error: {type(e)} {e}, {str(traceback.format_exc())}") raise e def request_match_blocks(self, task, block_size, *args): @@ -555,7 +556,7 @@ def request_match_blocks(self, task, block_size, *args): ) return common_block_ids, matched_token_num, hit_info except Exception as e: - logger.error(f"request_block_ids: error: {type(e)} {e}") + logger.error(f"request_block_ids: error: {type(e)} {e}, {str(traceback.format_exc())}") raise e def request_block_ids(self, task, block_size, dec_token_num, *args): @@ -660,7 +661,7 @@ def request_block_ids(self, task, block_size, dec_token_num, *args): ) return common_block_ids, unique_block_ids, hit_info except Exception as e: - logger.error(f"request_block_ids: error: {type(e)} {e}") + logger.error(f"request_block_ids: error: {type(e)} {e}, {str(traceback.format_exc())}") raise e def release_block_ids_async(self, task): @@ -709,7 +710,7 @@ def release_block_ids(self, task): ) return except Exception as e: - logger.error(f"release_block_ids: error: {type(e)} {e}") + logger.error(f"release_block_ids: error: {type(e)} {e}, {str(traceback.format_exc())}") raise e def _handle_free_gpu_node_without_cpu(self, node): @@ -899,7 +900,7 @@ def free_block_ids_async(self, need_block_num): else: self.gpu_free_task_future = None except Exception as e: - logger.error(f"free_block_ids_async: error: {type(e)} {e}") + logger.error(f"free_block_ids_async: error: {type(e)} {e}, {str(traceback.format_exc())}") raise e def free_cpu_block_ids(self, need_block_num): @@ -1218,5 +1219,5 @@ def recv_data_transfer_result(self): + f"task_cpu_block_id {task_cpu_block_id} event_type {event_type} done" ) except Exception as e: - logger.warning(f"recv_data_transfer_result: error: {e}") + logger.warning(f"recv_data_transfer_result: error: {e}, {str(traceback.format_exc())}") raise e diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index db3bdefffe..c3149b55d2 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -600,7 +600,7 @@ def receiver_loop(): time.sleep(0.001) except Exception as e: - llm_logger.error(f"Error in main loop: {e}") + llm_logger.error(f"Error in main loop: {e}, {str(traceback.format_exc())}") time.sleep(0.1) threading.Thread(target=receiver_loop, 
daemon=True).start() @@ -987,7 +987,7 @@ def _exit_sub_services(self): try: os.killpg(p.pid, signal.SIGTERM) except Exception as e: - print(f"Error extracting file: {e}") + print(f"Error extracting file: {e}, {str(traceback.format_exc())}") self.worker_ready_signal.clear() self.exist_task_signal.clear() self.exist_swapped_task_signal.clear() @@ -1000,7 +1000,7 @@ def _exit_sub_services(self): try: os.killpg(self.worker_proc.pid, signal.SIGTERM) except Exception as e: - print(f"Error extracting sub services: {e}") + print(f"Error extracting sub services: {e}, {str(traceback.format_exc())}") self.engine_worker_queue.cleanup() if hasattr(self, "zmq_server") and self.zmq_server is not None: @@ -1175,7 +1175,7 @@ def generate(self, prompts, stream): try: req_id = self._format_and_add_data(prompts) except Exception as e: - llm_logger.error(f"Error happend while adding request, details={e}") + llm_logger.error(f"Error happend while adding request, details={e}, {str(traceback.format_exc())}") raise EngineError(str(e), error_code=400) # Get the result of the current request diff --git a/fastdeploy/engine/expert_service.py b/fastdeploy/engine/expert_service.py index 9cf5f97f7f..2ed5f8924a 100644 --- a/fastdeploy/engine/expert_service.py +++ b/fastdeploy/engine/expert_service.py @@ -269,7 +269,7 @@ def receiver_loop(): time.sleep(0.001) continue except Exception as e: - llm_logger.error(f"get decode tasks error: {e}") + llm_logger.error(f"get decode tasks error: {e}, {str(traceback.format_exc())}") threading.Thread(target=receiver_loop, daemon=True).start() @@ -378,4 +378,4 @@ def start_expert_service(cfg, local_data_parallel_id, ipc_signal_suffix): expert_service.start(ipc_signal_suffix, local_data_parallel_id) expert_service.split_connector.start_receiver() except Exception as e: - llm_logger.exception(f"Expert service failed to start: {e}") + llm_logger.exception(f"Expert service failed to start: {e}, {str(traceback.format_exc())}") diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py index 26eba4ae09..ec8703ee0d 100644 --- a/fastdeploy/engine/sched/resource_manager_v1.py +++ b/fastdeploy/engine/sched/resource_manager_v1.py @@ -16,6 +16,7 @@ import threading import time +import traceback from collections import deque from collections.abc import Iterable from concurrent.futures import ThreadPoolExecutor @@ -389,7 +390,7 @@ def get_prefix_cached_blocks(self, request: Request): request.cache_prepare_time = time.time() - cache_prepare_time return True except Exception as e: - llm_logger.error(f"prefix match blocks error: {e}, waiting reschedule...") + llm_logger.error(f"prefix match blocks error: {e}, {str(traceback.format_exc())} waiting reschedule...") return False def add_request(self, request: Request) -> None: @@ -441,4 +442,4 @@ def finish_requests(self, request_ids: Union[str, Iterable[str]]): self.stop_flags[request.idx] = True del self.requests[req_id] except Exception as e: - llm_logger.error(e) + llm_logger.error(f"finish_request err: {e}, {str(traceback.format_exc())}") diff --git a/fastdeploy/entrypoints/api_server.py b/fastdeploy/entrypoints/api_server.py index f27c008314..4f4d7f2250 100644 --- a/fastdeploy/entrypoints/api_server.py +++ b/fastdeploy/entrypoints/api_server.py @@ -15,6 +15,7 @@ """ import json +import traceback import uvicorn from fastapi import FastAPI @@ -114,7 +115,7 @@ def launch_api_server(args) -> None: log_level="info", ) # set log level to error to avoid log except Exception as e: - 
api_server_logger.error(f"launch sync http server error, {e}") + api_server_logger.error(f"launch sync http server error, {e}, {str(traceback.format_exc())}") def main(): diff --git a/fastdeploy/entrypoints/engine_client.py b/fastdeploy/entrypoints/engine_client.py index daed93b8f9..cf1ebdd297 100644 --- a/fastdeploy/entrypoints/engine_client.py +++ b/fastdeploy/entrypoints/engine_client.py @@ -15,6 +15,7 @@ """ import time +import traceback import uuid import numpy as np @@ -141,7 +142,7 @@ def add_requests(self, task): work_process_metrics.prompt_tokens_total.inc(input_ids_len) work_process_metrics.request_prompt_tokens.observe(input_ids_len) except Exception as e: - api_server_logger.error(e) + api_server_logger.error(f"add_requests error: {e}, {str(traceback.format_exc())}") raise EngineError(str(e), error_code=400) if input_ids_len + min_tokens >= self.max_model_len: @@ -194,7 +195,7 @@ def add_requests(self, task): else: self.zmq_client.send_pyobj(task) except Exception as e: - api_server_logger.error(e) + api_server_logger.error(f"zmq_client send task error: {e}, {str(traceback.format_exc())}") raise EngineError(str(e), error_code=400) def vaild_parameters(self, data): diff --git a/fastdeploy/entrypoints/llm.py b/fastdeploy/entrypoints/llm.py index 001cfad3e0..dd48e6d00e 100644 --- a/fastdeploy/entrypoints/llm.py +++ b/fastdeploy/entrypoints/llm.py @@ -341,7 +341,7 @@ def _build_sample_logprobs(self, logprobs_lists: LogprobsLists, topk_logprobs: i return result except Exception as e: - llm_logger.error(f"Error building sample logprobs from LogprobsLists: {e}") + llm_logger.error(f"Error building sample logprobs from LogprobsLists: {e}, {str(traceback.format_exc())}") def _run_engine(self, req_ids: list[str], use_tqdm: bool, topk_logprobs: Optional[int] = None): """ diff --git a/fastdeploy/entrypoints/openai/api_server.py b/fastdeploy/entrypoints/openai/api_server.py index 2a4c0e7aba..6a5355f102 100644 --- a/fastdeploy/entrypoints/openai/api_server.py +++ b/fastdeploy/entrypoints/openai/api_server.py @@ -18,6 +18,7 @@ import os import threading import time +import traceback from collections.abc import AsyncGenerator from contextlib import asynccontextmanager from multiprocessing import current_process @@ -155,7 +156,7 @@ async def lifespan(app: FastAPI): multiprocess.mark_process_dead(os.getpid()) api_server_logger.info(f"Closing metrics client pid: {pid}") except Exception as e: - api_server_logger.warning(e) + api_server_logger.warning(f"exit error: {e}, {str(traceback.format_exc())}") app = FastAPI(lifespan=lifespan) @@ -349,7 +350,7 @@ def launch_api_server() -> None: log_level="info", ) # set log level to error to avoid log except Exception as e: - api_server_logger.error(f"launch sync http server error, {e}") + api_server_logger.error(f"launch sync http server error, {e}, {str(traceback.format_exc())}") metrics_app = FastAPI() diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py index b14f28e627..91751fd1c0 100644 --- a/fastdeploy/entrypoints/openai/serving_chat.py +++ b/fastdeploy/entrypoints/openai/serving_chat.py @@ -92,7 +92,9 @@ async def create_chat_completion(self, request: ChatCompletionRequest): if isinstance(prompt_token_ids, np.ndarray): prompt_token_ids = prompt_token_ids.tolist() except Exception as e: - return ErrorResponse(code=400, message=str(e)) + error_msg = f"request[{request_id}] send to infer error: {str(e)}, {str(traceback.format_exc())}" + api_server_logger.error(error_msg) + return 
ErrorResponse(code=400, message=error_msg) del current_req_dict try: @@ -101,8 +103,13 @@ async def create_chat_completion(self, request: ChatCompletionRequest): await self.engine_client.semaphore.acquire() else: await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time) - except Exception: - return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}") + except Exception as e: + error_msg = ( + f"request[{request_id}] waiting error: {str(e)}, {str(traceback.format_exc())}, " + f"max waiting time: {self.max_waiting_time}" + ) + api_server_logger.error(error_msg) + return ErrorResponse(code=408, message=error_msg) if request.stream: return self.chat_completion_stream_generator( @@ -114,9 +121,12 @@ async def create_chat_completion(self, request: ChatCompletionRequest): request, request_id, request.model, prompt_token_ids, text_after_process ) except Exception as e: - return ErrorResponse(code=400, message=str(e)) + error_msg = f"request[{request_id}] generator error: {str(e)}, {str(traceback.format_exc())}" + api_server_logger.error(error_msg) + return ErrorResponse(code=400, message=error_msg) def _create_streaming_error_response(self, message: str) -> str: + api_server_logger.error(message) error_response = ErrorResponse( code=400, message=message, @@ -334,7 +344,9 @@ async def chat_completion_stream_generator( yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n" except Exception as e: - error_data = self._create_streaming_error_response(str(e)) + error_data = self._create_streaming_error_response( + f"equest[{request_id}] generate stream error: {str(e)}, {str(traceback.format_exc())}" + ) yield f"data: {error_data}\n\n" finally: dealer.close() @@ -553,6 +565,6 @@ def _build_logprobs_response( return LogProbs(content=[sampled_entry]) except Exception as e: - api_server_logger.error("Error in _build_logprobs_response: %s", e) - api_server_logger.error(traceback.format_exc()) + error_msg = f"Error in _build_logprobs_response: {e}, {str(traceback.format_exc())}" + api_server_logger.error(error_msg) return None diff --git a/fastdeploy/entrypoints/openai/serving_completion.py b/fastdeploy/entrypoints/openai/serving_completion.py index a6aadcf060..896fb6aa32 100644 --- a/fastdeploy/entrypoints/openai/serving_completion.py +++ b/fastdeploy/entrypoints/openai/serving_completion.py @@ -16,6 +16,7 @@ import asyncio import time +import traceback import uuid from typing import List, Optional @@ -92,7 +93,9 @@ async def create_completion(self, request: CompletionRequest): else: raise ValueError("Prompt must be a string, a list of strings or a list of integers.") except Exception as e: - return ErrorResponse(message=str(e), code=400) + error_msg = f"OpenAIServingCompletion create_completion: {e}, {str(traceback.format_exc())}" + api_server_logger.error(error_msg) + return ErrorResponse(message=error_msg, code=400) if request_prompt_ids is not None: request_prompts = request_prompt_ids @@ -113,6 +116,8 @@ async def create_completion(self, request: CompletionRequest): text_after_process_list.append(current_req_dict.get("text_after_process")) prompt_batched_token_ids.append(prompt_token_ids) except Exception as e: + error_msg = f"OpenAIServingCompletion format error: {e}, {str(traceback.format_exc())}" + api_server_logger.error(error_msg) return ErrorResponse(message=str(e), code=400) del current_req_dict @@ -122,8 +127,13 @@ async def create_completion(self, request: CompletionRequest): await 
self.engine_client.semaphore.acquire() else: await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time) - except Exception: - return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}") + except Exception as e: + error_msg = ( + f"OpenAIServingCompletion waiting error: {e}, {str(traceback.format_exc())}, " + f"max waiting time: {self.max_waiting_time}" + ) + api_server_logger.error(error_msg) + return ErrorResponse(code=408, message=error_msg) if request.stream: return self.completion_stream_generator( @@ -147,10 +157,16 @@ async def create_completion(self, request: CompletionRequest): text_after_process_list=text_after_process_list, ) except Exception as e: - return ErrorResponse(code=400, message=str(e)) + error_msg = ( + f"OpenAIServingCompletion completion_full_generator error: {e}, {str(traceback.format_exc())}" + ) + api_server_logger.error(error_msg) + return ErrorResponse(code=400, message=error_msg) except Exception as e: - return ErrorResponse(message=str(e), code=400) + error_msg = f"OpenAIServingCompletion create_completion error: {e}, {str(traceback.format_exc())}" + api_server_logger.error(error_msg) + return ErrorResponse(message=error_msg, code=400) async def completion_full_generator( self, @@ -422,6 +438,7 @@ async def completion_stream_generator( choices = [] except Exception as e: + api_server_logger.error(f"Error in completion_stream_generator: {e}, {str(traceback.format_exc())}") yield f"data: {ErrorResponse(message=str(e), code=400).model_dump_json(exclude_unset=True)}\n\n" finally: del request @@ -607,5 +624,5 @@ def _build_logprobs_response( ) except Exception as e: - api_server_logger.error("Error in _build_logprobs_response: %s", e) + api_server_logger.error(f"Error in _build_logprobs_response: {str(e)}, {str(traceback.format_exc())}") return None diff --git a/fastdeploy/entrypoints/openai/tool_parsers/ernie_x1_tool_parser.py b/fastdeploy/entrypoints/openai/tool_parsers/ernie_x1_tool_parser.py index cec1f68401..6f0534cf1e 100644 --- a/fastdeploy/entrypoints/openai/tool_parsers/ernie_x1_tool_parser.py +++ b/fastdeploy/entrypoints/openai/tool_parsers/ernie_x1_tool_parser.py @@ -14,6 +14,7 @@ import json import re +import traceback import uuid from collections.abc import Sequence from typing import Union @@ -162,10 +163,12 @@ def extract_tool_calls(self, model_output: str, request: ChatCompletionRequest) } ) except Exception as e: - data_processor_logger.debug(f"Failed to parse tool call: {str(e)}") + data_processor_logger.error( + f"Failed to parse tool call: {str(e)}, {str(traceback.format_exc())}" + ) continue except Exception as e: - data_processor_logger.debug(f"Failed to parse tool call: {str(e)}") + data_processor_logger.error(f"Failed to parse tool call: {str(e)}, {str(traceback.format_exc())}") continue if not function_call_arr: @@ -211,7 +214,9 @@ def extract_tool_calls(self, model_output: str, request: ChatCompletionRequest) ) except Exception as e: - data_processor_logger.error(f"Error in extracting tool call from response: {str(e)}") + data_processor_logger.error( + f"Error in extracting tool call from response: {str(e)}, {str(traceback.format_exc())}" + ) return ExtractedToolCallInformation(tools_called=False, tool_calls=None, content=model_output) def extract_tool_calls_streaming( @@ -302,7 +307,9 @@ def extract_tool_calls_streaming( self.streamed_args_for_tool[self.current_tool_id] = args_json return delta except Exception as e: - data_processor_logger.debug(f"Partial arguments 
parsing: {str(e)}") + data_processor_logger.error( + f"Partial arguments parsing: {str(e)}, {str(traceback.format_exc())}" + ) if "" in self.buffer: end_pos = self.buffer.find("") @@ -316,5 +323,7 @@ def extract_tool_calls_streaming( return delta except Exception as e: - data_processor_logger.error(f"Error in streaming tool call extraction: {str(e)}") + data_processor_logger.error( + f"Error in streaming tool call extraction: {str(e)}, {str(traceback.format_exc())}" + ) return None diff --git a/fastdeploy/input/ernie_vl_processor.py b/fastdeploy/input/ernie_vl_processor.py index e8239f7adb..11472fe7aa 100644 --- a/fastdeploy/input/ernie_vl_processor.py +++ b/fastdeploy/input/ernie_vl_processor.py @@ -14,6 +14,8 @@ # limitations under the License. """ +import traceback + import numpy as np from paddleformers.generation import GenerationConfig @@ -151,7 +153,7 @@ def _parse_processor_kwargs(self, kwargs): return kwargs except Exception as e: - data_processor_logger.warning(f"Invalid mm-processor-kwargs format: {e}") + data_processor_logger.warning(f"Invalid mm-processor-kwargs format: {e}, {str(traceback.format_exc())}") return {} def _parse_limits(self, limits): diff --git a/fastdeploy/inter_communicator/engine_cache_queue.py b/fastdeploy/inter_communicator/engine_cache_queue.py index 03fae97d7d..6f56550386 100644 --- a/fastdeploy/inter_communicator/engine_cache_queue.py +++ b/fastdeploy/inter_communicator/engine_cache_queue.py @@ -16,6 +16,7 @@ import threading import time +import traceback from multiprocessing.managers import ( AcquirerProxy, BaseManager, @@ -275,5 +276,5 @@ def empty(self): try: return len(self.transfer_task_queue) == 0 except Exception as e: - logger.error(f"empty function meets error: {e}") + logger.error(f"empty function meets error: {e}, {str(traceback.format_exc())}") raise e diff --git a/fastdeploy/inter_communicator/zmq_client.py b/fastdeploy/inter_communicator/zmq_client.py index 05e55929dd..2703efe3a4 100644 --- a/fastdeploy/inter_communicator/zmq_client.py +++ b/fastdeploy/inter_communicator/zmq_client.py @@ -17,6 +17,7 @@ import os import threading import time +import traceback import msgpack import zmq @@ -135,7 +136,7 @@ def send_multipart(self, req_id, data): llm_logger.debug(f"send_multipart result: {req_id} len {len(data)} elapse: {time.time()-start_send}") except Exception as e: - llm_logger.error(f"Send result to zmq client failed: {e}") + llm_logger.error(f"Send result to zmq client failed: {e}, {str(traceback.format_exc())}") if data[-1].finished: with self.mutex: @@ -155,7 +156,7 @@ def receive_json_once(self, block=False): return None, None except Exception as e: self.close() - llm_logger.warning(f"{e}") + llm_logger.warning(f"{e}, {str(traceback.format_exc())}") return str(e), None def receive_pyobj_once(self, block=False): @@ -171,7 +172,7 @@ def receive_pyobj_once(self, block=False): return None, None except Exception as e: self.close() - llm_logger.warning(f"{e}") + llm_logger.warning(f"{e}, {str(traceback.format_exc())}") return str(e), None def _clear_ipc(self, name): @@ -206,7 +207,7 @@ def close(self): self._clear_ipc(self.file_name) self._clear_ipc(self.router_path) except Exception as e: - llm_logger.warning(f"Failed to close ZMQ connection - {e}") + llm_logger.warning(f"Failed to close ZMQ connection - {e}, {str(traceback.format_exc())}") return def __exit__(self, exc_type, exc_val, exc_tb): diff --git a/fastdeploy/model_executor/guided_decoding/base_guided_decoding.py b/fastdeploy/model_executor/guided_decoding/base_guided_decoding.py 
index 7baf2fe971..b23d0c85d8 100644 --- a/fastdeploy/model_executor/guided_decoding/base_guided_decoding.py +++ b/fastdeploy/model_executor/guided_decoding/base_guided_decoding.py @@ -15,6 +15,7 @@ """ import os +import traceback from concurrent.futures import ThreadPoolExecutor from fastdeploy.config import ErnieArchitectures, FDConfig @@ -300,7 +301,7 @@ def _get_tokenizer_hf(self): return tokenizer except Exception as e: - raise Exception(f"Fail to initialize hf tokenizer: {e}") + raise Exception(f"Fail to initialize hf tokenizer: {e}, {str(traceback.format_exc())}") def add_cache(self, schemata_key: tuple[str, str], processor: LogitsProcessorBase) -> None: """ diff --git a/fastdeploy/model_executor/guided_decoding/xgrammar_backend.py b/fastdeploy/model_executor/guided_decoding/xgrammar_backend.py index f702a1085e..0d448d4293 100644 --- a/fastdeploy/model_executor/guided_decoding/xgrammar_backend.py +++ b/fastdeploy/model_executor/guided_decoding/xgrammar_backend.py @@ -16,6 +16,7 @@ import json import re +import traceback from typing import Any, List, Optional import paddle @@ -263,7 +264,7 @@ def _json_processor(self, schemata: str) -> Optional[XGrammarProcessor]: try: compiled_grammar = self.grammar_compiler.compile_json_schema(schemata, any_whitespace=self.any_whitespace) except Exception as e: - llm_logger.error(f"Failed to compile json schema: {e}") + llm_logger.error(f"Failed to compile json schema: {e}, {str(traceback.format_exc())}") return None return self._create_processor(compiled_grammar) @@ -280,7 +281,7 @@ def _regex_processor(self, schemata: str) -> Optional[XGrammarProcessor]: try: compiled_grammar = self.grammar_compiler.compile_regex(schemata) except Exception as e: - llm_logger.error(f"Failed to compile regex schema: {e}") + llm_logger.error(f"Failed to compile regex schema: {e}, {str(traceback.format_exc())}") return None return self._create_processor(compiled_grammar) @@ -297,7 +298,7 @@ def _grammar_processor(self, schemata: str) -> Optional[XGrammarProcessor]: try: compiled_grammar = self.grammar_compiler.compile_grammar(schemata) except Exception as e: - llm_logger.error(f"Failed to compile ebnf schema: {e}") + llm_logger.error(f"Failed to compile ebnf schema: {e}, {str(traceback.format_exc())}") return None return self._create_processor(compiled_grammar) @@ -324,7 +325,7 @@ def _structural_tag_processor(self, schemata: str) -> Optional[XGrammarProcessor compiled_grammar = self.grammar_compiler.compile_structural_tag(tags, structural_tag["triggers"]) except Exception as e: - llm_logger.error(f"Failed to compile structural tags schema: {e}") + llm_logger.error(f"Failed to compile structural tags schema: {e}, {str(traceback.format_exc())}") return None return self._create_processor(compiled_grammar) diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py index ebb64cebc7..36ab0c362b 100644 --- a/fastdeploy/output/token_processor.py +++ b/fastdeploy/output/token_processor.py @@ -201,7 +201,7 @@ def process_metrics(): self.prefill_time_signal.value[current_index] = 0 current_index += 1 except Exception as e: - llm_logger.error(f"Error processing prefill metrics: {e}") + llm_logger.error(f"Error processing prefill metrics: {e}, {str(traceback.format_exc())}") self.executor.submit(process_metrics) @@ -215,7 +215,7 @@ def postprocess(self, batch_result): try: self.cached_generated_tokens.put_results(batch_result) except Exception as e: - llm_logger.error(f"Error in TokenProcessor's postprocess: {e}") + llm_logger.error(f"Error in 
TokenProcessor's postprocess: {e}, {str(traceback.format_exc())}") def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False): """ diff --git a/fastdeploy/platforms/cuda.py b/fastdeploy/platforms/cuda.py index 6676d3c0f5..38504134a1 100644 --- a/fastdeploy/platforms/cuda.py +++ b/fastdeploy/platforms/cuda.py @@ -14,6 +14,8 @@ # limitations under the License. """ +import traceback + import paddle from fastdeploy.utils import console_logger as logger @@ -40,7 +42,8 @@ def available(self): logger.warning( "You are using GPU version PaddlePaddle, but there is no GPU " "detected on your machine. Maybe CUDA devices is not set properly." - f"\n Original Error is {e}" + f"\n Original Error is {e}, " + f"{str(traceback.format_exc())}" ) return False diff --git a/fastdeploy/platforms/dcu.py b/fastdeploy/platforms/dcu.py index bfd848335c..c18c45aca4 100644 --- a/fastdeploy/platforms/dcu.py +++ b/fastdeploy/platforms/dcu.py @@ -14,6 +14,8 @@ """ dcu platform file """ +import traceback + import paddle from paddleformers.utils.log import logger @@ -39,7 +41,8 @@ def available(self): logger.warning( "You are using GPU version PaddlePaddle, but there is no GPU " "detected on your machine. Maybe CUDA devices is not set properly." - f"\n Original Error is {e}" + f"\n Original Error is {e}, " + f"{str(traceback.format_exc())}" ) return False diff --git a/fastdeploy/platforms/gcu.py b/fastdeploy/platforms/gcu.py index e812113e1e..76bb170b54 100644 --- a/fastdeploy/platforms/gcu.py +++ b/fastdeploy/platforms/gcu.py @@ -14,6 +14,8 @@ # limitations under the License. """ +import traceback + import paddle from fastdeploy.utils import console_logger as logger @@ -40,7 +42,8 @@ def available(self): logger.warning( "You are using GCUPlatform, but there is no GCU " "detected on your machine. Maybe GCU devices is not set properly." - f"\n Original Error is {e}" + f"\n Original Error is {e}, " + f"{str(traceback.format_exc())}" ) return False diff --git a/fastdeploy/platforms/maca.py b/fastdeploy/platforms/maca.py index f695a3d01a..250cebf6e1 100644 --- a/fastdeploy/platforms/maca.py +++ b/fastdeploy/platforms/maca.py @@ -17,6 +17,7 @@ """ maca platform file """ +import traceback import paddle from paddleformers.utils.log import logger @@ -43,7 +44,8 @@ def available(self): logger.warning( "You are using GPU version PaddlePaddle, but there is no GPU " "detected on your machine. Maybe CUDA devices is not set properly." - f"\n Original Error is {e}" + f"\n Original Error is {e}, " + f"{str(traceback.format_exc())}" ) return False diff --git a/fastdeploy/platforms/xpu.py b/fastdeploy/platforms/xpu.py index 2f31107423..8bc8236359 100644 --- a/fastdeploy/platforms/xpu.py +++ b/fastdeploy/platforms/xpu.py @@ -11,6 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import traceback + import paddle from fastdeploy.utils import console_logger as logger @@ -38,7 +40,8 @@ def available(self): logger.warning( "You are using XPU version PaddlePaddle, but there is no XPU " "detected on your machine. Maybe CUDA devices is not set properly." 
- f"\n Original Error is {e}" + f"\n Original Error is {e}, " + f"{str(traceback.format_exc())}" ) return False diff --git a/fastdeploy/scheduler/global_scheduler.py b/fastdeploy/scheduler/global_scheduler.py index 8d9b67a6a8..f3962992cc 100644 --- a/fastdeploy/scheduler/global_scheduler.py +++ b/fastdeploy/scheduler/global_scheduler.py @@ -237,7 +237,7 @@ def _keep_alive(self): ) time.sleep(self.keep_alive_duration / 2) except Exception as e: - scheduler_logger.error(f"Scheduler keep alive failed: {e}") + scheduler_logger.error(f"Scheduler keep alive failed: {e}, {str(traceback.format_exc())}") time.sleep(min(3, self.keep_alive_duration / 4)) def _scheduler_name_from_request_queue(self, request_queue: str) -> str: diff --git a/fastdeploy/scheduler/splitwise_scheduler.py b/fastdeploy/scheduler/splitwise_scheduler.py index 61dbd22309..ab1799f440 100644 --- a/fastdeploy/scheduler/splitwise_scheduler.py +++ b/fastdeploy/scheduler/splitwise_scheduler.py @@ -20,6 +20,7 @@ import random import threading import time +import traceback from collections import deque from typing import List @@ -379,7 +380,7 @@ def run(self): if total == 0: time.sleep(0.01) except Exception as e: - logger.error(f"ResultsReader{self.idx} sync results error: {e!s}") + logger.error(f"ResultsReader{self.idx} sync results error: {e!s}, {str(traceback.format_exc())}") def sync_results(self, keys): """ @@ -402,7 +403,7 @@ def sync_results(self, keys): result = RequestOutput.from_dict(data) self.data.appendleft(result) except Exception as e: - logger.error(f"Parse Result Error:{e}, {result}") + logger.error(f"Parse Result Error:{e}, {str(traceback.format_exc())}, {result}") return total @@ -498,7 +499,7 @@ def loop_schedule(self): except IndexError: continue except Exception as e: - logger.error(f"APIScheduler Schedule req error: {e!s}") + logger.error(f"APIScheduler Schedule req error: {e!s}, {str(traceback.format_exc())}") def schedule(self, req, pnodes, dnodes, mnodes, group=""): """ @@ -573,8 +574,8 @@ def loop_clear_expired_nodes(self): # logger.info(f"clear expired nodes: {nodeid}") self.client.hdel(self.cluster_key, nodeid) time.sleep(self.clear_expired_nodes_period) - except Exception: - logger.error("APIScheduler clear expired nodes error: {str(e)}") + except Exception as e: + logger.error(f"APIScheduler clear expired nodes error: {str(e)}, {str(traceback.format_exc())}") def select_pd(self, req, nodes, role): """ @@ -664,7 +665,7 @@ def run(self): # e = time.time() # logger.info(f"Lpush {self.idx}: {key} used {e-s} {len(items)} items") except Exception as e: - logger.error(f"ResultWriter write error: {e!s}") + logger.error(f"ResultWriter write error: {e!s}, {str(traceback.format_exc())}") class InferScheduler: @@ -723,7 +724,7 @@ def routine_report(self): self.client.hset(self.cluster_key, self.nodeid, info) time.sleep(self.sync_period / 1000.0) except Exception as e: - logger.error(f"InferScheduler routine report error: {e!s}") + logger.error(f"InferScheduler routine report error: {e!s}, {str(traceback.format_exc())}") def loop_expire_reqs(self): """ @@ -733,8 +734,8 @@ def loop_expire_reqs(self): try: self.node.expire_reqs(self.release_load_expire_period) time.sleep(60) - except Exception: - logger.error("InferScheduler expire reqs error: {e}") + except Exception as e: + logger.error(f"InferScheduler expire reqs error: {e}, {str(traceback.format_exc())}") def loop_get_reqs(self): """ @@ -772,7 +773,7 @@ def select_writer(req): else: self.node.add_req(req.request_id, 1) except Exception as e: - 
logger.error(f"InferScheduler loop get reqs error: {e!s}") + logger.error(f"InferScheduler loop get reqs error: {e!s}, {str(traceback.format_exc())}") def get_requests( self, @@ -807,7 +808,8 @@ def get_requests( return reqs # logger.info(f"Get Requests from Scheduler: {req.request_id}") reqs.append(req) - except Exception: + except Exception as e: + logger.error(f"InferScheduler get requests error: {e}, {str(traceback.format_exc())}") return reqs return reqs diff --git a/fastdeploy/splitwise/splitwise_connector.py b/fastdeploy/splitwise/splitwise_connector.py index 6b4c8ce04d..8924c00f56 100644 --- a/fastdeploy/splitwise/splitwise_connector.py +++ b/fastdeploy/splitwise/splitwise_connector.py @@ -16,6 +16,7 @@ import json import time +import traceback from concurrent.futures import ThreadPoolExecutor from typing import Dict @@ -97,7 +98,7 @@ def start_receiver(self): time.sleep(0.001) except Exception as e: - logger.error(f"Receiver error: {e}") + logger.error(f"Receiver error: {e}, {str(traceback.format_exc())}") time.sleep(1) def _get_push_socket(self, addr): @@ -152,7 +153,7 @@ def _send_message(self, addr, msg_type: str, payload): except zmq.Again: logger.warning(f"Send queue full for {addr}") except Exception as e: - logger.error(f"Send to {addr} failed: {e}") + logger.error(f"Send to {addr} failed: {e}, {str(traceback.format_exc())}") self._close_connection(addr) except Exception as e: @@ -433,7 +434,7 @@ def _process_message(self, message: bytes): self.engine_worker_queue.put_cache_info(payload) except Exception as e: - logger.error(f"Message processing failed: {e}") + logger.error(f"Message processing failed: {e}, {str(traceback.format_exc())}") def _handle_prefill(self, tasks): """ diff --git a/fastdeploy/worker/utils.py b/fastdeploy/worker/utils.py index bf727c3bbf..7554c7c08a 100644 --- a/fastdeploy/worker/utils.py +++ b/fastdeploy/worker/utils.py @@ -15,6 +15,7 @@ """ import os +import traceback def check_safetensors_model(model_dir: str): @@ -45,5 +46,5 @@ def check_safetensors_model(model_dir: str): sum(flags) == safetensors_num ), f"Number of safetensor files should be {len(model_files)}, but now it's {sum(flags)}" except Exception as e: - raise Exception(f"Failed to check unified checkpoint, details: {e}.") + raise Exception(f"Failed to check unified checkpoint, details: {e}, {str(traceback.format_exc())}.") return is_safetensors diff --git a/test/ce/accuracy_cases/gsm8k.py b/test/ce/accuracy_cases/gsm8k.py index f156f58c7f..b02e4c9f1a 100644 --- a/test/ce/accuracy_cases/gsm8k.py +++ b/test/ce/accuracy_cases/gsm8k.py @@ -6,6 +6,7 @@ import os import re +import traceback from concurrent.futures import ThreadPoolExecutor, as_completed from urllib.parse import urlparse, urlunparse @@ -120,7 +121,7 @@ def query_model(prompt): ) return response.choices[0].message.content.strip() except Exception as e: - return f"[Error] {e}" + return f"[Error] {e}, {str(traceback.format_exc())}" # ========== 评估函数 ========== diff --git a/test/ce/deploy/deploy.py b/test/ce/deploy/deploy.py index aa305360b8..50e540a997 100644 --- a/test/ce/deploy/deploy.py +++ b/test/ce/deploy/deploy.py @@ -7,6 +7,7 @@ import subprocess import sys import time +import traceback import requests import yaml @@ -175,7 +176,7 @@ def stop_server(signum=None, frame=None): # 终止进程组(包括所有子进程) os.killpg(os.getpgid(pid_port["PID"]), signal.SIGTERM) except Exception as e: - print(f"Failed to stop server: {e}") + print(f"Failed to stop server: {e}, {str(traceback.format_exc())}") for port in [FD_API_PORT, FD_ENGINE_QUEUE_PORT, 
FD_METRICS_PORT]: try: @@ -184,7 +185,7 @@ def stop_server(signum=None, frame=None): os.kill(int(pid), signal.SIGKILL) print(f"Killed process on port {port}, pid={pid}") except Exception as e: - print(f"Failed to killed process on port: {e}") + print(f"Failed to killed process on port: {e}, {str(traceback.format_exc())}") # 若log目录存在,则重命名为log_timestamp if os.path.isdir("./log"): os.rename("./log", "./log_{}".format(time.strftime("%Y%m%d%H%M%S"))) @@ -229,8 +230,10 @@ def start_service(): # 构建命令 cmd = build_command(final_config) except Exception as e: + error_msg = f"Failed to start service: {e}, {str(traceback.format_exc())}" + print(error_msg) return Response( - json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False), + json.dumps({"status": "error", "message": error_msg}, ensure_ascii=False), status=500, content_type="application/json", ) @@ -264,8 +267,10 @@ def start_service(): return Response(json.dumps(json_data, ensure_ascii=False), status=200, content_type="application/json") except Exception as e: + error_msg = f"Failed to start service: {e}, {str(traceback.format_exc())}" + print(error_msg) return Response( - json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False), + json.dumps({"status": "error", "message": error_msg}, ensure_ascii=False), status=500, content_type="application/json", ) @@ -295,8 +300,10 @@ def switch_service(): # 构建命令 cmd = build_command(final_config) except Exception as e: + error_msg = f"Failed to switch service: {e}, {str(traceback.format_exc())}" + print(error_msg) return Response( - json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False), + json.dumps({"status": "error", "message": error_msg}, ensure_ascii=False), status=500, content_type="application/json", ) @@ -330,8 +337,10 @@ def switch_service(): return Response(json.dumps(json_data, ensure_ascii=False), status=200, content_type="application/json") except Exception as e: + error_msg = f"Failed to switch service: {e}, {str(traceback.format_exc())}" + print(error_msg) return Response( - json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False), + json.dumps({"status": "error", "message": error_msg}, ensure_ascii=False), status=500, content_type="application/json", ) @@ -406,8 +415,10 @@ def get_config(): ) except Exception as e: + error_msg = f"{e}, {str(traceback.format_exc())}" + print(error_msg) return Response( - json.dumps({"message": "api_server.log解析失败,请检查log", "error": str(e)}, ensure_ascii=False), + json.dumps({"message": "api_server.log解析失败,请检查log", "error": error_msg}, ensure_ascii=False), status=500, content_type="application/json", ) @@ -447,7 +458,7 @@ def tail_file(path, lines=50): with open(path, "r", encoding="utf-8", errors="ignore") as f: return "".join(f.readlines()[-lines:]) except Exception as e: - return f"[无法读取 {path}]: {e}\n" + return f"[无法读取 {path}]: {e}, {str(traceback.format_exc())}\n" result = f"服务启动超时,耗时:[{timeout}s]\n\n" result += "==== server.log tail 50 ====\n" From eea387726132a79065d988b7197b8970e836cd2b Mon Sep 17 00:00:00 2001 From: kevin Date: Tue, 19 Aug 2025 11:22:25 +0800 Subject: [PATCH 3/7] update error msg --- fastdeploy/engine/engine.py | 8 ++++++-- fastdeploy/entrypoints/openai/serving_chat.py | 2 +- test/ce/deploy/deploy.py | 2 +- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index 47e1bfd323..d4cc4f6fb6 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -984,7 +984,9 @@ def _exit_sub_services(self): 
try: os.killpg(p.pid, signal.SIGTERM) except Exception as e: - print(f"Error extracting file: {e}, {str(traceback.format_exc())}") + error_msg = f"Error killing cache manager process {p.pid}: {e}, {str(traceback.format_exc())}" + print(error_msg) + llm_logger.error(error_msg) self.worker_ready_signal.clear() self.exist_task_signal.clear() self.exist_swapped_task_signal.clear() @@ -997,7 +999,9 @@ def _exit_sub_services(self): try: os.killpg(self.worker_proc.pid, signal.SIGTERM) except Exception as e: - print(f"Error extracting sub services: {e}, {str(traceback.format_exc())}") + error_msg = f"Error extracting sub services: {e}, {str(traceback.format_exc())}" + print(error_msg) + llm_logger.error(error_msg) self.engine_worker_queue.cleanup() if hasattr(self, "zmq_server") and self.zmq_server is not None: diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py index a0d28eedaa..28f4cb41bd 100644 --- a/fastdeploy/entrypoints/openai/serving_chat.py +++ b/fastdeploy/entrypoints/openai/serving_chat.py @@ -348,7 +348,7 @@ async def chat_completion_stream_generator( except Exception as e: error_data = self._create_streaming_error_response( - f"equest[{request_id}] generate stream error: {str(e)}, {str(traceback.format_exc())}" + f"request[{request_id}] generate stream error: {str(e)}, {str(traceback.format_exc())}" ) yield f"data: {error_data}\n\n" finally: diff --git a/test/ce/deploy/deploy.py b/test/ce/deploy/deploy.py index 50e540a997..3947d22288 100644 --- a/test/ce/deploy/deploy.py +++ b/test/ce/deploy/deploy.py @@ -185,7 +185,7 @@ def stop_server(signum=None, frame=None): os.kill(int(pid), signal.SIGKILL) print(f"Killed process on port {port}, pid={pid}") except Exception as e: - print(f"Failed to killed process on port: {e}, {str(traceback.format_exc())}") + print(f"Failed to kill process on port: {e}, {str(traceback.format_exc())}") # 若log目录存在,则重命名为log_timestamp if os.path.isdir("./log"): os.rename("./log", "./log_{}".format(time.strftime("%Y%m%d%H%M%S"))) From 4d0b1e45126f6e3e8c9c78bcba1f6871579e4919 Mon Sep 17 00:00:00 2001 From: kevin Date: Tue, 19 Aug 2025 17:25:13 +0800 Subject: [PATCH 4/7] update code --- fastdeploy/engine/engine.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index b6c52b80de..3494186fa4 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -985,9 +985,9 @@ def _exit_sub_services(self): try: os.killpg(p.pid, signal.SIGTERM) except Exception as e: - error_msg = f"Error killing cache manager process {p.pid}: {e}, {str(traceback.format_exc())}" - print(error_msg) - llm_logger.error(error_msg) + console_logger.error( + f"Error killing cache manager process {p.pid}: {e}, {str(traceback.format_exc())}" + ) self.worker_ready_signal.clear() self.exist_task_signal.clear() self.exist_swapped_task_signal.clear() @@ -1000,9 +1000,7 @@ def _exit_sub_services(self): try: os.killpg(self.worker_proc.pid, signal.SIGTERM) except Exception as e: - error_msg = f"Error extracting sub services: {e}, {str(traceback.format_exc())}" - print(error_msg) - llm_logger.error(error_msg) + console_logger.error(f"Error extracting sub services: {e}, {str(traceback.format_exc())}") self.engine_worker_queue.cleanup() if hasattr(self, "zmq_server") and self.zmq_server is not None: From b3a5680ec49cc9ac72f25de0b462cf29b51c685e Mon Sep 17 00:00:00 2001 From: kevin Date: Thu, 11 Sep 2025 11:03:57 +0800 Subject: [PATCH 5/7] update code --- 
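[PATCH 5/7 note, kept below the --- line so it stays out of the commit message] This patch only deletes fastdeploy/model_executor/guided_decoding/kernels/fastdeploy/model_executor/xgrammar_apply_token_bitmask.py; note the path nests a second fastdeploy/model_executor under the kernels directory. For reviewers, the snippet below is a NumPy sketch (illustration only, not applied by this patch) of the masking semantics the removed Triton kernel implemented: each int32 word packs 32 vocabulary bits, and any token whose bit is 0 is forced to -inf so it samples with zero probability.

import numpy as np

def apply_token_bitmask_reference(logits: np.ndarray, bitmask: np.ndarray) -> np.ndarray:
    """logits: [batch, vocab] float array; bitmask: [batch, ceil(vocab/32)] int32 array."""
    batch, vocab = logits.shape
    # Re-interpret each int32 as 4 little-endian bytes, then unpack LSB-first so that
    # bit k of word j covers token j * 32 + k (mirroring `packed >> arange(0, 32) & 1`).
    bits = np.unpackbits(bitmask.astype("<u4").view(np.uint8), axis=-1, bitorder="little")
    keep = bits[:, :vocab].astype(bool)
    out = logits.copy()
    out[~keep] = -np.inf  # cleared bits get zero probability after softmax
    return out

The Triton version tiled this work over rows and 4096-token blocks and wrote -inf in place; the reference above trades that for clarity.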
.../xgrammar_apply_token_bitmask.py | 118 ------------------ 1 file changed, 118 deletions(-) delete mode 100644 fastdeploy/model_executor/guided_decoding/kernels/fastdeploy/model_executor/xgrammar_apply_token_bitmask.py diff --git a/fastdeploy/model_executor/guided_decoding/kernels/fastdeploy/model_executor/xgrammar_apply_token_bitmask.py b/fastdeploy/model_executor/guided_decoding/kernels/fastdeploy/model_executor/xgrammar_apply_token_bitmask.py deleted file mode 100644 index f0ba737a6e..0000000000 --- a/fastdeploy/model_executor/guided_decoding/kernels/fastdeploy/model_executor/xgrammar_apply_token_bitmask.py +++ /dev/null @@ -1,118 +0,0 @@ -""" -# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License" -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" - -# refer to https://github.com/mlc-ai/xgrammar/blob/main/python/xgrammar/kernels/apply_token_bitmask_inplace_triton.py - -from typing import List, Optional - -import paddle - -try: - import triton - import triton.language as tl -except ImportError as err: - raise ImportError("Triton is not installed") from err - - -@triton.jit -def apply_token_bitmask_inplace_kernel( - logits_ptr, - bitmask_ptr, - indices_ptr, - num_rows, - vocab_size, - logits_strides, - bitmask_strides, - NUM_SMS: tl.constexpr, - BLOCK_SIZE: tl.constexpr, -): - """Triton kernel for in-place logits masking using bitwise compression. - - Processes logits tensor in blocks, applying bitmask to restrict vocabulary access. - Masked positions are set to -inf to ensure zero probability during sampling. - - Note: - - Bitmask uses 32:1 compression (1 bit per vocabulary token) - - Optimized for GPU parallel processing with configurable block size - """ - pid = tl.program_id(0) - num_blocks = tl.cdiv(vocab_size, BLOCK_SIZE) - for work_id in tl.range(pid, num_rows * num_blocks, NUM_SMS): - row_id = work_id // num_blocks - block_offset = (work_id % num_blocks) * BLOCK_SIZE - batch_id = row_id if indices_ptr is None else tl.load(indices_ptr + row_id) - offsets = block_offset + tl.arange(0, BLOCK_SIZE) - bitmask_offsets = block_offset // 32 + tl.arange(0, BLOCK_SIZE // 32) - vocab_mask = offsets < vocab_size - packed_bitmask_mask = bitmask_offsets < bitmask_strides - packed_bitmask = tl.load(bitmask_ptr + batch_id * bitmask_strides + bitmask_offsets, packed_bitmask_mask) - bitmask = ((packed_bitmask[:, None] >> (tl.arange(0, 32)[None, :])) & 1) == 0 - bitmask = bitmask.reshape(BLOCK_SIZE) - - tl.store(logits_ptr + batch_id * logits_strides + offsets, -float("inf"), vocab_mask & bitmask) - - -def apply_token_bitmask_inplace_triton( - logits: paddle.Tensor, - bitmask: paddle.Tensor, - vocab_size: Optional[int] = None, - indices: Optional[List[int]] = None, -): - """Applies vocabulary mask to logits tensor using Triton GPU kernel. 
- - Args: - logits: Input logits tensor of shape [batch_size, vocab_size] - bitmask: Compressed mask tensor (int32) where each bit represents a token - vocab_size: Optional explicit vocabulary size (defaults to auto-detected) - indices: Optional list of batch indices to apply mask to - - Note: - Requires CUDA GPU with Triton support - Bitmask must be int32 tensor with shape [batch_size, ceil(vocab_size/32)] - """ - NUM_SMS = paddle.device.cuda.get_device_properties().multi_processor_count - BLOCK_SIZE = 4096 - - assert bitmask.dtype == paddle.int32, "bitmask must be of type int32" - - detected_vocab_size = min(logits.shape[-1], bitmask.shape[-1] * 32) - if vocab_size is None: - vocab_size = detected_vocab_size - else: - assert ( - vocab_size <= detected_vocab_size - ), f"vocab_size {vocab_size} is larger than the detected vocab_size {detected_vocab_size}" - - num_rows = len(indices) if indices is not None else logits.shape[0] if logits.ndim == 2 else 1 - - if indices is not None: - indices = paddle.to_tensor(indices, dtype=paddle.int32, place=logits.place) - - grid = (NUM_SMS,) - - apply_token_bitmask_inplace_kernel[grid]( - logits, - bitmask, - indices, - num_rows, - vocab_size, - logits.shape[-1], - bitmask.shape[-1], - NUM_SMS, - BLOCK_SIZE, - num_warps=BLOCK_SIZE // 32 // (16 // logits.element_size()), - num_stages=3, - ) From 55d9b786ca2835800bcb3bcee818714b80f177b8 Mon Sep 17 00:00:00 2001 From: kevin Date: Fri, 12 Sep 2025 11:39:31 +0800 Subject: [PATCH 6/7] update code --- .../guided_decoding/xgrammar_backend.py | 11 +++-- .../model_executor/layers/sample/sampler.py | 41 ++++++++++--------- 2 files changed, 29 insertions(+), 23 deletions(-) diff --git a/fastdeploy/model_executor/guided_decoding/xgrammar_backend.py b/fastdeploy/model_executor/guided_decoding/xgrammar_backend.py index 52d83ee1d2..2b07cf04e8 100644 --- a/fastdeploy/model_executor/guided_decoding/xgrammar_backend.py +++ b/fastdeploy/model_executor/guided_decoding/xgrammar_backend.py @@ -150,13 +150,16 @@ def accept_token(self, token: int) -> None: """ Validate and accept a generated token against the grammar constraints. 
+ # When the output reaches the maximum length, an eos_token is forced; + # that token is not constrained by guided decoding, so a rejected token is logged instead of raising + Args: token (int): The token ID to validate - - Raises: - AssertionError: If token is not allowed by the grammar """ - assert self.matcher.accept_token(token), f"Failed to accept token {token}" + if not self.matcher.accept_token(token): + llm_logger.error(f"failed to accept token [{token}]") + return False + return True def is_terminated(self) -> bool: """ diff --git a/fastdeploy/model_executor/layers/sample/sampler.py b/fastdeploy/model_executor/layers/sample/sampler.py index e6c7d80132..72b3dd3933 100644 --- a/fastdeploy/model_executor/layers/sample/sampler.py +++ b/fastdeploy/model_executor/layers/sample/sampler.py @@ -19,6 +19,7 @@ from concurrent.futures import ThreadPoolExecutor from typing import Any, Dict, List, Optional +import numpy as np import paddle import paddle.nn.functional as F from paddle import nn @@ -68,10 +69,11 @@ def __init__(self): self.logits_processor: Dict[int, Optional[Any]] = dict() self.executor = ThreadPoolExecutor() self.logits_lock = threading.Lock() - self.reasoning_parser = None + self.reasoning_end_id = None def apply_reasoning_parser(self, reasoning_parser: Optional[ReasoningParser] = None): - self.reasoning_parser = reasoning_parser + if reasoning_parser: + self.reasoning_end_id = reasoning_parser.think_end_token_id def add_logits_processor( self, ids: int, future: Optional[Any] = None, prefill_tokens: List[int] = [], skip: bool = False @@ -112,13 +114,12 @@ def get_available_processors(self): """ get available logits processor """ - available_processors = None with self.logits_lock: for processor in self.logits_processor.values(): - if processor.is_terminated(): + if processor.is_terminated() or not isinstance(processor, LogitsProcessorBase): continue - available_processors = processor - return available_processors + return processor + return None def update_logits_processor(self): """update logits processor""" @@ -149,7 +150,6 @@ def update_vocab_mask(self, skip_idx_list: List[int] = []): if len(self.logits_processor) == 0: return - self.update_logits_processor() available_processors = self.get_available_processors() if available_processors is None: return @@ -157,7 +157,9 @@ def update_vocab_mask(self, skip_idx_list: List[int] = []): # allocate token bitmask token_bitmask = available_processors.allocate_token_bitmask() + self.update_logits_processor() with self.logits_lock: + # TODO: support filling the token bitmask in parallel # fill token bitmask for idx, processor in self.logits_processor.items(): if processor.is_terminated() or idx in skip_idx_list: @@ -179,25 +181,26 @@ def apply_token_mask(self, logits: paddle.Tensor, skip_idx_list: List[int] = []) if len(self.logits_processor) == 0: return logits - if self.async_step is not None: - self.async_step.result() - self.async_step = None - self.update_logits_processor() if self.insert_processor: + # TODO: only update the processors at the inserted positions self.update_vocab_mask(skip_idx_list) with self.logits_lock: self.insert_processor = False + if self.async_step is not None: + self.async_step.result() + self.async_step = None + available_processors = self.get_available_processors() if available_processors is None or self.token_bitmask is None: return logits indices = [] for idx, processor in self.logits_processor.items(): - if processor is None or idx in skip_idx_list: + if processor is None or processor.is_terminated() or idx in skip_idx_list: continue - if self.reasoning_parser is None or not
processor.enable_reasoning or processor.reasoning_ended: + if self.reasoning_end_id is None or not processor.enable_reasoning or processor.reasoning_ended: indices.append(idx) return available_processors.apply_token_mask(logits, self.token_bitmask, indices=indices) @@ -211,12 +214,12 @@ def _accept_token(self, idx: int, token: int): return if ( - self.reasoning_parser is not None + self.reasoning_end_id is not None and self.logits_processor[idx].enable_reasoning and not self.logits_processor[idx].reasoning_ended ): - reasoning_ended = self.reasoning_parser.is_reasoning_end([token]) - self.logits_processor[idx].reasoning_ended = reasoning_ended + # check reasoning end + self.logits_processor[idx].reasoning_ended = self.reasoning_end_id == token return self.logits_processor[idx].accept_token(token) @@ -236,10 +239,9 @@ def update_output_tokens( # create async operation for guided decoding def async_update(next_tokens, skip_idx_list, skip_list_next): - token_ids = next_tokens.numpy().tolist() with self.logits_lock: - for idx in self.logits_processor.keys(): - token = token_ids[idx][0] + for idx_tuple, token in np.ndenumerate(next_tokens.numpy()): + idx = idx_tuple[0] if token < 0 or self.logits_processor[idx] is None or idx in skip_idx_list: continue self._accept_token(idx, token) @@ -418,6 +420,7 @@ def forward_cuda( skip_idx_list: List[int] = [], ) -> SamplerOutput: """ """ + # guided decoding apply token mask for logits logits = self.processor.apply_token_mask(logits, skip_idx_list) num_logprobs = sampling_metadata.max_num_logprobs From 0d2588433df5d1fd57268be9da8687b62da2e8fd Mon Sep 17 00:00:00 2001 From: kevin Date: Fri, 12 Sep 2025 13:43:55 +0800 Subject: [PATCH 7/7] update code --- .../guided_decoding/base_guided_decoding.py | 16 +++++++------- .../guided_decoding/xgrammar_backend.py | 22 +++++++++++-------- fastdeploy/worker/gpu_model_runner.py | 2 ++ 3 files changed, 23 insertions(+), 17 deletions(-) diff --git a/fastdeploy/model_executor/guided_decoding/base_guided_decoding.py b/fastdeploy/model_executor/guided_decoding/base_guided_decoding.py index b9a879e32d..d6961c9e3a 100644 --- a/fastdeploy/model_executor/guided_decoding/base_guided_decoding.py +++ b/fastdeploy/model_executor/guided_decoding/base_guided_decoding.py @@ -231,7 +231,7 @@ def get_reasoning_parser(self): def _init_logits_processor( self, schemata_key: tuple[str, str], - enable_thinking: bool = False, + **kwargs, ) -> LogitsProcessorBase: """ init logits processor by type and schemata. @@ -248,13 +248,13 @@ def _init_logits_processor( """ key_type, schemata = schemata_key if key_type == "json": - return self._json_processor(schemata, enable_thinking) + return self._json_processor(schemata, **kwargs) elif key_type == "regex": - return self._regex_processor(schemata, enable_thinking) + return self._regex_processor(schemata, **kwargs) elif key_type == "grammar": - return self._grammar_processor(schemata, enable_thinking) + return self._grammar_processor(schemata, **kwargs) elif key_type == "structural_tag": - return self._structural_tag_processor(schemata, enable_thinking) + return self._structural_tag_processor(schemata, **kwargs) else: llm_logger.error(f"Unsupported processor type {key_type}.") return None @@ -262,7 +262,7 @@ def _init_logits_processor( def get_logits_processor( self, schemata_key: tuple[str, str], - enable_thinking: bool = False, + **kwargs, ) -> tuple[LogitsProcessorBase, bool]: """ get logits processor by key from cache or create new one. 
@@ -278,9 +278,9 @@ def get_logits_processor( value = self.cache.get(schemata_key, None) if value: value_copy = value.copy() - value_copy.enable_reasoning = enable_thinking + value_copy.enable_reasoning = kwargs.get("enable_thinking", False) return value_copy, True - value = self.executor.submit(self._init_logits_processor, schemata_key, enable_thinking) + value = self.executor.submit(self._init_logits_processor, schemata_key, **kwargs) return value, False def _get_tokenizer_hf(self): diff --git a/fastdeploy/model_executor/guided_decoding/xgrammar_backend.py b/fastdeploy/model_executor/guided_decoding/xgrammar_backend.py index 2b07cf04e8..949b10bcd2 100644 --- a/fastdeploy/model_executor/guided_decoding/xgrammar_backend.py +++ b/fastdeploy/model_executor/guided_decoding/xgrammar_backend.py @@ -72,6 +72,7 @@ def __init__( vocab_size: Optional[int] = None, batch_size: Optional[int] = None, enable_thinking: bool = False, + request_id: Optional[str] = None, ): super().__init__(enable_reasoning=enable_thinking) self.max_rollback_tokens = 200 @@ -80,6 +81,7 @@ def __init__( self.compiled_grammar = compiled_grammar self.terminate_without_stop_token = terminate_without_stop_token self.override_stop_tokens = override_stop_tokens + self.request_id = request_id self.matcher = GrammarMatcher( compiled_grammar=compiled_grammar, @@ -157,7 +159,7 @@ def accept_token(self, token: int) -> None: token (int): The token ID to validate """ if not self.matcher.accept_token(token): - llm_logger.error(f"failed to accept token [{token}]") + llm_logger.error(f"request: {self.request_id} failed to accept token [{token}]") return False return True @@ -223,6 +225,7 @@ def _create_processor( terminate_without_stop_token: bool = False, override_stop_tokens: Optional[List[int]] = None, enable_thinking: bool = False, + request_id: Optional[str] = None, ) -> XGrammarProcessor: """ Create a logits processor instance for the given compiled grammar. @@ -243,9 +246,10 @@ def _create_processor( vocab_size=self.vocab_size, batch_size=self.batch_size, enable_thinking=enable_thinking, + request_id=request_id, ) - def _json_processor(self, schemata: str, enable_thinking: bool = False) -> Optional[XGrammarProcessor]: + def _json_processor(self, schemata: str, **kwargs) -> Optional[XGrammarProcessor]: """ Compile JSON schema into a grammar processor. @@ -261,9 +265,9 @@ def _json_processor(self, schemata: str, enable_thinking: bool = False) -> Optio except Exception as e: llm_logger.error(f"Failed to compile json schema: {e}, {str(traceback.format_exc())}") return None - return self._create_processor(compiled_grammar, enable_thinking=enable_thinking) + return self._create_processor(compiled_grammar, **kwargs) - def _regex_processor(self, schemata: str, enable_thinking: bool = False) -> Optional[XGrammarProcessor]: + def _regex_processor(self, schemata: str, **kwargs) -> Optional[XGrammarProcessor]: """ Compile regex pattern into a grammar processor. 
@@ -279,9 +283,9 @@ def _regex_processor(self, schemata: str, enable_thinking: bool = False) -> Opti except Exception as e: llm_logger.error(f"Failed to compile regex schema: {e}, {str(traceback.format_exc())}") return None - return self._create_processor(compiled_grammar, enable_thinking=enable_thinking) + return self._create_processor(compiled_grammar, **kwargs) - def _grammar_processor(self, schemata: str, enable_thinking: bool = False) -> Optional[XGrammarProcessor]: + def _grammar_processor(self, schemata: str, **kwargs) -> Optional[XGrammarProcessor]: """ Compile grammar (EBNF) into a grammar processor. @@ -297,9 +301,9 @@ def _grammar_processor(self, schemata: str, enable_thinking: bool = False) -> Op except Exception as e: llm_logger.error(f"Failed to compile ebnf schema: {e}, {str(traceback.format_exc())}") return None - return self._create_processor(compiled_grammar, enable_thinking=enable_thinking) + return self._create_processor(compiled_grammar, **kwargs) - def _structural_tag_processor(self, schemata: str, enable_thinking: bool = False) -> Optional[XGrammarProcessor]: + def _structural_tag_processor(self, schemata: str, **kwargs) -> Optional[XGrammarProcessor]: """ Compile structural tags into a grammar processor. @@ -324,7 +328,7 @@ def _structural_tag_processor(self, schemata: str, enable_thinking: bool = False except Exception as e: llm_logger.error(f"Failed to compile structural tags schema: {e}, {str(traceback.format_exc())}") return None - return self._create_processor(compiled_grammar, enable_thinking=enable_thinking) + return self._create_processor(compiled_grammar, **kwargs) class XGrammarChecker(BaseChecker): diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index 8bb0239d53..0a469fadee 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -264,6 +264,8 @@ def _init_logits_processor(self, request): self.guided_backend.get_logits_processor( schemata_key=schemata_key, enable_thinking=enable_thinking, + override_stop_tokens=request.eos_token_ids, + request_id=request.request_id, ), schemata_key, )
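
[Series postscript, reviewer illustration only; not part of any commit] PATCH 7/7 switches the backend entry points from an explicit enable_thinking flag to **kwargs, so the call site in gpu_model_runner.py can also hand override_stop_tokens and request_id through get_logits_processor -> _init_logits_processor -> _json_processor / _regex_processor / ... -> _create_processor without every intermediate layer naming them. The toy sketch below shows only that forwarding shape; _DemoBackend and the dict it returns are hypothetical stand-ins, not the real classes in base_guided_decoding.py or xgrammar_backend.py.

class _DemoBackend:
    def get_logits_processor(self, schemata_key, **kwargs):
        # The cache-hit path reads only what it needs ...
        _ = kwargs.get("enable_thinking", False)
        # ... while the miss path forwards everything downstream unchanged.
        return self._init_logits_processor(schemata_key, **kwargs)

    def _init_logits_processor(self, schemata_key, **kwargs):
        key_type, schemata = schemata_key
        if key_type == "json":
            return self._json_processor(schemata, **kwargs)
        raise ValueError(f"unsupported key type: {key_type}")

    def _json_processor(self, schemata, **kwargs):
        # enable_thinking / override_stop_tokens / request_id reach the processor
        # constructor without the intermediate layers spelling them out.
        return {"schemata": schemata, **kwargs}


processor = _DemoBackend().get_logits_processor(
    ("json", "{}"),
    enable_thinking=True,
    override_stop_tokens=[2],
    request_id="req-0",
)
print(processor)  # {'schemata': '{}', 'enable_thinking': True, 'override_stop_tokens': [2], 'request_id': 'req-0'}

One thing to double-check when reviewing: with **kwargs a misspelled keyword is no longer rejected at the outer call; on a cache miss it only surfaces as a TypeError once _create_processor (which keeps an explicit signature) runs on the executor thread, and on a cache hit it is silently ignored.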