# Fast Reading, Writing, and Processing of Very Large Files in Python

[TOC]

## Generate a 1000-line demo file

```python
with open("big_file_demo.txt", "w", encoding="utf-8") as fw:
    for idx in range(1, 1001):
        fw.write(str(idx) + "\n")
```
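
As a quick sanity check (my own addition, not in the original post), you can confirm the line count without loading the whole file into memory, since a file object is a lazy iterator over lines:

```python
with open("big_file_demo.txt", "r", encoding="utf-8") as fr:
    # Iterating the file object streams one line at a time, so memory
    # use stays flat no matter how large the file is.
    print(sum(1 for _ in fr))  # 1000
```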

## Plain line-by-line traversal

```python
import time

def process_text(text, fw):
    # Simulate 10 ms of processing per line.
    time.sleep(0.01)
    new_text = "{} done ...\n".format(text.strip())
    fw.write(new_text)

def read_write(frname, fwname):
    start_time = time.time()

    with open(frname, "r", encoding="utf-8") as fr, \
            open(fwname, "w", encoding="utf-8") as fw:

        for line in fr:
            process_text(line, fw)

    print(time.time() - start_time)

read_write("./big_file_demo.txt", "./big_file_demo_1.txt")

# Output
18.075376510620117
```
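
The simulated work alone accounts for about 10 of those 18 seconds (1000 lines × 0.01 s), and `time.sleep` usually oversleeps by a few milliseconds per call depending on the OS timer resolution, which likely explains most of the rest. A minimal sketch of my own that drops the sleep confirms the file I/O itself is not the bottleneck:

```python
import time

def read_write_noop(frname, fwname):
    # Same loop as above, minus the simulated 10 ms of work per line.
    start_time = time.time()
    with open(frname, "r", encoding="utf-8") as fr, \
            open(fwname, "w", encoding="utf-8") as fw:
        for line in fr:
            fw.write("{} done ...\n".format(line.strip()))
    print(time.time() - start_time)  # a tiny fraction of a second for 1000 lines
```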

## Processing with a thread pool

```python
from concurrent import futures
import time
import os

print("CPU count:", os.cpu_count())  # the test laptop has 8 cores

# Read batch_size lines at a time -> a generator
def read_big_file(frname, batch_size):
    with open(frname, "r", encoding="utf-8") as fr:
        data = []
        for line in fr:
            data.append(line)
            if len(data) == batch_size:
                yield data
                data = []
        if len(data) > 0:
            yield data

# Process a single line
def process_text(text):
    time.sleep(0.01)
    return "{} done ...\n".format(text.strip())

# Process one batch of lines with a thread pool
def batch_multi_thread(data_list):
    with futures.ThreadPoolExecutor() as tp:
        task_list = [tp.submit(process_text, txt) for txt in data_list]
        task_result = [t.result() for t in task_list]  # results in input order
        # task_result = [t.result() for t in futures.as_completed(task_list)]  # unordered (completion-order) results
    return task_result

def read_write(frname, fwname, batch_size):
    start_time = time.time()

    with open(fwname, "w", encoding="utf-8") as fw:
        for data_list in read_big_file(frname, batch_size):
            task_result = batch_multi_thread(data_list)
            fw.writelines(task_result)

    print(time.time() - start_time)

read_write("./big_file_demo.txt", "./big_file_demo_done_mt.txt", 100)
```

Output:

```
CPU count: 8
1.789292573928833
```
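
A side note on the pool code: `ThreadPoolExecutor.map` does the submit-and-collect bookkeeping for you and also yields results in input order. A minimal sketch (my own variant, reusing `futures` and `process_text` from the script above):

```python
def batch_multi_thread_map(data_list):
    # tp.map yields results in the order the inputs were given, so the
    # output file keeps the input line order, same as the submit/result version.
    with futures.ThreadPoolExecutor() as tp:
        return list(tp.map(process_text, data_list))
```

Note that the default pool size is not one thread per line: on Python 3.8+ `ThreadPoolExecutor` defaults to `min(32, os.cpu_count() + 4)` workers.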
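
One caveat worth spelling out: the roughly 10x speedup works here because `time.sleep` releases the GIL, just as real blocking I/O (disk, network, database) would. If the per-line work were CPU-bound pure Python, the GIL would serialize the threads, and a process pool is the usual escape hatch. A hedged sketch under that assumption (`process_text` must stay a picklable top-level function):

```python
def batch_multi_process(data_list):
    # Worker processes sidestep the GIL for CPU-bound work, at the cost
    # of pickling every line to and from the workers.
    with futures.ProcessPoolExecutor() as pp:
        return list(pp.map(process_text, data_list, chunksize=100))
```

On platforms that spawn worker processes (Windows, and macOS by default), the script's entry point also needs an `if __name__ == "__main__":` guard.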