我有一个轻量级的Python应用程序,它应该执行一个非常简单的任务,但由于OOM而不断崩溃。
.parquet
加载到数据框中stockstats
包计算指标.parquet
df = pd.merge(df, st, on=['datetime'])
3.10
pandas~=2.1.4
stockstats~=0.4.1
1.28.2-do.0
(在数字海洋中运行)这里有一个奇怪的事情,数据框非常小(
df.size
是208446
,文件大小是1.00337 MB
,内存使用量是1.85537 MB
)。
测量
import os
file_stats = os.stat(filename)
file_size = file_stats.st_size / (1024 * 1024) # 1.00337 MB
df_mem_usage = dataframe.memory_usage(deep=True)
df_mem_usage_print = round(df_mem_usage.sum() / (1024 * 1024), 6 # 1.85537 MB
df_size = dataframe.size # 208446
使用 Helm 将应用程序部署到 Kubernetes 中,并设置以下资源
resources:
limits:
cpu: 1000m
memory: 6000Mi
requests:
cpu: 1000m
memory: 4000Mi
我使用的是4vCPU + 8 GB内存的节点,并且该节点没有性能压力。
kubectl top node node-xxx
NAME CPU(cores) CPU% MEMORY(bytes) MEMORY%
node-xxx 750m 19% 1693Mi 25%
播客信息
kubectl describe pod xxx
...
State: Waiting
Reason: CrashLoopBackOff
Last State: Terminated
Reason: OOMKilled
Exit Code: 137
Started: Sun, 24 Mar 2024 16:08:56 +0000
Finished: Sun, 24 Mar 2024 16:09:06 +0000
...
这是 Grafana 的 CPU 和内存消耗。我知道非常短的内存或 CPU 峰值将很难看到,但从长远来看,该应用程序不会消耗大量 RAM。另一方面,根据我的经验,我们在 RAM 较少的容器上使用相同的
pandas
操作,并且数据帧要大得多,没有问题。
我应该如何解决这个问题? 为了防止 OOM,我还应该调试什么?
原始数据框(名为
df
)
datetime open high low close volume
0 2023-11-14 11:15:00 2.185 2.187 2.171 2.187 19897.847314
1 2023-11-14 11:20:00 2.186 2.191 2.183 2.184 8884.634728
2 2023-11-14 11:25:00 2.184 2.185 2.171 2.176 12106.153954
3 2023-11-14 11:30:00 2.176 2.176 2.158 2.171 22904.354082
4 2023-11-14 11:35:00 2.171 2.173 2.167 2.171 1691.211455
新数据框(名为
st
)。 trend_orientation = 1
=> st_lower = NaN
,如果 -1 => st_upper = NaN
datetime supertrend_ub supertrend_lb trend_orientation st_trend_segment
0 2023-11-14 11:15:00 0.21495 NaN -1 1
1 2023-11-14 11:20:00 0.21495 NaN -10 1
2 2023-11-14 11:25:00 0.21495 NaN -11 1
3 2023-11-14 11:30:00 0.21495 NaN -12 1
4 2023-11-14 11:35:00 0.21495 NaN -13 1
代码示例
import pandas as pd
import multiprocessing
import numpy as np
import stockstats
def add_supertrend(market):
try:
# Read data from file
df = pd.read_parquet(market, engine="fastparquet")
# Extract date columns
date_column = df['datetime']
# Convert to stockstats object
st_a = stockstats.wrap(df.copy())
# Generate supertrend
st_a = st_a[['supertrend', 'supertrend_ub', 'supertrend_lb']]
# Add back datetime columns
st_a.insert(0, "datetime", date_column)
# Add trend orientation using conditional columns
conditions = [
st_a['supertrend_ub'] == st_a['supertrend'],
st_a['supertrend_lb'] == st_a['supertrend']
]
values = [-1, 1]
st_a['trend_orientation'] = np.select(conditions, values)
# Remove not required supertrend values
st_a.loc[st_a['trend_orientation'] < 0, 'st_lower'] = np.NaN
st_a.loc[st_a['trend_orientation'] > 0, 'st_upper'] = np.NaN
# Unwrap back to dataframe
st = stockstats.unwrap(st_a)
# Ensure correct date types are used
st = st.astype({
'supertrend': 'float32',
'supertrend_ub': 'float32',
'supertrend_lb': 'float32',
'trend_orientation': 'int8'
})
# Add trend segments
st_to = st[['trend_orientation']]
st['st_trend_segment'] = st_to.ne(st_to.shift()).cumsum()
# Remove trend value
st.drop(columns=['supertrend'], inplace=True)
# Merge ST with DF
df = pd.merge(df, st, on=['datetime'])
# Write back to parquet
df.to_parquet(market, compression=None)
except Exception as e:
# Using proper logger in real code
print(e)
pass
def main():
# Using fixed market as example, in real code market is fetched
market = "BTCUSDT"
# Using multiprocessing to free up memory after each iteration
p = multiprocessing.Process(target=add_supertrend, args=(market,))
p.start()
p.join()
if __name__ == "__main__":
main()
Dockerfile
FROM python:3.10
ENV PYTHONFAULTHANDLER=1 \
PYTHONHASHSEED=random \
PYTHONUNBUFFERED=1 \
PYTHONPATH=.
# Adding vim
RUN ["apt-get", "update"]
# Get dependencies
COPY requirements.txt .
RUN pip3 install -r requirements.txt
# Copy main app
ADD . .
CMD main.py
您的 Python 应用程序在与 Pandas 进行合并操作期间遇到内存不足 (OOM) 崩溃。
+---------------+ +-------------+ +--------------+ +-------------------+
| Load .parquet | --> | Calculate | ---> | Merge Data | --X--> | Store .parquet |
| to DataFrame | | Indicator | | Frames | | (Crashes here) |
+---------------+ +-------------+ +--------------+ +-------------------+
即使数据帧相对较小,崩溃也表明合并操作期间发生了内存峰值。 Grafana 的 CPU 和内存使用统计数据表明,在正常操作下,资源使用在限制范围内,但合并操作可能会导致超出可用内存的峰值。
如果可能的话,您可以尝试通过将操作分成更小的块来减少合并期间的内存使用。并显式删除未使用的临时数据帧或变量以释放内存。
更一般地说,逐步监控内存使用情况,以准确确定峰值发生的时间。并实现日志记录以捕获进程的状态:
import pandas as pd
import multiprocessing
import numpy as np
import stockstats
import os
# Add a memory usage logger function:
def log_memory_usage(df, step):
mem = df.memory_usage(deep=True).sum() / (1024 * 1024) # in MB
print(f'Memory usage after {step}: {mem:.2f} MB')
def add_supertrend(market):
try:
df = pd.read_parquet(market, engine="fastparquet")
log_memory_usage(df, 'loading dataframe')
# Perform the rest of the operations as before
#
# After generating supertrend data
log_memory_usage(st, 'after generating supertrend')
# Before merge operation
log_memory_usage(df, 'before merge')
df = pd.merge(df, st, on=['datetime'])
# After merge operation
log_memory_usage(df, 'after merge')
# Save the result
df.to_parquet(market, compression=None)
except Exception as e:
print(e)
# main() and if __name__ == "__main__": block remains the same
通过在每个步骤添加日志记录,您可以检查控制台输出以查看崩溃发生前的内存使用情况。
正如 Lukasz Tracewski 在评论中建议的那样,健全性检查将确保 Kubernetes 环境和应用程序的配置实际上可以按预期处理内存分配。
您可以分配大量内存(5 GB)来查看 Kubernetes 环境是否按预期处理它。如果测试成功且没有发生 OOM 崩溃,则问题可能不在于 Kubernetes 配置,而可能在于 Python 应用程序本身如何处理内存,或者如何执行
pandas
合并操作。
创建一个名为
memory-stress-test.yaml
的文件并运行 kubectl apply -f memory-stress-test.yaml
。
apiVersion: v1
kind: Pod
metadata:
name: memory-stress-test
spec:
containers:
- name: memory-stress-test
image: polinux/stress
resources:
limits:
memory: "6000Mi"
requests:
memory: "5000Mi"
command: ["stress"]
args: ["--vm", "1", "--vm-bytes", "5G", "--vm-hang", "1"]
然后使用
kubectl get pod memory-stress-test
监控 Pod 的状态,并使用 kubectl describe pod memory-stress-test
查看任何事件。
如果环境通过了此测试,则问题可能出在代码或数据处理中,而不是容器或节点配置中。