为什么我的排列测试分析的曲线不平滑?

问题描述 投票:5回答:2

我正在使用置换测试(拉随机子样本)来测试2个实验之间的差异。每个实验进行100次(=每个100个复制品)。每个副本包含801个测量点。现在我想执行一种排列(或引导捆绑),以测试每个实验有多少副本(以及多少(时间)测量点)我需要获得一定的可靠性级别。

为此,我编写了一个代码,我从中提取了最小的工作示例(有许多硬编码的东西)(请参见下文)。输入数据作为随机数生成。这里np.random.rand(100,801)为100个副本和801个时间点。

该代码原则上起作用,但是如果选择随机子样本5000次,所产生的曲线有时不会平滑下降。以下代码的输出如下:

Result of the code below

可以看出,在x轴的2处,存在不应存在的峰值。如果我将随机种子从52389更改为324235,它就会消失并且曲线是平滑的。随机数的选择方式似乎有问题吗?

为什么会这样?我在Matlab中具有语义相似的代码,并且曲线在已经1000个排列(这里是5000)处完全平滑。

我是否有编码错误或者numpy随机数生成器不好?

有人在这看到问题吗?

import matplotlib.pyplot as plt
import numpy as np
from multiprocessing import current_process, cpu_count, Process, Queue
import matplotlib.pylab as pl


def groupDiffsInParallel (queue, d1, d2, nrOfReplicas, nrOfPermuts, timesOfInterestFramesIter):

    allResults = np.zeros([nrOfReplicas, nrOfPermuts])  # e.g. 100 x 3000
    for repsPerGroupIdx in range(1, nrOfReplicas + 1):
        for permutIdx in range(nrOfPermuts):
            d1TimeCut = d1[:, 0:int(timesOfInterestFramesIter)]
            d1Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
            d1Sel = d1TimeCut[d1Idxs, :]
            d1Mean = np.mean(d1Sel.flatten())

            d2TimeCut = d2[:, 0:int(timesOfInterestFramesIter)]
            d2Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
            d2Sel = d2TimeCut[d2Idxs, :]
            d2Mean = np.mean(d2Sel.flatten())

            diff = d1Mean - d2Mean

            allResults[repsPerGroupIdx - 1, permutIdx] = np.abs(diff)

    queue.put(allResults)



def evalDifferences_parallel (d1, d2):
    # d1 and d2 are of size reps x time (e.g. 100x801)

    nrOfReplicas = d1.shape[0]
    nrOfFrames = d1.shape[1]
    timesOfInterestNs = [0.25, 0.5, 1, 2, 3, 4, 5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100]  # 17
    nrOfTimesOfInterest = len(timesOfInterestNs)
    framesPerNs = (nrOfFrames-1)/100  # sim time == 100 ns
    timesOfInterestFrames = [x*framesPerNs for x in timesOfInterestNs]

    nrOfPermuts = 5000

    allResults = np.zeros([nrOfTimesOfInterest, nrOfReplicas, nrOfPermuts]) # e.g. 17 x 100 x 3000
    nrOfProcesses = cpu_count()
    print('{} cores available'.format(nrOfProcesses))
    queue = Queue()
    jobs = []
    print('Starting ...')

    # use one process for each time cut
    for timesOfInterestFramesIterIdx, timesOfInterestFramesIter in enumerate(timesOfInterestFrames):
        p = Process(target=groupDiffsInParallel, args=(queue, d1, d2, nrOfReplicas, nrOfPermuts, timesOfInterestFramesIter))
        p.start()
        jobs.append(p)
        print('Process {} started work on time \"{} ns\"'.format(timesOfInterestFramesIterIdx, timesOfInterestNs[timesOfInterestFramesIterIdx]), end='\n', flush=True)
    # collect the results
    for timesOfInterestFramesIterIdx, timesOfInterestFramesIter in enumerate(timesOfInterestFrames):
        oneResult = queue.get()
        allResults[timesOfInterestFramesIterIdx, :, :] = oneResult
        print('Process number {} returned the results.'.format(timesOfInterestFramesIterIdx), end='\n', flush=True)
    # hold main thread and wait for the child process to complete. then join back the resources in the main thread
    for proc in jobs:
        proc.join()
    print("All parallel done.")

    allResultsMeanOverPermuts = allResults.mean(axis=2)  # size: 17 x 100

    replicaNumbersToPlot = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100])
    replicaNumbersToPlot -= 1  # zero index!
    colors = pl.cm.jet(np.linspace(0, 1, len(replicaNumbersToPlot)))

    ctr = 0

    f, ax = plt.subplots(nrows=2, ncols=2, figsize=(12, 12))
    axId = (1, 0)
    for lineIdx in replicaNumbersToPlot:
        lineData = allResultsMeanOverPermuts[:, lineIdx]
        ax[axId].plot(lineData, ".-", color=colors[ctr], linewidth=0.5, label="nReps="+str(lineIdx+1))
        ctr+=1

    ax[axId].set_xticks(range(nrOfTimesOfInterest))  # careful: this is not the same as plt.xticks!!
    ax[axId].set_xticklabels(timesOfInterestNs)
    ax[axId].set_xlabel("simulation length taken into account")
    ax[axId].set_ylabel("average difference between mean values boot strapping samples")
    ax[axId].set_xlim([ax[axId].get_xlim()[0], ax[axId].get_xlim()[1]+1])  # increase x max by 2

    plt.show()


