无法修复生成器内存泄漏,尽管知道它来自哪里

问题描述 投票:0回答:1

除了可在此处下载的 zip 文件之外,

https://data.binance.vision/?prefix=data/futures/um/daily/aggTrades/BTCUSDT/

这是一个最小的可重现示例。首先,下载 3 个或更多 zip 文件并获取每个 zip 文件的文件路径。然后在下面脚本的底部,将 file_list 中的文件路径替换为您自己的文件路径:

from zipfile import ZipFile
import numpy as np
import pandas as pd
import psutil
import re

def print_memory_usage(msg, *, per_process=False):
    """Print *msg* followed by a memory-usage figure in units of 1024**3 bytes.

    By default the figure is ``psutil.virtual_memory().used``: memory in use
    across the whole system, not just this interpreter. It changes whenever
    any other process allocates or frees memory, so small drifts in it are
    not by themselves evidence of a leak in this program.

    Args:
        msg: Label printed on its own line before the figure.
        per_process: When true, report this process's resident set size
            (``psutil.Process().memory_info().rss``) instead of the
            system-wide figure. This is usually the number to watch when
            hunting a leak.
    """
    if per_process:
        used_bytes = psutil.Process().memory_info().rss
    else:
        used_bytes = psutil.virtual_memory().used

    print(msg)
    print(f"Used: {used_bytes / (1024 ** 3):.2f} GB")

def batch_generator(files: list, *, reader=None):
    """Lazily yield one batch per path in *files*.

    Each file is read only when the consumer requests the next batch.

    The freshly read batch is yielded directly instead of first being bound
    to a local variable. A local would keep the previous batch alive inside
    the suspended generator frame while the next file is being read. Two
    batches would then be in memory at the peak, even if the caller had
    already ``del``-ed its own reference.

    Args:
        files: Paths to read, in order. Any iterable of paths works.
        reader: Callable that turns one path into a batch. Defaults to
            ``_read_file``, which returns a ``pd.DataFrame``.

    Yields:
        ``reader(path)`` for each path in *files*.
    """
    # Resolved when iteration starts, so the module-level helper only has
    # to exist by the time the generator is first advanced.
    read = _read_file if reader is None else reader
    for path in files:
        yield read(path)

def _read_file(file):
    """Read the aggTrades CSV stored inside the zip archive at *file*.

    The CSV member is looked up by the archive's base name with its final
    suffix replaced by ``.csv``. For example,
    ``.../BTCUSDT-aggTrades-2020-01-01.zip`` is expected to contain
    ``BTCUSDT-aggTrades-2020-01-01.csv``.

    Args:
        file: Path to the zip archive (``str`` or ``os.PathLike``).

    Returns:
        Whatever ``read_aggtrades`` returns for the opened member.

    Raises:
        Exception: If ``read_aggtrades`` fails. The original error is
            chained as ``__cause__``.
    """
    from pathlib import Path

    # Path.stem follows the platform's path rules and strips only the final
    # suffix. Splitting on "/" and slicing off 4 characters breaks for
    # Path objects, other separators, and other extension lengths.
    csv_filename = Path(file).stem + ".csv"
    with ZipFile(file) as archive, archive.open(csv_filename) as f:
        try:
            return read_aggtrades(f)
        except Exception as e:
            print(e)
            raise Exception(f"Error occurred reading file: {csv_filename}") from e

def read_aggtrades(file) -> pd.DataFrame:
    """Parse an aggTrades CSV stream into a DataFrame.

    Only the ``a``, ``price``, ``q`` and ``t`` columns are kept. ``a`` and
    ``t`` are read as ``int64``; ``price`` and ``q`` are read as strings.
    Memory usage is printed before and after parsing.

    Args:
        file: Seekable file object positioned at the first line of the CSV.
            It may produce ``bytes`` (e.g. a member opened from a ``ZipFile``)
            or ``str`` (a file opened in text mode).

    Returns:
        The DataFrame produced by ``pd.read_csv``.
    """

    # EX: 578304464,17085,0.01449000,684164672,684164672,14,True,True
    # NOTE(review): the sample row above has eight fields but `columns` names
    # only seven; confirm the column count of the files actually being read.
    columns = ['a', 'price', 'q', 'first_trade_id', 'last_trade_id', 't', 'was_the_buyer_maker']
    usecols = ['a', 'price', 'q', 't']
    dtype = {'a': np.int64, 'price': str, 'q': str, 't': np.int64}

    def peek_line(f):
        """Return the next line of *f* as ``str`` without consuming it."""
        pos = f.tell()
        line = f.readline()
        f.seek(pos)

        # Binary streams return bytes; normalise to str so the header check
        # below works for both kinds of file object.
        if isinstance(line, bytes):
            return line.decode()

        return line

    def _read_csv(f1):
        # Most files have no header row but a few do. If the first line is
        # the header, consume it so pandas only sees data rows.
        if peek_line(f1).startswith('agg_trade_id'):
            f1.readline()
        return pd.read_csv(f1,
                           sep=',',
                           header=None,
                           names=columns,
                           usecols=usecols,
                           dtype=dtype)

    print_memory_usage("Before _read_csv")
    df = _read_csv(file)
    print_memory_usage("After _read_csv")

    return df

file_list = [
    "/home/owner/Desktop/BTCUSDT/BTCUSDT-aggTrades-2019-12-31.zip",
    "/home/owner/Desktop/BTCUSDT/BTCUSDT-aggTrades-2020-01-01.zip",
    "/home/owner/Desktop/BTCUSDT/BTCUSDT-aggTrades-2020-01-02.zip",
    "/home/owner/Desktop/BTCUSDT/BTCUSDT-aggTrades-2020-01-03.zip",
]

if __name__ == "__main__":
    # Process one batch at a time, numbering them from 1. The explicit `del`
    # drops the loop's reference, so the previous DataFrame is not kept
    # alive by `batch` while the next file is being read.
    for i, batch in enumerate(batch_generator(file_list), start=1):
        print(i)
        del batch

正如您在运行时所看到的,输出显示已用内存持续增加:

Before _read_csv
Used: 10.31 GB
After _read_csv
Used: 10.31 GB
1
Before _read_csv
Used: 10.31 GB
After _read_csv
Used: 10.32 GB
2
Before _read_csv
Used: 10.32 GB
After _read_csv
Used: 10.33 GB
3
Before _read_csv
Used: 10.33 GB
After _read_csv
Used: 10.35 GB
4

我不知道该怎么办。我尝试在 print 语句后添加

gc.collect()
del(batch)
,但这没有帮助。

python pandas memory-leaks zip
1个回答
0
投票

使用不同的策略来收集数据似乎有所帮助。

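这里没有内存泄漏的迹象。另请注意,psutil.virtual_memory().used 统计的是整个系统已用的内存,而不仅是当前 Python 进程,因此其他进程的活动也会让这个数字上下波动;几十 MB 的变化本身并不能说明存在泄漏。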

函数 print_memory_usage() 和 read_aggtrades() 是 OP 代码中对应函数的精确副本。

import zipfile
from pathlib import Path
import psutil
import pandas as pd
import numpy as np

def print_memory_usage(msg, *, per_process=False):
    """Print *msg* followed by a memory-usage figure in units of 1024**3 bytes.

    By default the figure is ``psutil.virtual_memory().used``: memory in use
    across the whole system, not just this interpreter. It changes whenever
    any other process allocates or frees memory, so small drifts in it are
    not by themselves evidence of a leak in this program.

    Args:
        msg: Label printed on its own line before the figure.
        per_process: When true, report this process's resident set size
            (``psutil.Process().memory_info().rss``) instead of the
            system-wide figure. This is usually the number to watch when
            hunting a leak.
    """
    if per_process:
        used_bytes = psutil.Process().memory_info().rss
    else:
        used_bytes = psutil.virtual_memory().used

    print(msg)
    print(f"Used: {used_bytes / (1024 ** 3):.2f} GB")

def read_aggtrades(file) -> pd.DataFrame:
    """Parse an aggTrades CSV stream into a DataFrame.

    Only the ``a``, ``price``, ``q`` and ``t`` columns are kept. ``a`` and
    ``t`` are read as ``int64``; ``price`` and ``q`` are read as strings.
    Memory usage is printed before and after parsing.

    Args:
        file: Seekable file object positioned at the first line of the CSV.
            It may produce ``bytes`` (e.g. a member opened from a ``ZipFile``)
            or ``str`` (a file opened in text mode).

    Returns:
        The DataFrame produced by ``pd.read_csv``.
    """

    # EX: 578304464,17085,0.01449000,684164672,684164672,14,True,True
    # NOTE(review): the sample row above has eight fields but `columns` names
    # only seven; confirm the column count of the files actually being read.
    columns = ['a', 'price', 'q', 'first_trade_id', 'last_trade_id', 't', 'was_the_buyer_maker']
    usecols = ['a', 'price', 'q', 't']
    dtype = {'a': np.int64, 'price': str, 'q': str, 't': np.int64}

    def peek_line(f):
        """Return the next line of *f* as ``str`` without consuming it."""
        pos = f.tell()
        line = f.readline()
        f.seek(pos)

        # Binary streams return bytes; normalise to str so the header check
        # below works for both kinds of file object.
        if isinstance(line, bytes):
            return line.decode()

        return line

    def _read_csv(f1):
        # Most files have no header row but a few do. If the first line is
        # the header, consume it so pandas only sees data rows.
        if peek_line(f1).startswith('agg_trade_id'):
            f1.readline()
        return pd.read_csv(f1,
                           sep=',',
                           header=None,
                           names=columns,
                           usecols=usecols,
                           dtype=dtype)

    print_memory_usage("Before _read_csv")
    df = _read_csv(file)
    print_memory_usage("After _read_csv")

    return df

DOWNLOADS = Path("/Users/CtrlZ/Downloads")
ZIPS = (
    "BTCUSDT-aggTrades-2024-03-23.zip",
    "BTCUSDT-aggTrades-2024-03-22.zip",
    "BTCUSDT-aggTrades-2024-03-21.zip",
)

if __name__ == "__main__":
    for file in ZIPS:
        # Extract the first member to disk, then parse it from there.
        # ZipFile.extract returns the path it actually wrote (it sanitizes
        # member names), so use that rather than rebuilding it from the
        # member's filename. The archive is closed before parsing starts.
        with zipfile.ZipFile(DOWNLOADS / file) as z:
            csv_path = z.extract(z.filelist[0], DOWNLOADS)
        with open(csv_path, encoding="utf-8") as data:
            read_aggtrades(data)

输出:

Before _read_csv
Used: 3.96 GB
After _read_csv
Used: 3.94 GB
Before _read_csv
Used: 3.94 GB
After _read_csv
Used: 3.96 GB
Before _read_csv
Used: 3.91 GB
After _read_csv
Used: 3.82 GB

平台:

macOS 14.4
Python 3.12.2
© www.soinside.com 2019 - 2024. All rights reserved.