
问题描述 投票:5回答:2




Result of the code below





import matplotlib.pyplot as plt
import numpy as np
from multiprocessing import current_process, cpu_count, Process, Queue
import matplotlib.pylab as pl

def groupDiffsInParallel (queue, d1, d2, nrOfReplicas, nrOfPermuts, timesOfInterestFramesIter):

    allResults = np.zeros([nrOfReplicas, nrOfPermuts])  # e.g. 100 x 3000
    for repsPerGroupIdx in range(1, nrOfReplicas + 1):
        for permutIdx in range(nrOfPermuts):
            d1TimeCut = d1[:, 0:int(timesOfInterestFramesIter)]
            d1Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
            d1Sel = d1TimeCut[d1Idxs, :]
            d1Mean = np.mean(d1Sel.flatten())

            d2TimeCut = d2[:, 0:int(timesOfInterestFramesIter)]
            d2Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
            d2Sel = d2TimeCut[d2Idxs, :]
            d2Mean = np.mean(d2Sel.flatten())

            diff = d1Mean - d2Mean

            allResults[repsPerGroupIdx - 1, permutIdx] = np.abs(diff)


def evalDifferences_parallel (d1, d2):
    # d1 and d2 are of size reps x time (e.g. 100x801)

    nrOfReplicas = d1.shape[0]
    nrOfFrames = d1.shape[1]
    timesOfInterestNs = [0.25, 0.5, 1, 2, 3, 4, 5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100]  # 17
    nrOfTimesOfInterest = len(timesOfInterestNs)
    framesPerNs = (nrOfFrames-1)/100  # sim time == 100 ns
    timesOfInterestFrames = [x*framesPerNs for x in timesOfInterestNs]

    nrOfPermuts = 5000

    allResults = np.zeros([nrOfTimesOfInterest, nrOfReplicas, nrOfPermuts]) # e.g. 17 x 100 x 3000
    nrOfProcesses = cpu_count()
    print('{} cores available'.format(nrOfProcesses))
    queue = Queue()
    jobs = []
    print('Starting ...')

    # use one process for each time cut
    for timesOfInterestFramesIterIdx, timesOfInterestFramesIter in enumerate(timesOfInterestFrames):
        p = Process(target=groupDiffsInParallel, args=(queue, d1, d2, nrOfReplicas, nrOfPermuts, timesOfInterestFramesIter))
        print('Process {} started work on time \"{} ns\"'.format(timesOfInterestFramesIterIdx, timesOfInterestNs[timesOfInterestFramesIterIdx]), end='\n', flush=True)
    # collect the results
    for timesOfInterestFramesIterIdx, timesOfInterestFramesIter in enumerate(timesOfInterestFrames):
        oneResult = queue.get()
        allResults[timesOfInterestFramesIterIdx, :, :] = oneResult
        print('Process number {} returned the results.'.format(timesOfInterestFramesIterIdx), end='\n', flush=True)
    # hold main thread and wait for the child process to complete. then join back the resources in the main thread
    for proc in jobs:
    print("All parallel done.")

    allResultsMeanOverPermuts = allResults.mean(axis=2)  # size: 17 x 100

    replicaNumbersToPlot = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100])
    replicaNumbersToPlot -= 1  # zero index!
    colors = pl.cm.jet(np.linspace(0, 1, len(replicaNumbersToPlot)))

    ctr = 0

    f, ax = plt.subplots(nrows=2, ncols=2, figsize=(12, 12))
    axId = (1, 0)
    for lineIdx in replicaNumbersToPlot:
        lineData = allResultsMeanOverPermuts[:, lineIdx]
        ax[axId].plot(lineData, ".-", color=colors[ctr], linewidth=0.5, label="nReps="+str(lineIdx+1))

    ax[axId].set_xticks(range(nrOfTimesOfInterest))  # careful: this is not the same as plt.xticks!!
    ax[axId].set_xlabel("simulation length taken into account")
    ax[axId].set_ylabel("average difference between mean values boot strapping samples")
    ax[axId].set_xlim([ax[axId].get_xlim()[0], ax[axId].get_xlim()[1]+1])  # increase x max by 2


