如何显示n个threads的当前吞吐量?

问题描述 投票:0回答:2

想象一下,有n个线程,它们都在一个SOAP端点上工作,获取一些数据并存储到文件系统中。有什么办法可以显示这些线程最后一秒的吞吐量呢?

我想到了下面的办法。

所有32个(n 在这种情况下)线程持有一个 AtomicInteger 在向文件系统写入一个文件后,每一个线程都会增加这个值。这个 AtomicInteger 然后被另一个线程(#33)评估,它只是为了记录这个 AtomicInteger 除以n-Threads,通过使用metrics(剩余条目总吞吐量)重新设置估计ETA AtomicInteger 为零,然后休眠一秒,继续循环,直到所有线程完成工作。

但是,这样做有以下问题:锁定AtomicInteger会造成一定的性能损失(虽然我认为可以忽略不计),而且输出永远不会是最后一秒,因为锁定时间会有一些变化。

有谁能想到更优雅的解决这个问题的方法吗?我想我可能完全想多了,Java已经有一些解决方案了。

即使可能有第三方的解决方案或包来解决这个问题,我也宁愿了解如何正确地做这件事,而不是仅仅使用一个包。

线程是由一个 ExecutorService 而所有的线都是 Runnable 种。

java multithreading algorithm throughput
2个回答
1
投票

最简单的方法是只更新第n次,比如说如果AtomicInteger是瓶颈的话,每100次调用才更新一次。我建议先用一个Atomicinteger,只有当你发现有问题时,才使用更复杂的解决方案,有抽样统计的库,例如。https:/github.comHdrHistogramHdrHistogram。 但由于你只想测量一个端点,这不符合你的需求。


1
投票

我认为你有一个非常定制的关于度量收集的要求。我怀疑是否有任何指标收集库在做你所期望的事情。也许可以看看千分尺GaugeCounter。

问题的根源。你正试图由n+1个工人并发地读写同一个资源。用例允许你使用ScatterGather模式的变体,一个工作者可以聚合n个工作者的工作,n个工作者可以读写自己拥有的n个资源。

假设。我假设你在一个列表或数组中拥有对所有线程的引用 而且你的文件不会超过10个。Integer.MAX_VALUE.

解决办法。你可以用 volatile int 作为每个线程中的一个成分,而不是通过 AtomicInteger 从外面看,这些计数器是单调增加的。这些计数器将是单调增加的,并且在计算线程中永远不会被修改或重置。

你可以在计算线程中保留一个int值的本地副本,然后从新的值中减去这个本地副本,以获得最后一秒处理的文件,并以新的值更新本地副本(这相当于将每个线程的计数器重置为0)。另外,如果可以的话,你也可以直接跟踪计数器的总和,而不是为每个线程存储计数器。

class Computation implements Runnable {

    List<Worker> workerList;
    int[] localCounters;

    boolean finished;

    public Computation(List<Worker> workerList) {
        this.workerList = workerList;
        this.localCounters = new int[workerList.size()];
    }

    @Override
    public void run() {

        while (!finished) {

            for (int i = 0; i < workerList.size(); i++) {
                localCounters[i] = workerList.get(i).getCounter();
            }

            computation();
            pause();
        }
    }

    private void computation(){
        //do your computation with localCounters
        //this.localCounters[x]
    }

    private boolean isFinished () {
        return finished;
    }

    private void pause() {
        try {
            Thread.sleep(1000L);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
class Worker implements Runnable {

    private volatile int counter;

    @Override
    public void run() {
        // do your thing.
    }

    public int getCounter() {
        return counter;
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.