##### MAIN ####
np.random.seed(83737)  # some number for reproducibility
d1 = np.random.rand(100, 801)
d2 = np.random.rand(100, 801)

np.random.seed(52389)  # if changed to 324235 the peak is gone
evalDifferences_parallel(d1, d2)

-------------更新---------------

将随机数生成器从numpy更改为“from random import randint”并不能解决问题:

从:

d1Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
d2Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)

至:

d1Idxs = [randint(0, nrOfReplicas-1) for p in range(repsPerGroupIdx)]
d2Idxs = [randint(0, nrOfReplicas-1) for p in range(repsPerGroupIdx)]

---更新2 ---

timesOfInterestNs可以设置为:

timesOfInterestNs = [0.25, 0.5, 1, 2, 3, 4, 5, 10, 20, 30, 40, 50]

在具有更少内核的机器上加速它。

---更新3 ---

在每个子进程(Random seed is replication across child processes)中重新初始化随机种子生成器也不能解决问题:

pid = str(current_process())
pid = int(re.split("(\W)", pid)[6])
ms = int(round(time.time() * 1000))
mySeed = np.mod(ms, 4294967295)
mySeed = mySeed + 25000 * pid + 100 * pid + pid
mySeed = np.mod(mySeed, 4294967295)
np.random.seed(seed=mySeed)

---更新4 ---在Windows机器上你需要一个:

if __name__ == '__main__':

避免递归地创建子进程(以及崩溃)。

python numpy random multiprocessing permutation
2个回答
6
投票

我想这是经典的多处理错误。没有什么能保证流程的顺序与他们开始的顺序完成相同。这意味着您无法确定指令allResults[timesOfInterestFramesIterIdx, :, :] = oneResult是否会在allResults中的'timesOfInterestFramesIterIdx'位置存储进程'timesOfInterestFramesIterIdx'的结果。为了更清楚,让我们说'timesOfInterestFramesIterIdx'是2,那么你绝对不能保证oneResult是进程2的输出。

我在下面实施了一个非常快速的修复。这个想法是通过向groupDiffsInParallel添加一个额外的参数来跟踪启动进程的顺序,然后将其存储在队列中,从而在收集结果时充当进程标识符。

import matplotlib.pyplot as plt
import numpy as np
from multiprocessing import cpu_count, Process, Queue
import matplotlib.pylab as pl


def groupDiffsInParallel(queue, d1, d2, nrOfReplicas, nrOfPermuts,
                         timesOfInterestFramesIter,
                         timesOfInterestFramesIterIdx):

    allResults = np.zeros([nrOfReplicas, nrOfPermuts])  # e.g. 100 x 3000
    for repsPerGroupIdx in range(1, nrOfReplicas + 1):
        for permutIdx in range(nrOfPermuts):
            d1TimeCut = d1[:, 0:int(timesOfInterestFramesIter)]
            d1Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
            d1Sel = d1TimeCut[d1Idxs, :]
            d1Mean = np.mean(d1Sel.flatten())

            d2TimeCut = d2[:, 0:int(timesOfInterestFramesIter)]
            d2Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
            d2Sel = d2TimeCut[d2Idxs, :]
            d2Mean = np.mean(d2Sel.flatten())

            diff = d1Mean - d2Mean

            allResults[repsPerGroupIdx - 1, permutIdx] = np.abs(diff)

    queue.put({'allResults': allResults,
               'number': timesOfInterestFramesIterIdx})


