45

random.samplePythonをジェネレーターオブジェクトで動作させる方法があるかどうか知っていますか。非常に大きなテキスト コーパスからランダムなサンプルを取得しようとしています。問題はrandom.sample()、次のエラーが発生することです。

TypeError: object of type 'generator' has no len()

から何かを使ってこれを行う方法があるのではないかと考えていましitertoolsたが、少し検索しても何も見つかりませんでした。

やや構成された例:

import random
def list_item(ls):
    for item in ls:
        yield item

random.sample( list_item(range(100)), 20 )


アップデート


さんのリクエストに応じMartinPietersて、現在提案されている 3 つの方法のタイミングを調整しました。結果は次のとおりです。

Sampling 1000 from 10000
Using iterSample 0.0163 s
Using sample_from_iterable 0.0098 s
Using iter_sample_fast 0.0148 s

Sampling 10000 from 100000
Using iterSample 0.1786 s
Using sample_from_iterable 0.1320 s
Using iter_sample_fast 0.1576 s

Sampling 100000 from 1000000
Using iterSample 3.2740 s
Using sample_from_iterable 1.9860 s
Using iter_sample_fast 1.4586 s

Sampling 200000 from 1000000
Using iterSample 7.6115 s
Using sample_from_iterable 3.0663 s
Using iter_sample_fast 1.4101 s

Sampling 500000 from 1000000
Using iterSample 39.2595 s
Using sample_from_iterable 4.9994 s
Using iter_sample_fast 1.2178 s

Sampling 2000000 from 5000000
Using iterSample 798.8016 s
Using sample_from_iterable 28.6618 s
Using iter_sample_fast 6.6482 s

したがって、array.insertサンプル サイズが大きい場合、 には重大な欠点があることがわかります。メソッドの時間を計るために使用したコード

from heapq import nlargest
import random
import timeit


def iterSample(iterable, samplesize):
    results = []
    for i, v in enumerate(iterable):
        r = random.randint(0, i)
        if r < samplesize:
            if i < samplesize:
                results.insert(r, v) # add first samplesize items in random order
            else:
                results[r] = v # at a decreasing rate, replace random items

    if len(results) < samplesize:
        raise ValueError("Sample larger than population.")

    return results

def sample_from_iterable(iterable, samplesize):
    return (x for _, x in nlargest(samplesize, ((random.random(), x) for x in iterable)))

def iter_sample_fast(iterable, samplesize):
    results = []
    iterator = iter(iterable)
    # Fill in the first samplesize elements:
    for _ in xrange(samplesize):
        results.append(iterator.next())
    random.shuffle(results)  # Randomize their positions
    for i, v in enumerate(iterator, samplesize):
        r = random.randint(0, i)
        if r < samplesize:
            results[r] = v  # at a decreasing rate, replace random items

    if len(results) < samplesize:
        raise ValueError("Sample larger than population.")
    return results

if __name__ == '__main__':
    pop_sizes = [int(10e+3),int(10e+4),int(10e+5),int(10e+5),int(10e+5),int(10e+5)*5]
    k_sizes = [int(10e+2),int(10e+3),int(10e+4),int(10e+4)*2,int(10e+4)*5,int(10e+5)*2]

    for pop_size, k_size in zip(pop_sizes, k_sizes):
        pop = xrange(pop_size)
        k = k_size
        t1 = timeit.Timer(stmt='iterSample(pop, %i)'%(k_size), setup='from __main__ import iterSample,pop')
        t2 = timeit.Timer(stmt='sample_from_iterable(pop, %i)'%(k_size), setup='from __main__ import sample_from_iterable,pop')
        t3 = timeit.Timer(stmt='iter_sample_fast(pop, %i)'%(k_size), setup='from __main__ import iter_sample_fast,pop')

        print 'Sampling', k, 'from', pop_size
        print 'Using iterSample', '%1.4f s'%(t1.timeit(number=100) / 100.0)
        print 'Using sample_from_iterable', '%1.4f s'%(t2.timeit(number=100) / 100.0)
        print 'Using iter_sample_fast', '%1.4f s'%(t3.timeit(number=100) / 100.0)
        print ''

また、テストを実行して、すべてのメソッドが実際にジェネレーターの偏りのないサンプルを取得することを確認しました。したがって、すべての方法について、時間1000から要素をサンプリング10000 100000し、母集団内の各項目の平均発生頻度を計算しました。これは~.1、3 つの方法すべてで予想されるとおりです。

4

8 に答える 8

27

Martijn Pieters の答えは正しいですが、ループで使用すると 2 次複雑になる可能性があるsamplesizeため、大きくなると遅くなります。list.insert

私の意見では、パフォーマンスを向上させながら均一性を維持する代替手段を次に示します。

def iter_sample_fast(iterable, samplesize):
    results = []
    iterator = iter(iterable)
    # Fill in the first samplesize elements:
    try:
        for _ in xrange(samplesize):
            results.append(iterator.next())
    except StopIteration:
        raise ValueError("Sample larger than population.")
    random.shuffle(results)  # Randomize their positions
    for i, v in enumerate(iterator, samplesize):
        r = random.randint(0, i)
        if r < samplesize:
            results[r] = v  # at a decreasing rate, replace random items
    return results

samplesize上記の値では、違いがゆっくりと現れ始めます10000。との通話時間(1000000, 100000):

  • iterSample: 5.05 秒
  • iter_sample_fast: 2.64 秒
