问题 在不加载内存的情况下随机播放大量项目


我有一个约20亿行文本的文件(~200gig)。我想生成一个包含相同文本行的新文件,但是按行随机洗牌。我无法将所有数据保存在内存中。有没有一种好方法在python /命令行中执行此操作需要一段合理的时间(几天)?

我以为我可以触摸50个空文件。流过20亿行文件,并将每行随机分配到50个空文件中的一个。然后cat 50个文件。这种方法会有任何重大的系统性偏见吗?


12194
2018-06-30 14:22


起源

请编辑您的答案以添加此评论,这将更容易遵循。 - fxm
你的50个文件方法的问题在于你在整个文件中首先遇到的行将在每个50的开头结束。这意味着shuffle将不是完美的 - 它不会均匀分布元素而不管它们是什么起始位置。 - Matthew Franglen
我能想到的最好的事情是将整个事情分解为单行,然后执行任意合并排序以重新组合到文件,但这会破坏磁盘inode限制并且会涉及大量磁盘IO(可能会破坏)时间限制)。 - Matthew Franglen
我会更进一步,随机化你在中间文件中读取的顺序,但这听起来像是一个相当不错的方法。 - g.d.d.c
你能在记忆中保持范围(2 000 000 000)吗?如果是这样,我会将其洗牌(例如,通过随机模块),然后循环遍历此新列表以逐行打印。对于这一步骤,直接访问每一行是合适的。也许之前将大文件切成小文件? - user189


答案:


如果你可以为这个程序保留16 GB的内存,我写了一个名为的程序 sample 通过读取它们的字节偏移,改变偏移量,然后通过搜索文件到打乱的偏移量来打印输出,从而对文件的行进行混洗。它为每个64位偏移使用8个字节,因此对于20亿行输入使用16个字节。

它不会很快,但在具有足够内存的系统上, sample 将洗牌大到足以导致GNU的文件 shuf 失败。此外,它使用 MMAP 尝试最小化第二次通过文件的I / O开销的例程。它还有一些其他选择;看到 --help 更多细节。

默认情况下,此程序将无需替换即可进行采样,并通过单行进行随机播放。如果您想要替换,或者您的输入是FASTA,FASTQ或其他多行格式,您可以添加一些选项来调整采样方式。 (或者你可以应用另一种方法,我在下面的Perl要点中链接,但是 sample 解决这些案件。)

如果你的FASTA序列在每两行上,也就是说,它们在一行上的序列标题和下一行的序列数据之间交替,你仍然可以随机播放 sample并且只有一半的内存,因为你只是改变一半的偏移量。见 --lines-per-offset 选项;你指定了 2例如,要改变成对的线条。

对于FASTQ文件,它们的记录每四行分割一次。你可以指定 --lines-per-offset=4 使用调整单行文件所需的四分之一内存来重新排列FASTQ文件。

或者,我有一个要点 这里 用Perl编写,它将在不考虑序列中的行数的情况下对FASTA文件中的序列进行采样而无需替换。请注意,这与对整个文件进行混洗并不完全相同,但您可以将此作为起点,因为它会收集偏移量。您不必对某些偏移进行采样,而是删除对排序后的索引进行排序的第47行,然后使用文件搜索操作直接使用混洗索引列表来读取文件。

同样,它不会很快,因为你正在乱序地跳过一个非常大的文件,但是存储偏移比存储整行要便宜得多,并且添加mmap例程可以帮助实现一系列随机的一点点访问操作。如果您正在使用FASTA,那么您的存储偏移量仍然会减少,因此您的内存使用量(除了任何相对无关紧要的容器和程序开销)应该最多为8 GB - 并且可能更少,具体取决于其结构。


6
2018-06-30 14:45



嘿Alex。我想我在BioStar上看过你的帖子。我希望洗牌~20亿读(fasta,而不是fastq)。我的服务器有64gig的ram,所以我可以保留16gb。感谢您的采样软件。 - daemonk
请参阅我的回答编辑,其中涉及改组FASTA。 - Alex Reynolds
我对您的水库采样实施有疑问。使用算法R时,如果样本大小与总总体大小相同,则对所有内容进行采样,但不会对样本的顺序进行混洗。为了实现水库采样能够改变整个文件(从而随机抽样整个文件),它一定不能使用算法R的初始填充程序吗? - CMCDragonkai


怎么样:

import mmap
from random import shuffle

def find_lines(data):
    for i, char in enumerate(data):
        if char == '\n':
            yield i 

def shuffle_file(in_file, out_file):
    with open(in_file) as f:
        data = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
        start = 0
        lines = []
        for end in find_lines(data):
            lines.append((start, end))
            start = end + 1
        shuffle(lines)

        with open(out_file, 'w') as out:
            for start, end in lines:
                out.write(data[start:end+1])

if __name__ == "__main__":
    shuffle_file('data', 'result')

此解决方案应该只存储文件中行的所有文件偏移量,即每行2个字,加上容器开销。


5
2018-06-30 15:05