def evalDifferences_parallel (d1, d2):
    # d1 and d2 are of size reps x time (e.g. 100x801)

    nrOfReplicas = d1.shape[0]
    nrOfFrames = d1.shape[1]
    timesOfInterestNs = [0.25, 0.5, 1, 2, 3, 4, 5, 10, 20, 30, 40, 50, 60, 70,
                         80, 90, 100]  # 17
    nrOfTimesOfInterest = len(timesOfInterestNs)
    framesPerNs = (nrOfFrames-1)/100  # sim time == 100 ns
    timesOfInterestFrames = [x*framesPerNs for x in timesOfInterestNs]

    nrOfPermuts = 5000

    allResults = np.zeros([nrOfTimesOfInterest, nrOfReplicas,
                           nrOfPermuts])  # e.g. 17 x 100 x 3000
    nrOfProcesses = cpu_count()
    print('{} cores available'.format(nrOfProcesses))
    queue = Queue()
    jobs = []
    print('Starting ...')

    # use one process for each time cut
    for timesOfInterestFramesIterIdx, timesOfInterestFramesIter \
            in enumerate(timesOfInterestFrames):
        p = Process(target=groupDiffsInParallel,
                    args=(queue, d1, d2, nrOfReplicas, nrOfPermuts,
                          timesOfInterestFramesIter,
                          timesOfInterestFramesIterIdx))
        p.start()
        jobs.append(p)
        print('Process {} started work on time \"{} ns\"'.format(
            timesOfInterestFramesIterIdx,
            timesOfInterestNs[timesOfInterestFramesIterIdx]),
              end='\n', flush=True)
    # collect the results
    resultdict = {}
    for timesOfInterestFramesIterIdx, timesOfInterestFramesIter \
            in enumerate(timesOfInterestFrames):
        resultdict.update(queue.get())
        allResults[resultdict['number'], :, :] = resultdict['allResults']
        print('Process number {} returned the results.'.format(
            resultdict['number']), end='\n', flush=True)
    # hold main thread and wait for the child process to complete. then join
    # back the resources in the main thread
    for proc in jobs:
        proc.join()
    print("All parallel done.")

    allResultsMeanOverPermuts = allResults.mean(axis=2)  # size: 17 x 100

    replicaNumbersToPlot = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40,
                                     50, 60, 70, 80, 90, 100])
    replicaNumbersToPlot -= 1  # zero index!
    colors = pl.cm.jet(np.linspace(0, 1, len(replicaNumbersToPlot)))

    ctr = 0

    f, ax = plt.subplots(nrows=2, ncols=2, figsize=(12, 12))
    axId = (1, 0)
    for lineIdx in replicaNumbersToPlot:
        lineData = allResultsMeanOverPermuts[:, lineIdx]
        ax[axId].plot(lineData, ".-", color=colors[ctr], linewidth=0.5,
                      label="nReps="+str(lineIdx+1))
        ctr += 1

    ax[axId].set_xticks(range(nrOfTimesOfInterest))
    # careful: this is not the same as plt.xticks!!
    ax[axId].set_xticklabels(timesOfInterestNs)
    ax[axId].set_xlabel("simulation length taken into account")
    ax[axId].set_ylabel("average difference between mean values boot "
                        + "strapping samples")
    ax[axId].set_xlim([ax[axId].get_xlim()[0], ax[axId].get_xlim()[1]+1])
    # increase x max by 2

    plt.show()


# #### MAIN ####
np.random.seed(83737)  # some number for reproducibility
d1 = np.random.rand(100, 801)
d2 = np.random.rand(100, 801)

np.random.seed(52389)  # if changed to 324235 the peak is gone
evalDifferences_parallel(d1, d2)

这是我得到的输出,这显然表明与起始顺序相比,进程返回的顺序被洗牌。

20 cores available
Starting ...
Process 0 started work on time "0.25 ns"
Process 1 started work on time "0.5 ns"
Process 2 started work on time "1 ns"
Process 3 started work on time "2 ns"
Process 4 started work on time "3 ns"
Process 5 started work on time "4 ns"
Process 6 started work on time "5 ns"
Process 7 started work on time "10 ns"
Process 8 started work on time "20 ns"
Process 9 started work on time "30 ns"
Process 10 started work on time "40 ns"
Process 11 started work on time "50 ns"
Process 12 started work on time "60 ns"
Process 13 started work on time "70 ns"
Process 14 started work on time "80 ns"
Process 15 started work on time "90 ns"
Process 16 started work on time "100 ns"
Process number 3 returned the results.
Process number 0 returned the results.
Process number 4 returned the results.
Process number 7 returned the results.
Process number 1 returned the results.
Process number 2 returned the results.
Process number 5 returned the results.
Process number 8 returned the results.
Process number 6 returned the results.
Process number 9 returned the results.
Process number 10 returned the results.
Process number 11 returned the results.
Process number 12 returned the results.
Process number 13 returned the results.
Process number 14 returned the results.
Process number 15 returned the results.
Process number 16 returned the results.
All parallel done.

而且产生的数字。

Correct output


4
投票

不确定你是否还在这个问题上,但我只是在我的机器上运行你的代码(MacBook Pro(15英寸,2018年))在Jupyter 4.4.0,我的图表很平滑,你最初发布的种子值完全相同:

##### MAIN ####
np.random.seed(83737)  # some number for reproducibility
d1 = np.random.rand(100, 801)
d2 = np.random.rand(100, 801)

np.random.seed(52389)  # if changed to 324235 the peak is gone
evalDifferences_parallel(d1, d2) 

Plot

也许您的代码没有任何问题,324235种子没有什么特别之处,您只需要仔细检查您的模块版本,因为在最新版本中对源代码所做的任何更改都可能会影响您的结果。作为参考,我使用的是numpy 1.15.4matplotlib 3.0.2multiprocessing 2.6.2.1

© www.soinside.com 2019 - 2024. All rights reserved.