##### MAIN ####
np.random.seed(83737)  # some number for reproducibility
d1 = np.random.rand(100, 801)
d2 = np.random.rand(100, 801)

np.random.seed(52389)  # if changed to 324235 the peak is gone
evalDifferences_parallel(d1, d2)


将随机数生成器从numpy更改为“from random import randint”并不能解决问题:


d1Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
d2Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)


d1Idxs = [randint(0, nrOfReplicas-1) for p in range(repsPerGroupIdx)]
d2Idxs = [randint(0, nrOfReplicas-1) for p in range(repsPerGroupIdx)]

---更新2 ---


timesOfInterestNs = [0.25, 0.5, 1, 2, 3, 4, 5, 10, 20, 30, 40, 50]


---更新3 ---

在每个子进程(Random seed is replication across child processes)中重新初始化随机种子生成器也不能解决问题:

pid = str(current_process())
pid = int(re.split("(\W)", pid)[6])
ms = int(round(time.time() * 1000))
mySeed = np.mod(ms, 4294967295)
mySeed = mySeed + 25000 * pid + 100 * pid + pid
mySeed = np.mod(mySeed, 4294967295)

---更新4 ---在Windows机器上你需要一个:

if __name__ == '__main__':


python numpy random multiprocessing permutation

我想这是经典的多处理错误。没有什么能保证流程的顺序与他们开始的顺序完成相同。这意味着您无法确定指令allResults[timesOfInterestFramesIterIdx, :, :] = oneResult是否会在allResults中的'timesOfInterestFramesIterIdx'位置存储进程'timesOfInterestFramesIterIdx'的结果。为了更清楚,让我们说'timesOfInterestFramesIterIdx'是2,那么你绝对不能保证oneResult是进程2的输出。


import matplotlib.pyplot as plt
import numpy as np
from multiprocessing import cpu_count, Process, Queue
import matplotlib.pylab as pl

