Post

对数据进行预处理

对数据进行预处理

简单做个demo,对一批文本数据进行预处理,方便后续训练

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
""" 步骤 1:安装依赖库 """
# !pip install datasets transformers

""" 步骤 2:准备文件结构 """
# 假设原始数据存放结构:
# ./raw_data/
#   ├── part-0001.txt
#   ├── part-0002.txt
#   └── ...(数千个文本文件)

""" 步骤 3:创建 Dataset 对象 """
from datasets import load_dataset

# 使用通配符加载所有文本文件
raw_dataset = load_dataset(
    "text", 
    data_files="./raw_data/part-*.txt",  # 匹配所有part开头的文件
    split="train",  # 所有数据作为训练集
    sample_by="document",  # 按文档采样
    streaming=False  # 小数据集可直接加载内存
)

print(f"原始数据集示例:{raw_dataset[0]}")  # 查看第一条数据

""" 步骤 4:定义预处理函数 """
from transformers import AutoTokenizer

# 加载分词器(以GPT2为例)
tokenizer = AutoTokenizer.from_pretrained("gpt2")
tokenizer.add_special_tokens({"pad_token": "[PAD]"})  # 添加填充标记

# 定义批处理函数
def preprocess_function(examples):
    # 此处可添加自定义清洗逻辑:
    processed_texts = [
        text.strip().replace("\n", " ")  # 示例:去除首尾空格、替换换行符
        for text in examples["text"]
    ]
    
    # 批量分词
    tokenized_output = tokenizer(
        processed_texts,
        truncation=False,  # 不截断,后续做分块处理
        padding=False,      # 不填充
        return_attention_mask=False,
        return_token_type_ids=False
    )
    
    return tokenized_output

""" 步骤 5:执行预处理 """
# 关键参数说明:
# batched=True:启用批处理(提升效率)
# batch_size=1000:每批处理1000个样本
# num_proc=8:使用8个CPU进程并行处理
# remove_columns=["text"]:移除原始文本列
tokenized_dataset = raw_dataset.map(
    preprocess_function,
    batched=True,
    batch_size=1000,
    num_proc=8,  # 根据CPU核心数调整
    remove_columns=["text"],
    load_from_cache_file=True,  # 启用缓存避免重复处理
    desc="Tokenizing data"
)

""" 步骤 6:分块处理(适应模型上下文长度) """
block_size = 512  # 根据模型最大长度调整

def group_texts(examples):
    # 将所有token_ids拼接为长序列
    concatenated_ids = [i for ids in examples["input_ids"] for i in ids]
    
    # 计算总长度并分块
    total_length = len(concatenated_ids)
    if total_length >= block_size:
        total_length = (total_length // block_size) * block_size
        
    # 切割为block_size长度的块
    result = {
        "input_ids": [
            concatenated_ids[i : i + block_size]
            for i in range(0, total_length, block_size)
        ]
    }
    return result

final_dataset = tokenized_dataset.map(
    group_texts,
    batched=True,
    batch_size=1000,
    num_proc=8,
    desc="Grouping texts into chunks"
)

print(f"处理后的样本示例:{final_dataset[0]}")

""" 步骤 7:保存预处理结果 """
# 保存为Arrow格式(高效二进制格式)
final_dataset.save_to_disk("./processed_dataset")

# 保存为多个分片文件(适合超大数据集)
final_dataset.save_to_disk(
    "./processed_dataset_sharded",
    max_shard_size="1GB"  # 每个分片最大1GB
)

""" (可选)步骤8:流式处理版(适合内存不足的情况) """
streaming_dataset = load_dataset(
    "text",
    data_files="./raw_data/part-*.txt",
    split="train",
    streaming=True  # 启用流式处理
)

# 流式处理需要逐批处理
processed_stream = streaming_dataset.map(
    preprocess_function,
    batched=True,
    batch_size=1000
).map(
    group_texts,
    batched=True
)

# 流式保存示例
from datasets import DatasetDict
DatasetDict({"train": processed_stream}).save_to_disk(
    "./stream_processed",
    max_shard_size="1GB"
)
This post is licensed under CC BY 4.0 by the author.

Trending Tags