9

weighted_sample与えられた重みのリストに対してランダムなインデックスを1つだけ返さない関数の合理的な定義を探しています(これは次のようになります)

def weighted_choice(weights, random=random):
    """ Given a list of weights [w_0, w_1, ..., w_n-1],
        return an index i in range(n) with probability proportional to w_i. """
    rnd = random.random() * sum(weights)
    for i, w in enumerate(weights):
        if w<0:
            raise ValueError("Negative weight encountered.")
        rnd -= w
        if rnd < 0:
            return i
    raise ValueError("Sum of weights is not positive")

一定の重みを持つカテゴリ分布を与えるため)が、と比較して動作するのと同じように、置換なしkのそれらのランダムサンプル。random.samplerandom.choice

weighted_choiceと書くことができるのと同じように

lambda weights: random.choice([val for val, cnt in enumerate(weights)
    for i in range(cnt)])

weighted_sample次のように書くことができます

lambda weights, k: random.sample([val for val, cnt in enumerate(weights)
    for i in range(cnt)], k)

しかし、重みを(おそらく巨大な)リストに解明する必要がないソリューションが必要です。

編集:インデックスのシーケンスの代わりに(引数と同じ形式で)頻度のヒストグラム/リストを返す素晴らしいアルゴリズムがある場合weights、それも非常に便利です。

4

6 に答える 6

8

あなたのコードから:..

weight_sample_indexes = lambda weights, k: random.sample([val 
        for val, cnt in enumerate(weights) for i in range(cnt)], k)

..重みは正の整数であり、「置換なし」とは、解かれたシーケンスの置換なしを意味すると思います。

これがrandom.sampleとO(log n)に基づく解決策です__getitem__

import bisect
import random
from collections import Counter, Sequence

def weighted_sample(population, weights, k):
    return random.sample(WeightedPopulation(population, weights), k)

class WeightedPopulation(Sequence):
    def __init__(self, population, weights):
        assert len(population) == len(weights) > 0
        self.population = population
        self.cumweights = []
        cumsum = 0 # compute cumulative weight
        for w in weights:
            cumsum += w   
            self.cumweights.append(cumsum)  
    def __len__(self):
        return self.cumweights[-1]
    def __getitem__(self, i):
        if not 0 <= i < len(self):
            raise IndexError(i)
        return self.population[bisect.bisect(self.cumweights, i)]

total = Counter()
for _ in range(1000):
    sample = weighted_sample("abc", [1,10,2], 5)
    total.update(sample)
print(sample)
print("Frequences %s" % (dict(Counter(sample)),))

# Check that values are sane
print("Total " + ', '.join("%s: %.0f" % (val, count * 1.0 / min(total.values()))
                           for val, count in total.most_common()))

出力

['b', 'b', 'b', 'c', 'c']
Frequences {'c': 2, 'b': 3}
Total b: 10, c: 2, a: 1
于 2012-10-24T15:11:12.743 に答える
3

作成したいのは、不均一なランダム分布です。これを行う悪い方法の1つは、重みに比例した出力シンボルを持つ巨大な配列を作成することです。したがって、aがbの5倍の可能性がある場合は、bの5倍のaを持つ配列を作成します。これは、重みが互いに倍数でさえある単純な分布ではうまく機能します。99.99%a、および.01%bが必要な場合はどうなりますか。10000スロットを作成する必要があります。

より良い方法があります。N個のシンボルを持つすべての不均一な分布は、一連のn-1個のバイナリ分布に分解できます。各分布は同じように発生する可能性があります。

したがって、このような分解があった場合は、最初に1からN-1までの均一な乱数を生成することにより、ランダムに2項分布を選択します。

u32 dist = randInRange( 1, N-1 ); // generate a random number from 1 to N;

そして、選択された分布が2つのシンボルaとbを持つバイナリ分布であり、確率がaの場合は0-alpha、bの場合はalpha-1であるとします。

float f = randomFloat();
return ( f > alpha ) ? b : a;

不均一なランダム分布を分解する方法はもう少し複雑です。基本的に、N-1'バケット'を作成します。確率が最も低いシンボルと確率が最も高いシンボルを選択し、それらの重みを最初の2項分布に比例配分します。次に、最小のシンボルを削除し、この二項分布の作成に使用された大きい方の重みの量を削除します。シンボルがなくなるまでこのプロセスを繰り返します。

このソリューションを使用したい場合は、このためのc++コードを投稿できます。

于 2012-10-25T15:29:54.187 に答える
0

操作するための適切なデータ構造を構築する場合random.sample()、新しい関数を定義する必要はまったくありません。を使用するだけrandom.sample()です。