于 2012-09-25T12:56:13.753 に答える
21

できません。

2 つのオプションがあります。ジェネレーター全体をリストに読み取ってから、そのリストからサンプルを抽出するか、ジェネレーターを 1 つずつ読み取り、そこからサンプルを選択する方法を使用します。

import random

def iterSample(iterable, samplesize):
    results = []

    for i, v in enumerate(iterable):
        r = random.randint(0, i)
        if r < samplesize:
            if i < samplesize:
                results.insert(r, v) # add first samplesize items in random order
            else:
                results[r] = v # at a decreasing rate, replace random items

    if len(results) < samplesize:
        raise ValueError("Sample larger than population.")

    return results

このメソッドは、これまでの iterable 内のアイテムの数に基づいて、次のアイテムがサンプルの一部である可能性を調整します。以上のsamplesizeアイテムをメモリに保持する必要はありません。

解決策は私のものではありません。SO に関する別の回答の一部として提供されました。

于 2012-09-25T10:53:34.763 に答える
8

念のため、O( n lg k ) 時間で生成されたn 個のアイテムからk 個の要素を置換せずにサンプリングするワンライナーを次に示します。

from heapq import nlargest

def sample_from_iterable(it, k):
    return (x for _, x in nlargest(k, ((random.random(), x) for x in it)))
于 2012-09-25T12:24:42.197 に答える
0

イテレータ内のアイテムの数が (他の場所でアイテムを数えることによって) わかっている場合、別のアプローチは次のとおりです。

def iter_sample(iterable, iterlen, samplesize):
    if iterlen < samplesize:
        raise ValueError("Sample larger than population.")
    indexes = set()
    while len(indexes) < samplesize:
        indexes.add(random.randint(0,iterlen))
    indexesiter = iter(sorted(indexes))
    current = indexesiter.next()
    ret = []
    for i, item in enumerate(iterable):
        if i == current:
            ret.append(item)
            try:
                current = indexesiter.next()
            except StopIteration:
                break
    random.shuffle(ret)
    return ret

特に iterlen に比べて sampsize が小さい場合は、これがより高速であることがわかります。しかし、全体、または全体に近いサンプルが求められる場合、問題があります。

iter_sample (iterlen=10000, samplesize=100) time: (1, 'ms') iter_sample_fast (iterlen=10000, samplesize=100) time: (15, 'ms')

iter_sample (iterlen=1000000, samplesize=100) time: (65, 'ms') iter_sample_fast (iterlen=1000000, samplesize=100) time: (1477, 'ms')

iter_sample (iterlen=1000000, samplesize=1000) time: (64, 'ms') iter_sample_fast (iterlen=1000000, samplesize=1000) time: (1459, 'ms')

iter_sample (iterlen=1000000, samplesize=10000) time: (86, 'ms') iter_sample_fast (iterlen=1000000, samplesize=10000) time: (1480, 'ms')

iter_sample (iterlen=1000000, samplesize=100000) time: (388, 'ms') iter_sample_fast (iterlen=1000000, samplesize=100000) time: (1521, 'ms')

iter_sample (iterlen=1000000, samplesize=1000000) time: (25359, 'ms') iter_sample_fast (iterlen=1000000, samplesize=1000000) time: (2178, 'ms')

于 2014-09-14T00:21:27.683 に答える
0

これは、セットをアイテムのバケツとして使用する根本的に異なるバリエーションです。バケツにアイテムを準備することから始めpool、次にバケツからサンプルを生成し、イテレータからそれらを置き換え、最後にバケツに残っているものを排出します。

HashWrapperからハッシュ不可能な型を隠すのに役立ちますset

class HashWrapper(tuple):
    """Wrap unhashable type."""
    def __hash__(self):
        return id(self)


def randomize_iterator(data: Iterator, pool=100) -> Iterator:
    """
    Randomize an iterator.
    """

    bucket = set()
    iterator = iter(data)

    # Prime the bucket
    for _ in range(pool):
        try:
            bucket.add(HashWrapper(next(iterator)))
        except StopIteration:
            # We've drained the iterator
            break

    # Start picking from the bucket and replacing new items from the iterator
    for item in iterator:
        sample, = random.sample(bucket, 1)
        yield sample
        bucket.remove(sample)
        bucket.add(HashWrapper(item))

    # Drain the bucket
    yield from random.sample(bucket, len(bucket))
于 2020-06-11T04:44:04.713 に答える
0

ジェネレーターがどれくらいの長さであるか(そして漸近的に均一に分散される)についての考えがある場合、そうでないことが証明されるまでの最速の方法:

def gen_sample(generator_list, sample_size, iterlen):
    num = 0
    inds = numpy.random.random(iterlen) <= (sample_size * 1.0 / iterlen)
    results = []
    iterator = iter(generator_list)
    gotten = 0
    while gotten < sample_size: 
        try:
            b = iterator.next()
            if inds[num]: 
                results.append(b)
                gotten += 1
            num += 1    
        except: 
            num = 0
            iterator = iter(generator_list)
            inds = numpy.random.random(iterlen) <= ((sample_size - gotten) * 1.0 / iterlen)
    return results

これは、小さい iterable と巨大な iterable の両方で最速です (おそらくその間のすべて)。

# Huge
res = gen_sample(xrange(5000000), 200000, 5000000)
timing: 1.22s

# Small
z = gen_sample(xrange(10000), 1000, 10000) 
timing: 0.000441    
于 2014-10-19T12:47:06.783 に答える