Why are chunks written in the wrong order when writing with multiple threads in Python?


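I want to write data using multiple threads, with each thread writing a block of a given size starting at a given byte offset. I am sure the write positions and chunk sizes are correct. When the chunks arrive in order, the file's hash matches, but when they arrive out of order, the data in the file ends up in the wrong order. I added a write lock, so why does this still happen?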

Client:

import os
import requests
from concurrent.futures import ThreadPoolExecutor
import datetime
import threading

print_lock = threading.Lock()
file_write_lock = threading.Lock()


def safe_print(*args, **kwargs):
    # Serialize printing so output from different threads does not interleave.
    with print_lock:
        print(*args, **kwargs)


SERVER_URL = 'http://127.0.0.1:9090'
CHUNK_SIZE = 1024 * 128
MAX_WORKERS = 5
MAX_RETRIES = 3


def download_chunk(session, filesize, url, start_byte, file_path, filename):
    get_size = min(CHUNK_SIZE, filesize - start_byte)
    response = session.get(url,
                           headers={'Range': f'bytes={start_byte}-{start_byte + get_size - 1}', 'X-Filename': filename})
    if response.status_code != 200:
        return response.text
    chunk = response.content
    safe_print(start_byte, len(chunk), get_size)
    with file_write_lock:
        with open(file_path, 'ab') as f:
            f.seek(start_byte)
            f.write(chunk)
            f.flush()
            os.fsync(f.fileno())


def download_file(file_path, filename):
    if os.path.exists(file_path):
        os.remove(file_path)

    with requests.Session() as session:
        url = f'{SERVER_URL}/download/'
        response = session.head(url, headers={'X-Filename': filename})

        if response.status_code == 404:
            raise FileNotFoundError("File not found on server.")

        file_size = int(response.headers.get('File-Size', 0))

        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            results = [executor.submit(download_chunk, session, file_size, url, i, file_path, filename)
                       for i in range(0, file_size, CHUNK_SIZE)]

            # for future in results:
            #     print(future.result())


if __name__ == '__main__':
    starttime = datetime.datetime.now()
    download_file('testfiles/download.txt', 'rest_api.txt')
    endtime = datetime.datetime.now()
    print("Download time:", endtime - starttime)
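
A side note on the client above: the futures returned by `executor.submit` are never read (the loop over `results` is commented out), so any exception raised inside `download_chunk` is silently discarded. The sketch below shows one way to surface such errors with `as_completed`. The names `run_all` and `fake_download` are hypothetical stand-ins introduced only for illustration, not part of the original code.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_all(task, args_list, max_workers=5):
    """Run task(*args) for every args tuple; collect results and errors."""
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to the arguments it was submitted with.
        futures = {executor.submit(task, *args): args for args in args_list}
        for future in as_completed(futures):
            args = futures[future]
            try:
                # result() re-raises any exception from the worker thread.
                results[args] = future.result()
            except Exception as exc:
                errors[args] = exc
    return results, errors


def fake_download(start_byte):
    """Hypothetical stand-in for download_chunk; fails for one offset."""
    if start_byte == 8:
        raise IOError('simulated failure')
    return start_byte


if __name__ == '__main__':
    results, errors = run_all(fake_download, [(0,), (4,), (8,)])
    print('results:', results)
    print('errors:', errors)
```

Keying the results by the submitted arguments makes it easy to see which `start_byte` failed and could be retried.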

Server:

from flask import Flask, request, Response, make_response
import os

app = Flask(__name__)
UPLOAD_FOLDER = 'uploads'
CHUNK_SIZE = 1024 * 128


@app.route('/download/', methods=['GET', 'HEAD'])
def download_file():
    filename = request.headers.get('X-Filename', None)
    if not filename:
        return make_response(('Missing filename in request headers.', 400))

    file_path = os.path.join(UPLOAD_FOLDER, filename)

    if not os.path.exists(file_path):
        return make_response(('File not found.', 404))

    file_size = os.path.getsize(file_path)

    if request.method == 'HEAD':
        response = make_response()
        response.headers.set('File-Size', file_size)
        return response

    bytes_range = request.headers.get('Range', 'bytes=0-').split('=')[1].split('-')
    start_byte = int(bytes_range[0])
    end_byte = int(bytes_range[1])
    if end_byte >= file_size:
        return make_response(('End byte out of range.', 416))

    with open(file_path, 'rb') as f:
        f.seek(start_byte)
        chunk = f.read(end_byte - start_byte + 1)

    headers = {'Content-Disposition': f'attachment; filename={filename}',
               'Content-Range': f'bytes {start_byte}-{end_byte}/{file_size}',
               'Content-Type': 'application/octet-stream'}
    return Response(chunk, headers=headers)


if __name__ == '__main__':
    if not os.path.exists(UPLOAD_FOLDER):
        os.makedirs(UPLOAD_FOLDER)
    app.run(host='0.0.0.0', port=9090, debug=True)
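
The `Range` parsing in the server above assumes both ends of the range are always present: with the default `'bytes=0-'` the end is an empty string and `int('')` raises `ValueError`. The following is a minimal sketch of a more defensive parser, written without Flask so it can be run on its own. The helper name `parse_range` is hypothetical, and the sketch deliberately handles only a single `bytes=start-end` or `bytes=start-` range.

```python
def parse_range(header, file_size):
    """Parse a single 'bytes=start-end' range into inclusive (start, end).

    Returns None when the range is malformed or cannot be satisfied, in
    which case a server would typically answer with status 416.
    Suffix ranges ('bytes=-500') and multi-range requests are not handled.
    """
    if file_size <= 0:
        return None
    unit, sep, spec = header.partition('=')
    if not sep or unit.strip() != 'bytes':
        return None
    start_text, dash, end_text = spec.strip().partition('-')
    if not dash or not start_text.isdecimal():
        return None
    start = int(start_text)
    if end_text == '':
        # Open-ended range: read to the end of the file.
        end = file_size - 1
    elif end_text.isdecimal():
        # Clamp an end position that lies beyond the last byte.
        end = min(int(end_text), file_size - 1)
    else:
        return None
    if start > end:
        return None
    return start, end


if __name__ == '__main__':
    print(parse_range('bytes=0-', 10))
    print(parse_range('bytes=8-20', 10))
    print(parse_range('bytes=10-12', 10))
```

HTTP servers conventionally answer a satisfied range request with status 206 (Partial Content) rather than 200. If the server were changed to do so, the client's `response.status_code != 200` check would need to accept 206 as well.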

I have tested this with a file, and the file comes out correct when the chunks are written in order.
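
A likely cause, offered as a sketch rather than a definitive diagnosis: the client opens the file with mode `'ab'`. In append mode, writes generally go to the end of the file regardless of the current seek position, so `f.seek(start_byte)` has no effect and chunks land in arrival order. That would also explain why the file is correct when chunks happen to arrive in order. The lock only prevents writes from overlapping in time; it does not control where they land. The example below contrasts the two approaches. The helper names (`write_chunk_append`, `preallocate`, `write_chunk_at`, `demo`) and the file names are hypothetical and exist only for illustration.

```python
import os
import tempfile
import threading

file_write_lock = threading.Lock()


def write_chunk_append(path, offset, chunk):
    """Mimics the approach in the question: seek() on a file opened in 'ab'."""
    with file_write_lock:
        with open(path, 'ab') as f:
            f.seek(offset)  # typically has no effect on writes in append mode
            f.write(chunk)


def preallocate(path, size):
    """Create the file at its final size so any offset can be written to."""
    with open(path, 'wb') as f:
        f.truncate(size)


def write_chunk_at(path, offset, chunk):
    """Write at an absolute offset; 'r+b' honours seek() and does not truncate."""
    with file_write_lock:
        with open(path, 'r+b') as f:
            f.seek(offset)
            f.write(chunk)


def read_all(path):
    with open(path, 'rb') as f:
        return f.read()


def demo():
    """Write two chunks out of order with both approaches; return both files."""
    chunks = [(4, b'BBBB'), (0, b'AAAA')]  # the second chunk arrives first
    with tempfile.TemporaryDirectory() as tmp:
        appended = os.path.join(tmp, 'appended.bin')
        positioned = os.path.join(tmp, 'positioned.bin')

        for offset, chunk in chunks:
            write_chunk_append(appended, offset, chunk)

        preallocate(positioned, 8)
        for offset, chunk in chunks:
            write_chunk_at(positioned, offset, chunk)

        return read_all(appended), read_all(positioned)


if __name__ == '__main__':
    appended, positioned = demo()
    print('append mode:', appended)
    print('r+b mode:   ', positioned)
```

Applied to the client in the question, this would mean creating the file once at `file_size` bytes in `download_file` (after the `os.remove` call and before submitting tasks) and opening it with `'r+b'` inside `download_chunk`.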

python io python-multithreading