ここで、__getitem__()はO(n)です。ここで、nは、重みを持つさまざまなアイテムの数です。ただし、メモリはコンパクトで、(weight, value)ペアのみを保存する必要があります。私は実際に同様のクラスを使用しました、そしてそれは私の目的のために十分に速かったです。この実装は整数の重みを想定していることに注意してください。

class SparseDistribution(object):
    _cached_length = None

    def __init__(self, weighted_items):
        # weighted items are (weight, value) pairs
        self._weighted_items = []
        for item in weighted_items:
            self.append(item)

    def append(self, weighted_item):
        self._weighted_items.append(weighted_item)
        self.__dict__.pop("_cached_length", None)

    def __len__(self):
        if self._cached_length is None:
            length = 0
            for w, v in self._weighted_items:
                length += w
            self._cached_length = length
        return self._cached_length

    def __getitem__(self, index):
        if index < 0 or index >= len(self):
            raise IndexError(index)
        for w, v in self._weighted_items:
            if index < w:
                return v
        raise Exception("Shouldn't have happened")

    def __iter__(self):
        for w, v in self._weighted_items:
            for _ in xrange(w):
                yield v

次に、それを使用できます。

import random

d = SparseDistribution([(5, "a"), (2, "b")])
d.append((3, "c"))

for num in (3, 5, 10, 11):
    try:
        print random.sample(d, num)
    except Exception as e:
        print "{}({!r})".format(type(e).__name__, str(e))

その結果:

['a', 'a', 'b']
['b', 'a', 'c', 'a', 'b']
['a', 'c', 'a', 'c', 'a', 'b', 'a', 'a', 'b', 'c']
ValueError('sample larger than population')
于 2012-10-24T13:37:57.247 に答える
0

現在、結果のヒストグラムに主に関心があるので、次の解決策を考えました(残念ながら、、、のnumpy.random.hypergeometric境界の場合は動作が悪いためngood < 1、これらの場合は個別にチェックする必要があります)。nbad < 1nsample < 1

def weighted_sample_histogram(frequencies, k, random=numpy.random):
    """ Given a sequence of absolute frequencies [w_0, w_1, ..., w_n-1],
    return a generator [s_0, s_1, ..., s_n-1] where the number s_i gives the
    absolute frequency of drawing the index i from an urn in which that index is
    represented by w_i balls, when drawing k balls without replacement. """
    W = sum(frequencies)
    if k > W:
        raise ValueError("Sum of absolute frequencies less than number of samples")
    for frequency in frequencies:
        if k < 1 or frequency < 1:
            yield 0
        else:
            W -= frequency
            if W < 1:
                good = k
            else:
                good = random.hypergeometric(frequency, W, k)
            k -= good
            yield good
    raise StopIteration

これを改善する方法や、なぜこれが良い解決策ではないのかについて、私は喜んでコメントを取ります。

これ(およびその他の重み付けされたランダムなもの)を実装するPythonパッケージは、現在http://github.com/Anaphory/weighted_choiceにあります。

于 2012-10-25T14:43:23.877 に答える
0

別の解決策

from typing import List, Any
import numpy as np

def weighted_sample(choices: List[Any], probs: List[float]):
    """
    Sample from `choices` with probability according to `probs`
    """
    probs = np.concatenate(([0], np.cumsum(probs)))
    r = random.random()
    for j in range(len(choices) + 1):
        if probs[j] < r <= probs[j + 1]:
            return choices[j]

例:

aa = [0,1,2,3]
probs = [0.1, 0.8, 0.0, 0.1]
np.average([weighted_sample(aa, probs) for _ in range(10000)])

Out: 1.0993
于 2019-06-20T00:25:10.590 に答える
-3

サンプルはかなり速いです。したがって、処理するメガバイトがたくさんない限り、sample()で問題ありません。

私のマシンでは、長さ100の10000000から1000サンプルを生成するのに1.655秒かかりました。また、10000000要素から長さ100の100000サンプルをトラバースするのに12.98秒かかりました。

from random import sample,random
from time import time

def generate(n1,n2,n3):
    w = [random() for x in range(n1)]

    print len(w)

    samples = list()
    for i in range(0,n2):
        s = sample(w,n3)
        samples.append(s)

    return samples

start = time()
size_set = 10**7
num_samples = 10**5
length_sample = 100
samples = generate(size_set,num_samples,length_sample)
end = time()

allsum=0
for row in samples:
    sum = reduce(lambda x, y: x+y,row)
    allsum+=sum

print 'sum of all elements',allsum

print '%f seconds for %i samples of %i length %i'%((end-start),size_set,num_sam\
ples,length_sample)
于 2012-10-24T11:25:50.910 に答える