
[Feature Request]: Add recursive character splitting to the chunking function chunking_by_token_size #2126

@QiSi-KeGuan

Description

Do you need to file a feature request?

  • I have searched the existing feature requests and this feature request is not already filed.
  • I believe this is a legitimate feature request, not just a question or bug.

Feature Request Description

This function splits the text content into multiple chunks using a hierarchical strategy, aiming to preserve the semantic boundaries of the text as much as possible.

The splitting strategy is as follows:

  1. First, try to split the text into larger, semantically coherent units using a preset list of separators (e.g., paragraph breaks, newlines, sentence-ending punctuation).
  2. Then, iterate over each resulting unit:
  • If the unit's token count is within the chunk_token_size limit, keep it as a complete chunk.
  • If the unit's token count exceeds chunk_token_size, apply a token-based "sliding window" to that specific unit. This guarantees that no final chunk is over-long while preserving contextual continuity through overlap (see the sketch after this list).
  • Decoding is also done safely to avoid errors caused by cutting through multi-byte characters (such as Chinese characters).
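
A minimal sketch of the sliding-window arithmetic described above, using the defaults from the proposed signature (chunk_token_size=1200, chunk_overlap_token_size=100); the unit length is an illustrative value, not taken from LightRAG:

```python
chunk_token_size = 1200
chunk_overlap_token_size = 100
step = chunk_token_size - chunk_overlap_token_size  # 1100

unit_token_len = 2500  # hypothetical unit that is still too long after all semantic splits
windows = [
    (start, min(start + chunk_token_size, unit_token_len))
    for start in range(0, unit_token_len, step)
]
print(windows)  # [(0, 1200), (1100, 2300), (2200, 2500)] -- consecutive windows share 100 tokens
```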

File path
./LightRAG/lightrag/operate.py

```python
import re
from typing import Any, Dict, List, Optional

# `re` and typing are standard library; `Tokenizer` is assumed to be the
# tokenizer abstraction already imported in lightrag/operate.py.
from lightrag.utils import Tokenizer


def chunking_by_token_size(
    tokenizer: Tokenizer,
    content: str,
    split_by_character: Optional[str] = None,
    split_by_character_only: bool = False,
    chunk_overlap_token_size: int = 100,
    chunk_token_size: int = 1200,
) -> List[Dict[str, Any]]:
    """
    Splits the content into chunks using a hierarchical approach to preserve semantic boundaries.

    The strategy is as follows:
    1.  First, attempt to split the text into larger, semantically meaningful blocks using a list of separators
        (e.g., paragraphs, newlines, sentences).
    2.  For each block:
        - If the block is within the `chunk_token_size`, it is kept as a whole chunk.
        - If the block exceeds the `chunk_token_size`, a token-based sliding window is applied to that specific block,
          ensuring that no chunk exceeds the size limit while maintaining context through overlap. This sliding window
          is implemented safely to prevent decoding errors with multi-byte characters.

    Args:
        tokenizer: The tokenizer for encoding and decoding text.
        content: The text content to be chunked.
        split_by_character: If provided, this character is used as the primary separator.
                            Otherwise, a default list of separators is used.
        split_by_character_only: (Not used in this implementation) Kept for signature compatibility.
        chunk_overlap_token_size: The number of overlapping tokens for the sliding-window fallback.
        chunk_token_size: The maximum number of tokens per chunk.

    Returns:
        A list of dictionaries, each representing a chunk.
    """
    # Return an empty list immediately if the content is empty or whitespace-only
    if not content or not content.strip():
        return []

    # The chunk size must be strictly greater than the overlap size, otherwise the
    # sliding-window step would be zero or negative and the loop could not advance
    if chunk_token_size <= chunk_overlap_token_size:
        raise ValueError(
            f"chunk_token_size ({chunk_token_size}) must be greater than chunk_overlap_token_size ({chunk_overlap_token_size})"
        )

    # Step 1: Split the text into semantic units based on separators
    if split_by_character:
        separators = [split_by_character]
    else:
        # Default hierarchical separators, ordered from coarse to fine.
        # The order matters: the largest semantic boundaries (paragraphs) are tried first,
        # followed by line breaks and Chinese/English sentence-ending punctuation.
        separators = ["\n\n\n\n", "\n\n\n", "\n\n", "\n", "。", "！", "？", ". ", "!", "?"]

    # Start with the full content as the first unit to process
    semantic_units = [content]

    # Walk through the separators level by level, refining any units that are still too long
    for sep in separators:
        next_level_units = []
        for unit in semantic_units:
            # Only tokenize when the unit might still exceed the token limit
            if len(tokenizer.encode(unit)) > chunk_token_size:
                # The unit is still too large: split it by the current separator and re-merge greedily.
                # re.split with a capturing group `(sep)` keeps the separator in the result,
                # so it can be reattached and no text is lost.
                sub_units = re.split(f"({re.escape(sep)})", unit)

                # Pre-compute the token count of every sub-unit once,
                # instead of re-encoding the growing string inside the loop
                sub_units_with_tokens = [(s, len(tokenizer.encode(s))) for s in sub_units]

                temp_unit_text = ""
                temp_unit_tokens = 0
                # Process the sub-units two at a time (text part + the separator that followed it)
                for i in range(0, len(sub_units_with_tokens), 2):
                    part_text, part_tokens = sub_units_with_tokens[i]

                    if i + 1 < len(sub_units_with_tokens):
                        # Reattach the separator to the text part
                        sep_text, sep_tokens = sub_units_with_tokens[i + 1]
                        part_text += sep_text
                        part_tokens += sep_tokens

                    # Check the length with integer addition instead of concatenating and re-encoding
                    if temp_unit_tokens + part_tokens > chunk_token_size:
                        # Merging would overflow: flush the accumulated unit to the next level
                        if temp_unit_text:
                            next_level_units.append(temp_unit_text)
                        # and start a new accumulator with the current part
                        temp_unit_text = part_text
                        temp_unit_tokens = part_tokens
                    else:
                        # Still within the limit: keep merging
                        temp_unit_text += part_text
                        temp_unit_tokens += part_tokens

                # Flush the last accumulated unit
                if temp_unit_text:
                    next_level_units.append(temp_unit_text)
            else:
                # The unit is already short enough; pass it through unchanged
                next_level_units.append(unit)
        # Replace the previous level with the finer-grained units
        semantic_units = next_level_units

    # Step 2: Build the final chunks, format the results, and handle overlap.
    # Chunk creation and formatting are merged into one pass to avoid an intermediate list.
    results = []
    chunk_index_counter = 0
    for unit in semantic_units:
        unit_tokens = tokenizer.encode(unit)
        unit_token_len = len(unit_tokens)

        # If the unit fits within the limit, it becomes a final chunk as-is
        if unit_token_len <= chunk_token_size:
            stripped_text = unit.strip()
            if stripped_text:
                results.append(
                    {
                        "content": stripped_text,
                        # strip() may change the length, so the stripped text is
                        # re-encoded to keep the reported token count exact
                        "tokens": len(tokenizer.encode(stripped_text)),
                        "chunk_order_index": chunk_index_counter,
                    }
                )
                chunk_index_counter += 1
        else:
            # Fallback for units that are still too large after all semantic splits
            # (e.g., a long paragraph with no punctuation): force-split with a sliding window
            step = chunk_token_size - chunk_overlap_token_size
            for i in range(0, unit_token_len, step):
                token_slice = unit_tokens[i : i + chunk_token_size]

                # Robust decoding to prevent errors when the slice cuts through a
                # multi-byte character (e.g., a Chinese character)
                if hasattr(tokenizer, "tiktoken_encoding"):
                    # tiktoken's decode_tokens_bytes returns one bytes object per token;
                    # join them first, then decode and drop any incomplete trailing bytes
                    byte_chunks = tokenizer.tiktoken_encoding.decode_tokens_bytes(
                        token_slice
                    )
                    chunk_text = b"".join(byte_chunks).decode("utf-8", errors="ignore")
                else:
                    # Other tokenizers: fall back to the standard decode method
                    chunk_text = tokenizer.decode(token_slice)

                stripped_text = chunk_text.strip()
                if stripped_text:
                    results.append(
                        {
                            "content": stripped_text,
                            "tokens": len(tokenizer.encode(stripped_text)),
                            "chunk_order_index": chunk_index_counter,
                        }
                    )
                    chunk_index_counter += 1

    return results

```
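
For reference, a minimal usage sketch of the proposed function. `SimpleTiktokenTokenizer` is a hypothetical wrapper (not part of LightRAG) that exposes the `encode`/`decode` methods and the `tiktoken_encoding` attribute the implementation checks for; inside LightRAG, the existing `Tokenizer` instances passed around operate.py would be used instead.

```python
import tiktoken

# Hypothetical stand-in for LightRAG's tokenizer: it only needs to expose
# encode(), decode(), and (optionally) a `tiktoken_encoding` attribute,
# which is what the proposed implementation relies on.
class SimpleTiktokenTokenizer:
    def __init__(self, encoding_name: str = "cl100k_base"):
        self.tiktoken_encoding = tiktoken.get_encoding(encoding_name)

    def encode(self, text: str) -> list[int]:
        return self.tiktoken_encoding.encode(text)

    def decode(self, tokens: list[int]) -> str:
        return self.tiktoken_encoding.decode(tokens)


tokenizer = SimpleTiktokenTokenizer()
text = "第一段。\n\n" + "这是一个很长的段落。" * 500  # mixed short and long units
chunks = chunking_by_token_size(
    tokenizer,
    content=text,
    chunk_token_size=1200,
    chunk_overlap_token_size=100,
)
for chunk in chunks:
    print(chunk["chunk_order_index"], chunk["tokens"], chunk["content"][:30])
```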

Additional Context

No response

Labels: enhancement (New feature or request), tracked (Issue is tracked by project)
