Choosing k random elements from a list whose elements have weights

Problem description · 67 votes · 14 answers

Selecting without any weights (with equal probabilities) is beautifully described here.

I was wondering if there is a way to convert this approach to a weighted one.

I am also interested in other approaches.

Update: sampling without replacement.

algorithm math random statistics probability
14 Answers

24 votes

I know this is a very old question, but I think there's a neat trick to do this in O(n) time if you apply a little math!

The exponential distribution has two very useful properties.

  1. Given n samples from different exponential distributions with different rate parameters, the probability that a given sample is the minimum is equal to its rate parameter divided by the sum of all rate parameters.
  2. It is "memoryless". So if you already know the minimum, then the probability that any of the remaining elements is the 2nd-to-min is the same as the probability that, had the true min been removed (and never generated), that element would have been the new min. This seems obvious, but I think due to some conditional probability issues it might not hold for other distributions.

Using fact 1, we know that choosing a single element can be done by generating one exponential distribution sample per element, with rate parameter equal to its weight, and then choosing the one with the minimum value.

Using fact 2, we know that we don't have to re-generate the exponential samples. Instead, just generate one for each element, and take the k elements with the lowest samples.

Finding the lowest k can be done in O(n). Use the Quickselect algorithm to find the k-th smallest element, then simply take another pass over all elements and output everything below the k-th.

A useful note: if you don't have immediate access to a library that generates exponential distribution samples, one can easily be produced by: -ln(rand())/weight
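A minimal Python sketch of the whole trick (my own illustration, not the answerer's code; heapq.nsmallest stands in for Quickselect, so this runs in O(n log k) rather than O(n)):

import heapq
import math
import random

def weighted_sample_without_replacement(items, k):
    # items: list of (weight, value) pairs with positive weights.
    # One exponential sample per element: -ln(U)/weight has rate = weight.
    # Using 1 - random.random() keeps the argument of log strictly positive.
    keyed = [(-math.log(1.0 - random.random()) / w, v) for w, v in items]
    # The k smallest exponential samples mark exactly the k selected elements.
    return [v for _, v in heapq.nsmallest(k, keyed)]

print(weighted_sample_without_replacement([(1, 'a'), (2, 'b'), (3, 'c')], 2))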


0
投票

I've proposed a simple solution for picking one item here; you can easily expand it to k items (Java style):

// Item is assumed to expose getValue() returning its weight.
// Compute the total weight first so the random draw can be scaled
// (the original assumed weights already summed to 1).
double total = 0;
for (int i = 0; i < items.length; i++) {
    total += items[i].getValue();
}

double random = Math.random() * total;
double sum = 0;
Item selected = null;
for (int i = 0; i < items.length; i++) {
    Item val = items[i];
    sum += val.getValue();
    if (sum > random) {
        selected = val;
        break;
    }
}

0 votes

I've implemented an algorithm similar to Jason Orendorff's idea in Rust here. My version additionally supports bulk operations: insert and remove (when you want to remove a bunch of items given by their ids, not through the weighted selection path) from the data structure in O(m + log n) time, where m is the number of items to remove and n the number of items stored.


0 votes

Sampling without replacement with recursion — an elegant and very short solution in C#

// in how many ways can we choose 4 out of 60 students, so that every time we choose a different 4

class Program
{
    static void Main(string[] args)
    {
        int group = 60;
        int studentsToChoose = 4;

        Console.WriteLine(FindNumberOfStudents(studentsToChoose, group));
    }

    private static int FindNumberOfStudents(int studentsToChoose, int group)
    {
        if (studentsToChoose == group || studentsToChoose == 0)
            return 1;

        return FindNumberOfStudents(studentsToChoose, group - 1) + FindNumberOfStudents(studentsToChoose - 1, group - 1);

    }
}
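For what it's worth, this recursion is just Pascal's rule, C(n, k) = C(n-1, k) + C(n-1, k-1), so FindNumberOfStudents(4, 60) returns the number of distinct 4-student groups, C(60, 4) = 487,635. Note that it counts the possible samples rather than drawing one, and it does not use weights at all.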

0 votes

I just spent a few hours trying to get behind the algorithms underlying sampling without replacement, and this topic is more complex than I initially thought. That's exciting! For the benefit of future readers (have a great day!) I document my insights here, including a ready-to-use function further below which respects the given inclusion probabilities. A nice and quick mathematical overview of the various methods can be found here: Tillé: Algorithms of sampling with equal or unequal probabilities. For example Jason's method can be found on page 46. The caveat with his method is that the weights are not proportional to the inclusion probabilities, as also noted in the document. Actually, the i-th inclusion probability can be recursively computed as follows:

def inclusion_probability(i, weights, k):
    """
        Computes the inclusion probability of the i-th element
        in a randomly sampled k-tuple using Jason's algorithm
        (see https://stackoverflow.com/a/2149533/7729124)
    """
    if k <= 0: return 0
    cum_p = 0
    for j, weight in enumerate(weights):
        # compute the probability of j being selected considering the weights
        p = weight / sum(weights)

        if i == j:
            # if this is the target element, we don't have to go deeper,
            # since we know that i is included
            cum_p += p
        else:
            # if this is not the target element, then we compute the conditional
            # inclusion probability of i under the constraint that j is included
            cond_i = i if i < j else i-1
            cond_weights = weights[:j] + weights[j+1:]
            cond_p = inclusion_probability(cond_i, cond_weights, k-1)
            cum_p += p * cond_p
    return cum_p

We can check the validity of the function above by comparing:

In : for i in range(3): print(i, inclusion_probability(i, [1,2,3], 2))
0 0.41666666666666663
1 0.7333333333333333
2 0.85

In : import collections, itertools
In : sample_tester = lambda f: collections.Counter(itertools.chain(*(f() for _ in range(10000))))
In : sample_tester(lambda: random_weighted_sample_no_replacement([(1,'a'),(2,'b'),(3,'c')],2))
Out: Counter({'a': 4198, 'b': 7268, 'c': 8534})

One way — also suggested in the document above — to specify the inclusion probabilities is to compute the weights from them. The whole complexity of the question at hand stems from the fact that one cannot do that directly, since one basically has to invert the recursion formula; symbolically I claim this to be impossible. Numerically it can be done using all kinds of methods, e.g. Newton's method. However, the complexity of inverting the Jacobian with plain Python quickly becomes unbearable; I really recommend looking into numpy.random.choice in that case.
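For reference, a minimal numpy.random.choice sketch (my own example; note that with replace=False NumPy draws sequentially, so as far as I know its inclusion probabilities are likewise not simply proportional to the weights):

import numpy as np

weights = np.array([1.0, 2.0, 3.0])
p = weights / weights.sum()            # normalize the weights to probabilities
# draw 2 distinct indices; NumPy removes each drawn element and renormalizes
sample = np.random.choice(len(weights), size=2, replace=False, p=p)
print(sample)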

Luckily there is a method using plain Python which may or may not be performant enough for your purposes; it works great if there are not that many different weights. You can find the algorithm on pages 75 and 76. It works by splitting the sampling process into parts with the same inclusion probabilities, i.e. we can use random.sample again! I am not going to explain the principle here, since the basics are nicely presented on page 69. Here is the code with hopefully a sufficient amount of comments:

def sample_no_replacement_exact(items, k, best_effort=False, random_=None, ε=1e-9):
    """
        Returns a random sample of k elements from items, where items is a list of
        tuples (weight, element). The inclusion probability of an element in the
        final sample is given by
           k * weight / sum(weights).

        Note that the function raises if an inclusion probability cannot be
        satisfied, e.g. the following call is obviously illegal:
           sample_no_replacement_exact([(1,'a'),(2,'b')],2)
        Since selecting two elements means selecting both all the time,
        'b' cannot be selected twice as often as 'a'. In general it can be hard to
        spot if the weights are illegal and the function does *not* always raise
        an exception in that case. To remedy the situation you can pass
        best_effort=True which redistributes the inclusion probability mass
        if necessary. Note that the inclusion probabilities will change
        if deemed necessary.

        The algorithm is based on the splitting procedure on page 75/76 in:
        http://www.eustat.eus/productosServicios/52.1_Unequal_prob_sampling.pdf
        Additional information can be found here:
        https://stackoverflow.com/questions/2140787/

        :param items: list of tuples of type weight,element
        :param k: length of resulting sample
        :param best_effort: fix inclusion probabilities if necessary,
                            (optional, defaults to False)
        :param random_: random module to use (optional, defaults to the
                        standard random module)
        :param ε: fuzziness parameter when testing for zero in the context
                  of floating point arithmetic (optional, defaults to 1e-9)
        :return: random sample set of size k
        :exception: throws ValueError in case of bad parameters,
                    throws AssertionError in case of algorithmic impossibilities
    """
    # random_ defaults to the random submodule
    if not random_:
        random_ = random

    # special case empty return set
    if k <= 0:
        return set()

    if k > len(items):
        raise ValueError("resulting tuple length exceeds number of elements (k > n)")

    # sort items by weight
    items = sorted(items, key=lambda item: item[0])

    # extract the weights and elements
    weights, elements = list(zip(*items))

    # compute the inclusion probabilities (short: π) of the elements
    scaling_factor = k / sum(weights)
    π = [scaling_factor * weight for weight in weights]

    # in case of best_effort: if an inclusion probability exceeds 1,
    # try to rebalance the probabilities such that:
    # a) no probability exceeds 1,
    # b) the probabilities still sum to k, and
    # c) the probability masses flow from top to bottom:
    #    [0.2, 0.3, 1.5] -> [0.2, 0.8, 1]
    # (remember that π is sorted)
    if best_effort and π[-1] > 1 + ε:
        # probability mass we still have to distribute
        debt = 0.
        for i in reversed(range(len(π))):
            if π[i] > 1.:
                # an 'offender', take away excess
                debt += π[i] - 1.
                π[i] = 1.
            else:
                # case π[i] < 1, i.e. 'save' element
                # maximum we can transfer from debt to π[i] and still not
                # exceed 1 is computed by the minimum of:
                # a) 1 - π[i], and
                # b) debt
                max_transfer = min(debt, 1. - π[i])
                debt -= max_transfer
                π[i] += max_transfer
        assert debt < ε, "best effort rebalancing failed (impossible)"

    # make sure we are talking about probabilities
    if any(not (0 - ε <= π_i <= 1 + ε) for π_i in π):
        raise ValueError("inclusion probabilities not satisfiable: {}" \
                         .format(list(zip(π, elements))))

    # special case equal probabilities
    # (up to fuzziness parameter, remember that π is sorted)
    if π[-1] < π[0] + ε:
        return set(random_.sample(elements, k))

    # compute the two possible lambda values, see formula 7 on page 75
    # (remember that π is sorted)
    λ1 = π[0] * len(π) / k
    λ2 = (1 - π[-1]) * len(π) / (len(π) - k)
    λ = min(λ1, λ2)

    # there are two cases now, see also page 69
    # CASE 1
    # with probability λ we are in the equal probability case
    # where all elements have the same inclusion probability
    if random_.random() < λ:
        return set(random_.sample(elements, k))

    # CASE 2:
    # with probability 1-λ we are in the case of a new sample without
    # replacement problem which is strictly simpler,
    # it has the following new probabilities (see page 75, π^{(2)}):
    new_π = [
        (π_i - λ * k / len(π))
        /
        (1 - λ)
        for π_i in π
    ]
    new_items = list(zip(new_π, elements))

    # the first few probabilities might be 0, remove them
    # NOTE: we make sure that floating point issues do not arise
    #       by using the fuzziness parameter
    while new_items and new_items[0][0] < ε:
        new_items = new_items[1:]

    # the last few probabilities might be 1, remove them and mark them as selected
    # NOTE: we make sure that floating point issues do not arise
    #       by using the fuzziness parameter
    selected_elements = set()
    while new_items and new_items[-1][0] > 1 - ε:
        selected_elements.add(new_items[-1][1])
        new_items = new_items[:-1]

    # the algorithm reduces the length of the sample problem,
    # it is guaranteed that:
    # if λ = λ1: the first item has probability 0
    # if λ = λ2: the last item has probability 1
    assert len(new_items) < len(items), "problem was not simplified (impossible)"

    # recursive call with the simpler sample problem
    # NOTE: we have to make sure that the selected elements are included
    return sample_no_replacement_exact(
        new_items,
        k - len(selected_elements),
        best_effort=best_effort,
        random_=random_,
        ε=ε
    ) | selected_elements

Example:

In : sample_no_replacement_exact([(1,'a'),(2,'b'),(3,'c')],2)
Out: {'b', 'c'}

In : import collections, itertools
In : sample_tester = lambda f: collections.Counter(itertools.chain(*(f() for _ in range(10000))))
In : sample_tester(lambda: sample_no_replacement_exact([(1,'a'),(2,'b'),(3,'c'),(4,'d')],2))
Out: Counter({'a': 2048, 'b': 4051, 'c': 5979, 'd': 7922})

The weights sum up to 10, hence the inclusion probabilities compute to: a → 20%, b → 40%, c → 60%, d → 80%. (Sum: 200% = k.) It works!

Just one word of caution for the productive use of this function: it can be very hard to spot illegal inputs for the weights. An obvious illegal example is:

In: sample_no_replacement_exact([(1,'a'),(2,'b')],2)
ValueError: inclusion probabilities not satisfiable: [(0.6666666666666666, 'a'), (1.3333333333333333, 'b')]

b cannot appear twice as often as a, since both must always be selected. There are more subtle examples. To avoid an exception in production just use best_effort=True, which rebalances the inclusion probability mass such that we always have a valid distribution. Obviously this might change the inclusion probabilities.


-2 votes

I used an associative map of (weight, object). For example:

{
(10,"low"),
(100,"mid"),
(10000,"large")
}

total=10110

Generate a random number between 0 and 'total', and iterate over the keys until this number fits into a given range.
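A small Python sketch of that scheme (my own illustration):

import random

weighted = [(10, "low"), (100, "mid"), (10000, "large")]
total = sum(w for w, _ in weighted)    # 10110

def pick(weighted, total):
    n = random.uniform(0, total)       # random point in [0, total)
    for weight, obj in weighted:
        if n < weight:                 # the point falls inside this range
            return obj
        n -= weight                    # otherwise skip past this range
    return weighted[-1][1]             # guard against float round-off

print(pick(weighted, total))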


66 votes

If the sampling is with replacement, you can use this algorithm (implemented here in Python):

import random

items = [(10, "low"),
         (100, "mid"),
         (890, "large")]

def weighted_sample(items, n):
    total = float(sum(w for w, v in items))
    i = 0
    w, v = items[0]
    while n:
        x = total * (1 - random.random() ** (1.0 / n))
        total -= x
        while x > w:
            x -= w
            i += 1
            w, v = items[i]
        w -= x
        yield v
        n -= 1

This is O(n + m), where m is the number of items.

Why does this work? It's based on the following algorithm:

def n_random_numbers_decreasing(v, n):
    """Like reversed(sorted(v * random() for i in range(n))),
    but faster because we avoid sorting."""
    while n:
        v *= random.random() ** (1.0 / n)
        yield v
        n -= 1

The function weighted_sample is just this algorithm fused with a walk of the items list, to pick out the items selected by those random numbers.

This in turn works because the probability that n random numbers in 0..v will all happen to be less than z is P = (z/v)^n. Solving for z gives z = v·P^(1/n). Substituting a random number for P picks the largest number with the correct distribution; and we can just repeat the process to select all the other numbers.

If the sampling is without replacement, you can put all the items into a binary heap, where each node caches the total of the weights of all items in that subheap. Building the heap is O(m). Selecting a random item from the heap, respecting the weights, is O(log m). Removing that item and updating the cached totals is also O(log m). So you can pick n items in O(m + n log m) time.

(Note: "weight" here means that every time an element is selected, the remaining possibilities are chosen with probability proportional to their weights. It does not mean that elements appear in the output with a likelihood proportional to their weights.)

Here's an implementation, plentifully commented:

import random

class Node:
    # Each node in the heap has a weight, value, and total weight.
    # The total weight, self.tw, is self.w plus the weight of any children.
    __slots__ = ['w', 'v', 'tw']
    def __init__(self, w, v, tw):
        self.w, self.v, self.tw = w, v, tw

def rws_heap(items):
    # h is the heap. It's like a binary tree that lives in an array.
    # It has a Node for each pair in `items`. h[1] is the root. Each
    # other Node h[i] has a parent at h[i>>1]. Each node has up to 2
    # children, h[i<<1] and h[(i<<1)+1].  To get this nice simple
    # arithmetic, we have to leave h[0] vacant.
    h = [None]                          # leave h[0] vacant
    for w, v in items:
        h.append(Node(w, v, w))
    for i in range(len(h) - 1, 1, -1):  # total up the tws
        h[i>>1].tw += h[i].tw           # add h[i]'s total to its parent
    return h

def rws_heap_pop(h):
    gas = h[1].tw * random.random()     # start with a random amount of gas

    i = 1                     # start driving at the root
    while gas >= h[i].w:      # while we have enough gas to get past node i:
        gas -= h[i].w         #   drive past node i
        i <<= 1               #   move to first child
        if gas >= h[i].tw:    #   if we have enough gas:
            gas -= h[i].tw    #     drive past first child and descendants
            i += 1            #     move to second child
    w = h[i].w                # out of gas! h[i] is the selected node.
    v = h[i].v

    h[i].w = 0                # make sure this node isn't chosen again
    while i:                  # fix up total weights
        h[i].tw -= w
        i >>= 1
    return v

def random_weighted_sample_no_replacement(items, n):
    heap = rws_heap(items)              # just make a heap...
    for i in range(n):
        yield rws_heap_pop(heap)        # and pop n items off it.

41 votes

If the sampling is with replacement, use the roulette-wheel selection technique (often used in genetic algorithms); a Python sketch follows below:

  1. sort the weights
  2. compute the cumulative weights
  3. pick a random number in [0, 1] * totalWeight
  4. find the interval this number falls into
  5. select the element with the corresponding interval
  6. repeat k times

If the sampling is without replacement, you can adapt the above technique by removing the selected element from the list after each iteration, then re-normalizing the weights so that their sum is 1 (a valid probability distribution function).
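A short Python sketch of steps 1-6 above (my own illustration; bisect performs the interval lookup of step 4 in O(log m)):

import bisect
import itertools
import random

def roulette_wheel(items, k):
    # items: list of (weight, value) pairs; sampling is with replacement
    weights, values = zip(*sorted(items))             # step 1: sort by weight
    cumulative = list(itertools.accumulate(weights))  # step 2: cumulative weights
    picks = []
    for _ in range(k):                                # step 6: repeat k times
        r = random.random() * cumulative[-1]          # step 3: random in [0, totalWeight)
        i = bisect.bisect_right(cumulative, r)        # step 4: find the interval
        picks.append(values[i])                       # step 5: take that element
    return picks

print(roulette_wheel([(1, 'a'), (2, 'b'), (3, 'c')], 4))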


3 votes

I've done this in Ruby:

https://github.com/fl00r/pickup

require 'pickup'
pond = {
  "selmon"  => 1,
  "carp" => 4,
  "crucian"  => 3,
  "herring" => 6,
  "sturgeon" => 8,
  "gudgeon" => 10,
  "minnow" => 20
}
pickup = Pickup.new(pond, uniq: true)
pickup.pick(3)
#=> [ "gudgeon", "herring", "minnow" ]
pickup.pick
#=> "herring"
pickup.pick
#=> "gudgeon"
pickup.pick
#=> "sturgeon"

1 vote

If you want to generate large arrays of random integers with replacement, you can use piecewise linear interpolation. For example, using NumPy/SciPy:

import numpy
import scipy.interpolate

def weighted_randint(weights, size=None):
    """Given an n-element vector of weights, randomly sample
    integers up to n with probabilities proportional to weights"""
    n = weights.size
    # normalize so that the weights sum to unity
    weights = weights / numpy.linalg.norm(weights, 1)
    # cumulative sum of weights
    cumulative_weights = weights.cumsum()
    # piecewise-linear interpolating function whose domain is
    # the unit interval and whose range is the integers up to n
    f = scipy.interpolate.interp1d(
            numpy.hstack((0.0, cumulative_weights)),  # breakpoints are the cumulative weights
            numpy.arange(n + 1), kind='linear')
    return f(numpy.random.random(size=size)).astype(int)
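
A hypothetical usage example for the function above:

import numpy

weights = numpy.array([1.0, 2.0, 3.0])
# draws integers in {0, 1, 2} with probabilities 1/6, 2/6, 3/6
draws = weighted_randint(weights, size=10000)
print(numpy.bincount(draws) / 10000.0)   # roughly [0.167, 0.333, 0.5]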

This won't work if you want to sample without replacement.


1 vote

Here's a Go implementation from geodns:

package foo

import (
    "log"
    "math/rand"
)

type server struct {
    Weight int
    data   interface{}
}

func foo(servers []server) []server { // return type added so `return result` compiles
    // servers list is already sorted by the Weight attribute

    // number of items to pick
    max := 4

    result := make([]server, max)

    sum := 0
    for _, r := range servers {
        sum += r.Weight
    }

    for si := 0; si < max; si++ {
        n := rand.Intn(sum + 1)
        s := 0

        for i := range servers {
            s += int(servers[i].Weight)
            if s >= n {
                log.Println("Picked record", i, servers[i])
                sum -= servers[i].Weight
                result[si] = servers[i]

                // remove the server from the list
                servers = append(servers[:i], servers[i+1:]...)
                break
            }
        }
    }

    return result
}

1 vote

If you want to pick x elements from a weighted set without replacement, such that elements are chosen with a probability proportional to their weights:

import random

def weighted_choose_subset(weighted_set, count):
    """Return a random sample of count elements from a weighted set.

    weighted_set should be a sequence of tuples of the form 
    (item, weight), for example:  [('a', 1), ('b', 2), ('c', 3)]

    Each element from weighted_set shows up at most once in the
    result, and the relative likelihood of two particular elements
    showing up is equal to the ratio of their weights.

    This works as follows:

    1.) Line up the items along the number line from [0, the sum
    of all weights) such that each item occupies a segment of
    length equal to its weight.

    2.) Randomly pick a number "start" in the range [0, total
    weight / count).

    3.) Find all the points "start + n/count" (for all integers n
    such that the point is within our segments) and yield the set
    containing the items marked by those points.

    Note that this implementation may not return each possible
    subset.  For example, with the input ([('a': 1), ('b': 1),
    ('c': 1), ('d': 1)], 2), it may only produce the sets ['a',
    'c'] and ['b', 'd'], but it will do so such that the weights
    are respected.

    This implementation only works for nonnegative integral
    weights.  The highest weight in the input set must be less
    than the total weight divided by the count; otherwise it would
    be impossible to respect the weights while never returning
    that element more than once per invocation.
    """
    if count == 0:
        return []

    total_weight = 0
    max_weight = 0
    borders = []
    for item, weight in weighted_set:
        if weight < 0:
            raise RuntimeError("All weights must be non-negative integers")
        # Scale up weights so dividing total_weight / count doesn't truncate:
        weight *= count
        total_weight += weight
        borders.append(total_weight)
        max_weight = max(max_weight, weight)

    step = int(total_weight / count)

    if max_weight > step:
        raise RuntimeError(
            "Each weight must be less than total weight / count")

    next_stop = random.randint(0, step - 1)

    results = []
    current = 0
    for i in range(count):
        while borders[current] <= next_stop:
            current += 1
        results.append(weighted_set[current][0])
        next_stop += step

    return results
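
A hypothetical usage example (note that with these weights 'c' has inclusion probability 3 * 2/6 = 1, so it appears in every sample):

print(weighted_choose_subset([('a', 1), ('b', 2), ('c', 3)], 2))
# e.g. ['b', 'c']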

0 votes

In the question you linked to, Kyle's solution works with a trivial generalization. Scan the list and compute the total weight. Then the probability of choosing an element should be:

1 - (1 - (#needed / (weight left))) / (weight at n). After visiting a node, subtract its weight from the total. Also, if you need n and have n left, you have to stop explicitly.

You can check that with everything having weight 1, this simplifies to Kyle's solution.

Edit: (had to rethink what "twice as likely" means)


0 votes

This one does it in exactly O(n) and with no excess memory usage. I believe this is a clever and efficient solution, easy to port to any language. The first two lines are just to populate sample data in Drupal.

function getNrandomGuysWithWeight($numitems){
  // fetch the sample data (id, weight) from Drupal
  $q = db_query('SELECT id, weight FROM theTableWithTheData');
  $q = $q->fetchAll();

  // turn each row's weight into a cumulative weight
  $accum = 0;
  foreach($q as $r){
    $accum += $r->weight;
    $r->weight = $accum;
  }

  $out = array();

  while(count($out) < $numitems && count($q)){
    // pick a random point in the remaining total weight
    $n = rand(0,$accum);
    $lessaccum = NULL;
    $prevaccum = 0;
    $idxrm = 0;
    foreach($q as $i=>$r){
      if(($lessaccum == NULL) && ($n <= $r->weight)){
        // first row whose cumulative weight covers $n: select it
        $out[] = $r->id;
        $lessaccum = $r->weight- $prevaccum;   // the selected row's own weight
        $accum -= $lessaccum;                  // shrink the total accordingly
        $idxrm = $i;
      }else if($lessaccum){
        // shift the cumulative weights of the rows after the removed one
        $r->weight -= $lessaccum;
      }
      $prevaccum = $r->weight;
    }
    unset($q[$idxrm]);                         // remove it: sampling without replacement
  }
  return $out;
}