我想知道是否有可能在内存映射数据结构(如numpy数组)上使用Fisher Yates来重新调整内存不足的文件?只需直接在文件上进行交换即可。操作系统应该处理分页和分页内存。假设y;您不想破坏性地编辑输入文件。可以在这里使用变体: en.wikipedia.org/wiki/... - CMCDragonkai


你可以检查我的 HugeFileProcessor 工具。它与@ Alex-Reynolds的相似 sample,但应该明显更快,因为没有寻求。

这是关于改组实施的细节。它需要指定 BATCHSIZE  - 写入输出时保留在RAM中的行数。越多越好(除非你没有RAM),因为总的洗牌时间会是 (sourceFile中的行数)/ batchSize *(完全读取sourceFile的时间)。请注意该计划 洗牌整个文件,而不是按批次。

算法如下。

  1. 计算行数 源文件。这可以通过逐行读取整个文件来完成。 (参见一些比较 这里。)这也可以测量一次读取整个文件所需的时间。因此,我们可以估计完成随机播放所需的次数,因为它需要 Ceil(linesCount / batchSize) 完整的文件读取。

  2. 正如我们现在知道的那样 linesCount,我们可以创建一个索引数组 linesCount 尺寸和洗牌使用 费雪耶茨 (称为 orderArray 在代码中)。这将为我们提供一个订单,我们希望在洗牌文件中包含行。请注意,这是整个文件的全局顺序,而不是每批或块或其他内容。

  3. 现在是实际的代码。我们需要从中获取所有线路 源文件 在我们刚刚计算的顺序中,但是我们无法在内存中读取整个文件。所以我们只是拆分任务。

    • 我们会经历 源文件 读取所有行并在存储器中仅存储首先存在的行 BATCHSIZE 的 orderArray。当我们获得所有这些行时,我们可以将它们写入 不过outFile 按要求的顺序,这是一个 BATCHSIZE/linesCount 完成的工作。
    • 接下来我们将一遍又一遍地重复整个过程 orderArray 和阅读 源文件 每个部分从头到尾。最终整个 orderArray 处理完毕,我们完成了。

它为什么有效?

因为我们所做的只是从头到尾阅读源文件。没有寻求前进/后退,这就是硬盘驱动器所喜欢的。文件根据内部HDD缓冲区,FS块,CPU cahce等以块的形式读取,并且所有内容都按顺序读取。

一些数字

在我的机器上(Core i5,16GB RAM,Win8.1,HDD Toshiba DT01ACA200 2TB,NTFS)我能够在大约5小时内使用132 GB(84 000 000行)的文件进行洗牌 BATCHSIZE 3 500 000.随着 BATCHSIZE 2 000 000花了大约8个小时。读取速度约为每秒118000行。


2
2017-08-07 10:35



如果它不依赖于Windows,这个工具会很有趣。 - Bob
它不依赖于Windows,它依赖于.Net。所以只需在Linux上使用Mono。我已经在一些Mono版本下进行了测试,它运行良好。其实我在Mono下使用它。 - Mikhail


我认为在你的情况下最简单的是做一个递归的shuffle和split - shuffle - merge。 您定义了两个数字:要分割一个文件的文件数: N(通常在32到256之间),以及你可以直接在内存中随机播放的大小 M (通常约128 Mo)。然后你有伪代码:

def big_shuffle(file):
    if size_of(file) < M :
        memory_shuffle(file)
    else:
        create N files
        for line in file:
            write_randomly_to_one_of_the_N_files
        for sub_file in (N_files):
            big_shuffle(file)
        merge_the_N_files_one_line_each

随着每个子文件被洗牌,你应该没有偏见。

它将远远低于Alex Reynolds解决方案(因为很多磁盘io),但你唯一的限制是磁盘空间。


1
2018-06-30 15:05





你可以创建一个给出排列的迭代器。您将读取的数量按其提供的数量抵消。因为迭代器给出了排列,所以你永远不会读取相同的数据两次。

一组N个元素的所有排列都可以通过转置生成,转换是交换第0和第i个元素的排列(假设从0开始索引)并将所有其他元素保留在它们的位置。因此,您可以通过组合一些随机选择的转置来进行随机排列。这是一个用Python编写的示例:

import random

class Transposer:
    def __init__(self,i):
        """
        (Indexes start at 0)
        Swap 0th index and ith index, otherwise identity mapping.
        """
        self.i = i
    def map(self,x):
        if x == 0:
            return self.i
        if x == self.i:
            return 0
        return x

class RandomPermuter:
    def __init__(self,n_gens,n):
        """
        Picks n_gens integers in [0,n) to make transposers that, when composed,
        form a permutation of a set of n elements. Of course if there are an even number of drawn
        integers that are equal, they cancel each other out. We could keep
        drawing numbers until we have n_gens unique numbers... but we don't for
        this demo.
        """
        gen_is = [random.randint(0,n-1) for _ in range(n_gens)]
        self.trans = [Transposer(g) for g in gen_is]
    def map(self,x):
        for t in self.trans:
            x = t.map(x)
        return x

rp = RandomPermuter(10,10)

# Use these numbers to seek into a file
print(*[rp.map(x) for x in range(10)])

0
2017-08-26 00:36