Program freezes when using argparse with concurrent.futures

Problem description (votes: 0, answers: 1)

I am writing a Python program to process NGS sequencing data in an HPC bash terminal. The program normally runs fine in a Jupyter Notebook on my Mac, both with a single process and with multiple processes. However, as soon as I try to pass arguments in the terminal with the argparse package, the program never produces the final result and instead runs indefinitely, as if the processing had never finished. I have checked, and I am almost certain this is caused by some conflict between argparse and concurrent.futures.ProcessPoolExecutor(). Thanks!

The following code produces the freeze when run in the terminal:

#! /usr/bin/env python

import pandas as pd
import time
import concurrent.futures
import argparse


def run(args):
    start = time.perf_counter()
    input_file = args.input
    output_file = args.output
    chunk = args.chunk_size

    def cal_breaking(data):
        for index, row in data.iterrows():
            if row[1] == 0:  # mapping to the forward strand
                data.at[index, 'breaking_pos'] = int(row[5]) + int(row[3])
            elif row[1] == 16:  # mapping to the reverse strand
                data.at[index, 'breaking_pos'] = int(row[3])
            else:
                pass
        return data

    new_df = pd.DataFrame(
        columns=['QNAME', 'FLAG', 'RNAME', 'POS', 'MAPQ', 'CIGAR', 'RNEXT', 'PNEXT', 'TLEN', 'SEQ', 'QUAL'])
    processes = []
    for df in pd.read_csv(input_file, delimiter='\t', usecols=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10], chunksize=chunk):
        df.columns = ['QNAME', 'FLAG', 'RNAME', 'POS', 'MAPQ', 'CIGAR', 'RNEXT', 'PNEXT', 'TLEN', 'SEQ', 'QUAL']
        df = df.loc[~df['CIGAR'].str.contains('S') & ~df['CIGAR'].str.contains(
            'H')]  # filter out reads whose CIGAR contains soft clips (S) or hard clips (H)
        df['CIGAR'] = df.iloc[:, 5].str.extract(
            r'(\d+)')  # \d+ matches one or more digits (0-9)
        df['breaking_pos'] = None
        with concurrent.futures.ProcessPoolExecutor() as executor:
            processes.append(executor.submit(cal_breaking, df))
    for process in processes:
        new_df = pd.concat([new_df, process.result()], sort=True)

    new_df['count'] = 1
    new_df = new_df.groupby(['RNAME', 'breaking_pos']).count()['count'].reset_index()
    new_df['end'] = new_df['breaking_pos'] + 1
    new_df = new_df[['RNAME', 'breaking_pos', 'end', 'count']]
    new_df.to_csv(output_file, '\t', index=None, header=None)
    end = time.perf_counter()
    print(f'process finished in {round(end - start, 2)} second(s)')


def main():
    parser = argparse.ArgumentParser(description="tagging HiC-Pro pair's sub-compartment")
    parser.add_argument("-in", help="input pairs file", dest="input", type=str, required=True)
    parser.add_argument("-out", help="output files name", dest="output", type=str, required=True)
    parser.add_argument("-ck", help="read in chunk size", dest="chunk_size", type=int, required=True)
    parser.set_defaults(func=run)
    args = parser.parse_args()
    args.func(args)


if __name__ == "__main__":
    main()

The following code runs fine in the terminal; there is no problem if I do not use multiprocessing:

#! /usr/bin/env python

import pandas as pd
import time
import argparse


def run(args):
    start = time.perf_counter()
    input_file = args.input
    output_file = args.output
    chunk = args.chunk_size

    def cal_breaking(data):
        for index, row in data.iterrows():
            if row[1] == 0:  # mapping to the forward strand
                data.at[index, 'breaking_pos'] = int(row[5]) + int(row[3])
            elif row[1] == 16:  # mapping to the reverse strand
                data.at[index, 'breaking_pos'] = int(row[3])
            else:
                pass
        return data

    new_df = pd.DataFrame(
        columns=['QNAME', 'FLAG', 'RNAME', 'POS', 'MAPQ', 'CIGAR', 'RNEXT', 'PNEXT', 'TLEN', 'SEQ', 'QUAL'])

    for df in pd.read_csv(input_file, delimiter='\t', usecols=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10], chunksize=chunk):
        df.columns = ['QNAME', 'FLAG', 'RNAME', 'POS', 'MAPQ', 'CIGAR', 'RNEXT', 'PNEXT', 'TLEN', 'SEQ', 'QUAL']
        df = df.loc[~df['CIGAR'].str.contains('S') & ~df['CIGAR'].str.contains(
            'H')]  # filter out reads whose CIGAR contains soft clips (S) or hard clips (H)
        df['CIGAR'] = df.iloc[:, 5].str.extract(
            r'(\d+)')  # \d+ matches one or more digits (0-9)
        df['breaking_pos'] = None
        new_df = pd.concat([new_df, cal_breaking(df)], sort=True)

    new_df['count'] = 1
    new_df = new_df.groupby(['RNAME', 'breaking_pos']).count()['count'].reset_index()
    new_df['end'] = new_df['breaking_pos'] + 1
    new_df = new_df[['RNAME', 'breaking_pos', 'end', 'count']]
    new_df.to_csv(output_file, '\t', index=None, header=None)
    end = time.perf_counter()
    print(f'process finished in {round(end - start, 2)} second(s)')


def main():
    parser = argparse.ArgumentParser(description="tagging HiC-Pro pair's sub-compartment")
    parser.add_argument("-in", help="input pairs file", dest="input", type=str, required=True)
    parser.add_argument("-out", help="output files name", dest="output", type=str, required=True)
    parser.add_argument("-ck", help="read in chunk size", dest="chunk_size", type=int, required=True)
    parser.set_defaults(func=run)
    args = parser.parse_args()
    args.func(args)


if __name__ == "__main__":
    main()
Tags: python, multiprocessing, argparse
1 Answer

0 votes

The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock, but it also means that only picklable objects can be executed and returned.
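
To make that constraint concrete, here is a minimal, self-contained sketch (the square function is just a placeholder): the callable handed to executor.submit must live at module level so the worker processes can pickle and re-import it, whereas a nested function such as cal_breaking defined inside run cannot be pickled by the standard pickle module.

import concurrent.futures


def square(x):
    # Module-level function: picklable, so worker processes can import it.
    return x * x


if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(square, n) for n in range(4)]
        print([f.result() for f in futures])  # -> [0, 1, 4, 9]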

According to the documentation, max_workers defaults to the number of processors on the machine (and on Windows it must be at most 61). Here I modified that part to get it to work:

with concurrent.futures.ProcessPoolExecutor(max_workers=6) as executor:
    processes.append(executor.submit(cal_breaking, df))
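
Beyond max_workers, one possible restructuring, sketched here under the assumption that cal_breaking is moved to module level (process_chunks and COLUMNS are names made up for illustration, not tested against the poster's data): create the pool once, outside the chunk loop, and submit every chunk to the same executor.

import concurrent.futures
import pandas as pd

COLUMNS = ['QNAME', 'FLAG', 'RNAME', 'POS', 'MAPQ', 'CIGAR',
           'RNEXT', 'PNEXT', 'TLEN', 'SEQ', 'QUAL']


def cal_breaking(data):
    # Module-level worker: picklable, unlike the version nested inside run().
    for index, row in data.iterrows():
        if row['FLAG'] == 0:      # forward strand
            data.at[index, 'breaking_pos'] = int(row['CIGAR']) + int(row['POS'])
        elif row['FLAG'] == 16:   # reverse strand
            data.at[index, 'breaking_pos'] = int(row['POS'])
    return data


def process_chunks(input_file, chunk):
    futures = []
    # One pool for the whole run; chunks are submitted as they are read.
    with concurrent.futures.ProcessPoolExecutor(max_workers=6) as executor:
        for df in pd.read_csv(input_file, delimiter='\t',
                              usecols=list(range(11)), chunksize=chunk):
            df.columns = COLUMNS
            df = df.loc[~df['CIGAR'].str.contains('S|H')]  # drop soft/hard clipped reads
            df['CIGAR'] = df['CIGAR'].str.extract(r'(\d+)', expand=False)
            df['breaking_pos'] = None
            futures.append(executor.submit(cal_breaking, df))
        return pd.concat([f.result() for f in futures], sort=True)

run() could then call new_df = process_chunks(input_file, chunk) after parsing the arguments, keeping the existing if __name__ == "__main__" guard.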