Python 应用程序在 Pandas 合并时导致 OOM 崩溃

问题描述 投票:0回答:1

我有一个轻量级的Python应用程序,它应该执行一个非常简单的任务,但由于OOM而不断崩溃。

应用程序应该做什么

  1. 将数据从
    .parquet
    加载到数据框中
  2. 使用
    stockstats
    包计算指标
  3. 将新计算的数据合并到原始数据框中 -> 这里崩溃了
  4. 将数据帧存储为
    .parquet

崩溃在哪里

df = pd.merge(df, st, on=['datetime'])

使用

  • Python
    3.10
  • pandas~=2.1.4
  • stockstats~=0.4.1
  • Kubernetes
    1.28.2-do.0
    (在数字海洋中运行)

这里有一个奇怪的事情,数据框非常小(

df.size
208446
,文件大小是
1.00337 MB
,内存使用量是
1.85537 MB
)。

测量

import os

file_stats = os.stat(filename)
file_size = file_stats.st_size / (1024 * 1024)  # 1.00337 MB

df_mem_usage = dataframe.memory_usage(deep=True)
df_mem_usage_print = round(df_mem_usage.sum() / (1024 * 1024), 6   # 1.85537 MB

df_size = dataframe.size  # 208446

部署信息

使用 Helm 将应用程序部署到 Kubernetes 中,并设置以下资源

resources:
  limits:
    cpu: 1000m
    memory: 6000Mi
  requests:
    cpu: 1000m
    memory: 4000Mi

我使用的是4vCPU + 8 GB内存的节点,并且该节点没有性能压力。

kubectl top node node-xxx
NAME              CPU(cores)   CPU%   MEMORY(bytes)   MEMORY%
node-xxx          750m         19%    1693Mi          25%

播客信息

kubectl describe pod xxx
...
    State:          Waiting
      Reason:       CrashLoopBackOff
    Last State:     Terminated
      Reason:       OOMKilled
      Exit Code:    137
      Started:      Sun, 24 Mar 2024 16:08:56 +0000
      Finished:     Sun, 24 Mar 2024 16:09:06 +0000
...

这是 Grafana 的 CPU 和内存消耗。我知道非常短的内存或 CPU 峰值将很难看到,但从长远来看,该应用程序不会消耗大量 RAM。另一方面,根据我的经验,我们在 RAM 较少的容器上使用相同的

pandas
操作,并且数据帧要大得多,没有问题。

我应该如何解决这个问题? 为了防止 OOM,我还应该调试什么?

数据和代码示例

原始数据框(名为

df

              datetime   open   high    low  close        volume
0  2023-11-14 11:15:00  2.185  2.187  2.171  2.187  19897.847314
1  2023-11-14 11:20:00  2.186  2.191  2.183  2.184   8884.634728
2  2023-11-14 11:25:00  2.184  2.185  2.171  2.176  12106.153954
3  2023-11-14 11:30:00  2.176  2.176  2.158  2.171  22904.354082
4  2023-11-14 11:35:00  2.171  2.173  2.167  2.171   1691.211455

新数据框(名为

st
)。
注:如果
trend_orientation = 1
=>
st_lower = NaN
,如果
-1 => st_upper = NaN

              datetime   supertrend_ub  supertrend_lb    trend_orientation    st_trend_segment
0  2023-11-14 11:15:00   0.21495        NaN              -1                   1
1  2023-11-14 11:20:00   0.21495        NaN              -10                  1
2  2023-11-14 11:25:00   0.21495        NaN              -11                  1
3  2023-11-14 11:30:00   0.21495        NaN              -12                  1
4  2023-11-14 11:35:00   0.21495        NaN              -13                  1

代码示例

import pandas as pd
import multiprocessing
import numpy as np
import stockstats


def add_supertrend(market):
    try:
        # Read data from file
        df = pd.read_parquet(market, engine="fastparquet")

        # Extract date columns
        date_column = df['datetime']

        # Convert to stockstats object
        st_a = stockstats.wrap(df.copy())
        # Generate supertrend
        st_a = st_a[['supertrend', 'supertrend_ub', 'supertrend_lb']]

        # Add back datetime columns
        st_a.insert(0, "datetime", date_column)

        # Add trend orientation using conditional columns
        conditions = [
            st_a['supertrend_ub'] == st_a['supertrend'],
            st_a['supertrend_lb'] == st_a['supertrend']
        ]
        
        values = [-1, 1]
        st_a['trend_orientation'] = np.select(conditions, values)

        # Remove not required supertrend values
        st_a.loc[st_a['trend_orientation'] < 0, 'st_lower'] = np.NaN
        st_a.loc[st_a['trend_orientation'] > 0, 'st_upper'] = np.NaN

        # Unwrap back to dataframe
        st = stockstats.unwrap(st_a)

        # Ensure correct date types are used
        st = st.astype({
            'supertrend': 'float32',
            'supertrend_ub': 'float32',
            'supertrend_lb': 'float32',
            'trend_orientation': 'int8'
        })
        # Add trend segments
        st_to = st[['trend_orientation']]
        st['st_trend_segment'] = st_to.ne(st_to.shift()).cumsum()
        
        # Remove trend value
        st.drop(columns=['supertrend'], inplace=True)

        # Merge ST with DF
        df = pd.merge(df, st, on=['datetime'])
        
        # Write back to parquet
        df.to_parquet(market, compression=None)
    except Exception as e:
        # Using proper logger in real code
        print(e)
        pass


def main():
    # Using fixed market as example, in real code market is fetched
    market = "BTCUSDT"
    # Using multiprocessing to free up memory after each iteration
    p = multiprocessing.Process(target=add_supertrend, args=(market,))
    p.start()
    p.join()


if __name__ == "__main__":
    main()

Dockerfile

FROM python:3.10

ENV PYTHONFAULTHANDLER=1 \
    PYTHONHASHSEED=random \
    PYTHONUNBUFFERED=1 \
    PYTHONPATH=.

# Adding vim
RUN ["apt-get", "update"]

# Get dependencies
COPY requirements.txt .
RUN pip3 install -r requirements.txt

# Copy main app
ADD . .
CMD main.py
python python-3.x pandas docker out-of-memory
1个回答
0
投票

您的 Python 应用程序在与 Pandas 进行合并操作期间遇到内存不足 (OOM) 崩溃。

+---------------+       +-------------+        +--------------+          +-------------------+
| Load .parquet |  -->  | Calculate   |  --->  | Merge Data   |  --X-->  | Store .parquet    |
|  to DataFrame |       | Indicator   |        | Frames       |          | (Crashes here)    |
+---------------+       +-------------+        +--------------+          +-------------------+

即使数据帧相对较小,崩溃也表明合并操作期间发生了内存峰值。 Grafana 的 CPU 和内存使用统计数据表明,在正常操作下,资源使用在限制范围内,但合并操作可能会导致超出可用内存的峰值。

如果可能的话,您可以尝试通过将操作分成更小的块来减少合并期间的内存使用。并显式删除未使用的临时数据帧或变量以释放内存。

更一般地说,逐步监控内存使用情况,以准确确定峰值发生的时间。并实现日志记录以捕获进程的状态:

import pandas as pd
import multiprocessing
import numpy as np
import stockstats
import os

# Add a memory usage logger function:
def log_memory_usage(df, step):
    mem = df.memory_usage(deep=True).sum() / (1024 * 1024)  # in MB
    print(f'Memory usage after {step}: {mem:.2f} MB')

def add_supertrend(market):
    try:
        df = pd.read_parquet(market, engine="fastparquet")
        log_memory_usage(df, 'loading dataframe')

        # Perform the rest of the operations as before
        # 

        # After generating supertrend data
        log_memory_usage(st, 'after generating supertrend')

        # Before merge operation
        log_memory_usage(df, 'before merge')

        df = pd.merge(df, st, on=['datetime'])

        # After merge operation
        log_memory_usage(df, 'after merge')

        # Save the result
        df.to_parquet(market, compression=None)

    except Exception as e:
        print(e)

# main() and if __name__ == "__main__": block remains the same

通过在每个步骤添加日志记录,您可以检查控制台输出以查看崩溃发生前的内存使用情况。


正如 Lukasz Tracewski评论中建议的那样,健全性检查将确保 Kubernetes 环境和应用程序的配置实际上可以按预期处理内存分配。

您可以分配大量内存(5 GB)来查看 Kubernetes 环境是否按预期处理它。如果测试成功且没有发生 OOM 崩溃,则问题可能不在于 Kubernetes 配置,而可能在于 Python 应用程序本身如何处理内存,或者如何执行

pandas
合并操作。

创建一个名为

memory-stress-test.yaml
的文件并运行
kubectl apply -f memory-stress-test.yaml

apiVersion: v1
kind: Pod
metadata:
  name: memory-stress-test
spec:
  containers:
  - name: memory-stress-test
    image: polinux/stress
    resources:
      limits:
        memory: "6000Mi"
      requests:
        memory: "5000Mi"
    command: ["stress"]
    args: ["--vm", "1", "--vm-bytes", "5G", "--vm-hang", "1"]

然后使用

kubectl get pod memory-stress-test
监控 Pod 的状态,并使用
kubectl describe pod memory-stress-test
查看任何事件。

如果环境通过了此测试,则问题可能出在代码或数据处理中,而不是容器或节点配置中。

© www.soinside.com 2019 - 2024. All rights reserved.