此代码在Jupyter笔记本中对我有用,但是当我从命令行(Centos 7)运行警告消息时,未能拦截警告消息:
import io
import logging
from contextlib import redirect_stderr
import dask
import dask.dataframe as dd
def vc_delayed( df_file ):
f = io.StringIO()
with redirect_stderr(f):
ddf = dd.read_csv( df_file, compression='lz4', dtype=str, blocksize=None,
error_bad_lines=False, warn_bad_lines=True )
if f.getvalue():
logging.warning( f'For {df_file}: {f.getvalue()}' )
ddf = ddf[ cols ].map_partitions(pd.DataFrame.replace, {np.NaN: "_NULL_"})
return ddf.map_partitions( vc_func ).to_delayed()
res = map( vc_delayed, myfiles )
out = dask.compute( *res, scheduler='processes' )
哪个会产生此Jupyter笔记本输出(预期):
WARNING:root:For /data/user/datafu/broker_countz/further_split/further_split_aba.csv.lz4: b'Skipping line 5: expected 354 fields, saw 355\n'
[########################################] | 100% Completed | 4.5s
不期望的是,为什么它无法将警告输出为从命令行执行的Python 3脚本。弹出“原始”熊猫警告消息,提示预期的字段数,但是stderr重定向似乎失败,因为不存在修改。
Dask在这里没有任何幻想。它只是在不同的线程中运行您的代码。
我的第一个猜测是环境不匹配。也许在Jupyter中检查sys.executable
并在命令行中检查which python
?