def groupDiffsInParallel(queue, d1, d2, nrOfReplicas, nrOfPermuts,

    allResults = np.zeros([nrOfReplicas, nrOfPermuts])  # e.g. 100 x 3000
    for repsPerGroupIdx in range(1, nrOfReplicas + 1):
        for permutIdx in range(nrOfPermuts):
            d1TimeCut = d1[:, 0:int(timesOfInterestFramesIter)]
            d1Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
            d1Sel = d1TimeCut[d1Idxs, :]
            d1Mean = np.mean(d1Sel.flatten())

            d2TimeCut = d2[:, 0:int(timesOfInterestFramesIter)]
            d2Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
            d2Sel = d2TimeCut[d2Idxs, :]
            d2Mean = np.mean(d2Sel.flatten())

            diff = d1Mean - d2Mean

            allResults[repsPerGroupIdx - 1, permutIdx] = np.abs(diff)

    queue.put({'allResults': allResults,
               'number': timesOfInterestFramesIterIdx})

def evalDifferences_parallel (d1, d2):
    # d1 and d2 are of size reps x time (e.g. 100x801)

    nrOfReplicas = d1.shape[0]
    nrOfFrames = d1.shape[1]
    timesOfInterestNs = [0.25, 0.5, 1, 2, 3, 4, 5, 10, 20, 30, 40, 50, 60, 70,
                         80, 90, 100]  # 17
    nrOfTimesOfInterest = len(timesOfInterestNs)
    framesPerNs = (nrOfFrames-1)/100  # sim time == 100 ns
    timesOfInterestFrames = [x*framesPerNs for x in timesOfInterestNs]

    nrOfPermuts = 5000

    allResults = np.zeros([nrOfTimesOfInterest, nrOfReplicas,
                           nrOfPermuts])  # e.g. 17 x 100 x 3000
    nrOfProcesses = cpu_count()
    print('{} cores available'.format(nrOfProcesses))
    queue = Queue()
    jobs = []
    print('Starting ...')

    # use one process for each time cut
    for timesOfInterestFramesIterIdx, timesOfInterestFramesIter \
            in enumerate(timesOfInterestFrames):
        p = Process(target=groupDiffsInParallel,
                    args=(queue, d1, d2, nrOfReplicas, nrOfPermuts,
        print('Process {} started work on time \"{} ns\"'.format(
              end='\n', flush=True)
    # collect the results
    resultdict = {}
    for timesOfInterestFramesIterIdx, timesOfInterestFramesIter \
            in enumerate(timesOfInterestFrames):
        allResults[resultdict['number'], :, :] = resultdict['allResults']
        print('Process number {} returned the results.'.format(
            resultdict['number']), end='\n', flush=True)
    # hold main thread and wait for the child process to complete. then join
    # back the resources in the main thread
    for proc in jobs:
    print("All parallel done.")

    allResultsMeanOverPermuts = allResults.mean(axis=2)  # size: 17 x 100

    replicaNumbersToPlot = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40,
                                     50, 60, 70, 80, 90, 100])
    replicaNumbersToPlot -= 1  # zero index!
    colors = pl.cm.jet(np.linspace(0, 1, len(replicaNumbersToPlot)))

    ctr = 0

    f, ax = plt.subplots(nrows=2, ncols=2, figsize=(12, 12))
    axId = (1, 0)
    for lineIdx in replicaNumbersToPlot:
        lineData = allResultsMeanOverPermuts[:, lineIdx]
        ax[axId].plot(lineData, ".-", color=colors[ctr], linewidth=0.5,
        ctr += 1

    # careful: this is not the same as plt.xticks!!
    ax[axId].set_xlabel("simulation length taken into account")
    ax[axId].set_ylabel("average difference between mean values boot "
                        + "strapping samples")
    ax[axId].set_xlim([ax[axId].get_xlim()[0], ax[axId].get_xlim()[1]+1])
    # increase x max by 2


# #### MAIN ####
np.random.seed(83737)  # some number for reproducibility
d1 = np.random.rand(100, 801)
d2 = np.random.rand(100, 801)

np.random.seed(52389)  # if changed to 324235 the peak is gone
evalDifferences_parallel(d1, d2)


20 cores available
Starting ...
Process 0 started work on time "0.25 ns"
Process 1 started work on time "0.5 ns"
Process 2 started work on time "1 ns"
Process 3 started work on time "2 ns"
Process 4 started work on time "3 ns"
Process 5 started work on time "4 ns"
Process 6 started work on time "5 ns"
Process 7 started work on time "10 ns"
Process 8 started work on time "20 ns"
Process 9 started work on time "30 ns"
Process 10 started work on time "40 ns"
Process 11 started work on time "50 ns"
Process 12 started work on time "60 ns"
Process 13 started work on time "70 ns"
Process 14 started work on time "80 ns"
Process 15 started work on time "90 ns"
Process 16 started work on time "100 ns"
Process number 3 returned the results.
Process number 0 returned the results.
Process number 4 returned the results.
Process number 7 returned the results.
Process number 1 returned the results.
Process number 2 returned the results.
Process number 5 returned the results.
Process number 8 returned the results.
Process number 6 returned the results.
Process number 9 returned the results.
Process number 10 returned the results.
Process number 11 returned the results.
Process number 12 returned the results.
Process number 13 returned the results.
Process number 14 returned the results.
Process number 15 returned the results.
Process number 16 returned the results.
All parallel done.


Correct output


不确定你是否还在这个问题上,但我只是在我的机器上运行你的代码(MacBook Pro(15英寸,2018年))在Jupyter 4.4.0,我的图表很平滑,你最初发布的种子值完全相同:

##### MAIN ####
np.random.seed(83737)  # some number for reproducibility
d1 = np.random.rand(100, 801)
d2 = np.random.rand(100, 801)

np.random.seed(52389)  # if changed to 324235 the peak is gone
evalDifferences_parallel(d1, d2) 


也许您的代码没有任何问题,324235种子没有什么特别之处,您只需要仔细检查您的模块版本,因为在最新版本中对源代码所做的任何更改都可能会影响您的结果。作为参考,我使用的是numpy 1.15.4matplotlib 3.0.2multiprocessing

© www.soinside.com 2019 - 2024. All rights reserved.