我想使用多线程写入数据,每个线程从给定字节开始写入给定大小的数据。我确定写入位置和块大小是正确的。当数据块按顺序到达时,文件的哈希匹配,但当数据块无序到达时,文件的顺序错误。我加了写锁,为什么还是这样?
客户:
import os
import requests
from concurrent.futures import ThreadPoolExecutor
import datetime
import threading
print_lock = threading.Lock()
file_write_lock = threading.Lock()
def safe_print(*args, **kwargs):
print_lock.acquire()
print(*args, **kwargs)
print_lock.release()
SERVER_URL = 'http://127.0.0.1:9090'
CHUNK_SIZE = 1024 * 128
MAX_WORKERS = 5
MAX_RETRIES = 3
def download_chunk(session, filesize, url, start_byte, file_path, filename):
get_size = CHUNK_SIZE if not start_byte + CHUNK_SIZE >= filesize else filesize - start_byte
response = session.get(url,
headers={'Range': f'bytes={start_byte}-{start_byte + get_size - 1}', 'X-Filename': filename})
if response.status_code != 200:
return response.text
chunk = response.content
safe_print(start_byte, len(chunk), get_size)
with file_write_lock:
with open(file_path, 'ab') as f:
f.seek(start_byte)
f.write(chunk)
f.flush()
os.fsync(f.fileno())
def download_file(file_path, filename):
if os.path.exists(file_path):
os.remove(file_path)
with requests.Session() as session:
url = f'{SERVER_URL}/download/'
response = session.head(url, headers={'X-Filename': filename})
if response.status_code == 404:
raise FileNotFoundError("File not found on server.")
file_size = int(response.headers.get('File-Size', 0))
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
results = [executor.submit(download_chunk, session, file_size, url, i, file_path, filename)
for i in range(0, file_size, CHUNK_SIZE)]
# for future in results:
# print(future.result())
if __name__ == '__main__':
starttime = datetime.datetime.now()
download_file('testfiles/download.txt', 'rest_api.txt')
endtime = datetime.datetime.now()
print("下载时间", endtime - starttime)
服务器:
from flask import Flask, request, Response, make_response
import os
app = Flask(__name__)
UPLOAD_FOLDER = 'uploads'
CHUNK_SIZE = 1024 * 128
@app.route('/download/', methods=['GET', 'HEAD'])
def download_file():
filename = request.headers.get('X-Filename', None)
if not filename:
return make_response(('Missing filename in request headers.', 400))
file_path = os.path.join(UPLOAD_FOLDER, filename)
if not os.path.exists(file_path):
return make_response(('File not found.', 404))
file_size = os.path.getsize(file_path)
if request.method == 'HEAD':
response = make_response()
response.headers.set('File-Size', file_size)
return response
bytes_range = request.headers.get('Range', 'bytes=0-').split('=')[1].split('-')
start_byte = int(bytes_range[0])
end_byte = int(bytes_range[1])
if end_byte > file_size:
return make_response(('End byte out of range.', 416))
with open(file_path, 'rb') as f:
f.seek(start_byte)
chunk = f.read(end_byte - start_byte + 1)
headers = {'Content-Disposition': f'attachment; filename={filename}',
'Content-Range': f'bytes {start_byte}-{end_byte}/{file_size}',
'Content-Type': 'application/octet-stream'}
return Response(chunk, headers=headers)
if __name__ == '__main__':
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER)
app.run(host='0.0.0.0', port=9090, debug=True)
我已经用文件进行了测试,并在按顺序写入数据块时得到了正确的文件。