Parallelizing a python script with a python wrapper

Question (votes: 0, answers: 4)

I have a python script, heavy_lifting.py, that I have parallelized using GNU Parallel called from a bash wrapper script, wrapper.sh. I use it to process files in fastq format; see example.fastq below. While this works, requiring two interpreters and two sets of dependencies is inelegant. I would like to rewrite the bash wrapper script in python while achieving the same parallelization.

example.fastq
Here is an example of the input file that needs to be processed. These input files are typically very long (~500,000,000 lines).

@SRR6750041.1 1/1
CTGGANAAGTGAAATAATATAAATTTTTCCACTATTGAATAAAAGCAACTTAAATTTTCTAAGTCG
+
AAAAA#EEEEEEEEEEEEEEEEEEEEEEEAEEEEEEEEEEEEEEEEEEEEEEEEEA<AAEEEEE<6
@SRR6750041.2 2/1
CTATANTATTCTATATTTATTCTAGATAAAAGCATTCTATATTTAGCATATGTCTAGCAAAAAAAA
+
AAAAA#EE6EEEEEEEEEEEEAAEEAEEEEEEEEEEEE/EAE/EAE/EA/EAEAAAE//EEAEAA6
@SRR6750041.3 3/1
ATCCANAATGATGTGTTGCTCTGGAGGTACAGAGATAACGTCAGCTGGAATAGTTTCCCCTCACAG
+
AAAAA#EE6E6EEEEEE6EEEEAEEEEEEEEEEE//EAEEEEEAAEAEEEAE/EAEEA6/EEA<E/
@SRR6750041.4 4/1
ACACCNAATGCTCTGGCCTCTCAAGCACGTGGATTATGCCAGAGAGGCCAGAGCATTCTTCGTACA
+
/AAAA#EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEAE/E/<//AEA/EA//E//

Below is a minimal reproducible example of the scripts I started with.

heavy_lifting.py

#!/usr/bin/env python
import argparse

# Read in arguments
parser = argparse.ArgumentParser()
parser.add_argument('-i', '--inputFastq', required=True, help='forward .fastq')
parser.add_argument('-o', '--outputFastq', required=True, help='output .fastq')
args = parser.parse_args()

# Iterate through input file and append to output file
with open(args.inputFastq, "r") as infile:
    with open(args.outputFastq, "a") as outfile:
        for line in infile:
            outfile.write("modified" + line)

wrapper.sh

#!/bin/bash

NUMCORES="4"
FASTQ_F="./fastq_F.fastq"

# Split the input fastq for parallel processing. One split fastq file will be created for each available core.
split --number="l/$NUMCORES" $FASTQ_F split_fastq_F_

# Feed split fastq files to GNU Parallel to invoke parallel executions of `heavy_lifting.py`
ls split_fastq_F* | awk -F "split_fastq_F" '{print $2}' | parallel "python heavy_lifting.py -i split_fastq_F{} -o output.fastq"

#remove intermediate split fastq files
rm split_fastq_*

To execute these scripts I use the command bash wrapper.sh. You can see that the resulting file output.fastq has been created and contains the modified fastq records.
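
Since heavy_lifting.py prepends "modified" to every line, output.fastq should contain lines like the following (the order of records depends on which parallel job finishes first):

modified@SRR6750041.1 1/1
modifiedCTGGANAAGTGAAATAATATAAATTTTTCCACTATTGAATAAAAGCAACTTAAATTTTCTAAGTCG
modified+
modifiedAAAAA#EEEEEEEEEEEEEEEEEEEEEEEAEEEEEEEEEEEEEEEEEEEEEEEEEA<AAEEEEE<6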

Below is my attempt to invoke the parallel processing using a python wrapper, wrapper.py.

wrapper.py

#!/usr/bin/env python

import heavy_lifting
from joblib import Parallel, delayed
import multiprocessing

numcores = 4
fastq_F = "fastq_F.fastq"

#Create some logic to split the input fastq file into chunks for parallel processing.  

# Get input fastq file dimensions
with open(fastq_F, "r") as infile:
    length_fastq = len(infile.readlines())
    print(length_fastq)
    lines = infile.readlines()
    split_size = length_fastq / numcores
    print(split_size)

# Iterate through input fastq file writing lines to outfile in bins.
counter = 0
split_counter = 0
split_fastq_list = []
with open(fastq_F, "r") as infile:
    for line in infile:
        if counter == 0:
            filename = str("./split_fastq_F_" + str(split_counter))
            split_fastq_list.append(filename)
            outfile = open(filename, "a")
            counter += 1
        elif counter <= split_size:
            outfile.write(line.strip())
            counter += 1
        else:
            counter = 0
            split_counter += 1
            outfile.close()


Parallel(n_jobs=numcores)(delayed(heavy_lifting)(i, "output.fastq") for i in split_fastq_list)

Edited to improve the reproducibility of wrapper.py

What I seem to be most confused about is how to correctly feed the input arguments into the Parallel call in the python wrapper.py script. Any help is much appreciated!

python parallel-processing bioinformatics joblib fastq
4 Answers
1 vote

Parallel needs a function name, not a file/module name.

So in heavy_lifting you have to put the code into a function (using parameters instead of args):

def my_function(inputFastq, outputFastq):

    with open(inputFastq, "r") as infile:
        with open(outputFastq, "a") as outfile:
            for line in infile:
                outfile.write("modified" + line)

and then you can use:

Parallel(n_jobs=numcores)(delayed(heavy_lifting.my_function)(i, "output.fastq") for i in split_fastq_list)
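
For comparison, the same fan-out can also be sketched with the standard library's multiprocessing.Pool instead of joblib (a minimal sketch; numcores, split_fastq_list, and heavy_lifting.my_function are assumed from the surrounding scripts, and as above every worker appends to the same output.fastq):

from functools import partial
from multiprocessing import Pool

with Pool(processes=numcores) as pool:
    # Bind the shared output name, then hand each split file to a worker
    pool.map(partial(heavy_lifting.my_function, outputFastq="output.fastq"),
             split_fastq_list)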

1 vote

This should be a comment, since it does not answer the question, but it is too big for one.

All of wrapper.sh can be written as:

parallel -a ./fastq_F.fastq --recstart @SRR --block -1 --pipepart --cat "python heavy_lifting.py -i {} -o output.fastq"

If heavy_lifting.py only reads the file and does not seek, the following should also work, and it needs less disk I/O (the temporary files are replaced by fifos):

parallel -a ./fastq_F.fastq --recstart @SRR --block -1 --pipepart --fifo "python heavy_lifting.py -i {} -o output.fastq"

It will automatically detect the number of CPU threads, split the fastq file at lines starting with @SRR, chop it on the fly into one block per CPU thread, and feed the blocks to python.

If heavy_lifting.py reads from standard input when no -i is given, then this should work, too:

parallel -a ./fastq_F.fastq --recstart @SRR --block -1 --pipepart "python heavy_lifting.py -o output.fastq"
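
A minimal sketch of such a stdin fallback (a hypothetical variant of heavy_lifting.py in which -i becomes optional and defaults to sys.stdin):

#!/usr/bin/env python
import argparse
import sys

parser = argparse.ArgumentParser()
parser.add_argument('-i', '--inputFastq', help='forward .fastq (defaults to stdin)')
parser.add_argument('-o', '--outputFastq', required=True, help='output .fastq')
args = parser.parse_args()

# Fall back to standard input when no -i is given
infile = open(args.inputFastq, "r") if args.inputFastq else sys.stdin
with open(args.outputFastq, "a") as outfile:
    for line in infile:
        outfile.write("modified" + line)
if args.inputFastq:
    infile.close()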

If heavy_lifting.py does not append a unique string to output.fastq, it will be overwritten. So it is probably better to have GNU Parallel give each job a unique output name, such as output2.fastq:

parallel -a ./fastq_F.fastq --recstart @SRR --block -1 --pipepart "python heavy_lifting.py -o output{#}.fastq"
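
If a single combined file is needed afterwards, the per-job outputs could then be concatenated; a minimal sketch, assuming the output{#}.fastq naming from the command above (output1.fastq, output2.fastq, ...):

import glob
import shutil

# Merge the per-job outputs in job-number order
parts = sorted(glob.glob("output[0-9]*.fastq"),
               key=lambda p: int("".join(ch for ch in p if ch.isdigit())))
with open("output_merged.fastq", "wb") as merged:
    for part in parts:
        with open(part, "rb") as f:
            shutil.copyfileobj(f, merged)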

For a more general FASTQ parallelization wrapper, see: https://stackoverflow.com/a/41707920/363028


0 votes

For reproducibility, I implemented the answer provided by Furas into the heavy_lifting.py and wrapper.py scripts. Additional edits were needed to make the code run, which is why I provide the following.

heavy_lifting.py

#!/usr/bin/env python
import argparse

def heavy_lifting_fun(inputFastq, outputFastq):
    # Iterate through input file and append to output file
    outfile = open(outputFastq, "a")
    with open(inputFastq, "r") as infile:
        for line in infile:
            outfile.write("modified" + line.strip() + "\n")
    outfile.close()

if __name__ == '__main__':
    # Read in arguments only when run as a standalone script
    parser = argparse.ArgumentParser()
    parser.add_argument('-i', '--inputFastq', required=True, help='forward .fastq')
    parser.add_argument('-o', '--outputFastq', required=True, help='output .fastq')
    args = parser.parse_args()
    heavy_lifting_fun(args.inputFastq, args.outputFastq)

wrapper.py

#!/usr/bin/env python

import heavy_lifting
from joblib import Parallel, delayed
import multiprocessing

numcores = 4
fastq_F = "fastq_F.fastq"

#Create some logic to split the input fastq file into chunks for parallel processing.  

# Get input fastq file dimensions
with open(fastq_F, "r") as infile:
    length_fastq = len(infile.readlines())
    print(length_fastq)

# Use integer division, then round the chunk size up to the next multiple of 4
# so that 4-line fastq records are never split across chunks
split_size = length_fastq // numcores
while split_size % 4 != 0:
    split_size += 1
print(split_size)

# Iterate through input fastq file writing lines to outfile in bins.
counter = 0
split_counter = 0
split_fastq_list = []
with open(fastq_F, "r") as infile:
    for line in infile:
        if counter == 0:
            filename = str("./split_fastq_F_" + str(split_counter))
            split_fastq_list.append(filename)
            outfile = open(filename, "a")
            outfile.write(str(line.strip() + "\n"))
            counter += 1
        elif counter < split_size:
            outfile.write(str(line.strip() + "\n"))
            counter += 1
        else:
            counter = 0
            split_counter += 1
            outfile.close()
            filename = str("./split_fastq_F_" + str(split_counter))
            split_fastq_list.append(filename)
            outfile = open(filename, "a")
            outfile.write(str(line.strip() + "\n"))
            counter += 1
    outfile.close()

Parallel(n_jobs=numcores)(delayed(heavy_lifting.heavy_lifting_fun)(i, "output.fastq") for i in split_fastq_list)
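
The bash wrapper also removed its intermediate split files; the equivalent at the end of wrapper.py would be a small cleanup loop (a sketch reusing the split_fastq_list built above):

import os

# Remove the intermediate split fastq files, mirroring `rm split_fastq_*`
for split_file in split_fastq_list:
    os.remove(split_file)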

0 votes

This post may be helpful. The suggested solution:

import concurrent.futures
import os
from functools import wraps

def make_parallel(func):
    """
        Decorator used to decorate any function which needs to be parallelized.
        After the input of the function should be a list in which each element is a instance of input for the normal function.
        You can also pass in keyword arguments separately.
        :param func: function
            The instance of the function that needs to be parallelized.
        :return: function
    """

    @wraps(func)
    def wrapper(lst):
        """

        :param lst:
            The inputs of the function in a list.
        :return:
        """
        # The maximum number of threads that can be spawned.
        # If the number of threads is too high, the overhead of creating them becomes significant.
        # Here we take the number of CPUs available on the system and multiply it by a constant.
        # On my system there are 8 CPUs in total, so a maximum of 16 threads will be generated.
        number_of_threads_multiple = 2  # You can change this multiple according to your requirement
        number_of_workers = int(os.cpu_count() * number_of_threads_multiple)
        if len(lst) < number_of_workers:
            # If the list is short, we only need that many threads,
            # avoiding the creation of unnecessary ones.
            number_of_workers = len(lst)

        if number_of_workers:
            if number_of_workers == 1:
                # If the length of the list that needs to be parallelized is 1, there is no point in
                # parallelizing the function.
                # So we run it serially.
                result = [func(lst[0])]
            else:
                # Core Code, where we are creating max number of threads and running the decorated function in parallel.
                result = []
                with concurrent.futures.ThreadPoolExecutor(max_workers=number_of_workers) as executor:
                    bag = {executor.submit(func, i): i for i in lst}
                    for future in concurrent.futures.as_completed(bag):
                        result.append(future.result())
        else:
            result = []
        return result
    return wrapper 

Example usage:

# Parallelized way of calling the function
results = make_parallel(sample_function)(list_of_post_ids)
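
Applied to this question, the decorator could be used roughly like this (a hypothetical sketch; heavy_lifting_fun and split_fastq_list come from the scripts above). Note that ThreadPoolExecutor uses threads, which suits I/O-bound work but is limited by the GIL for CPU-bound processing:

@make_parallel
def process_chunk(split_fastq):
    # One split file per call; every call appends to the same output.fastq
    heavy_lifting.heavy_lifting_fun(split_fastq, "output.fastq")

results = process_chunk(split_fastq_list)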