77

各行にタイム スタンプを含むテキスト ファイルがあります。私の目標は、時間範囲を見つけることです。すべての時間は順番に並べられているため、最初の行が最も早い時間になり、最後の行が最も遅い時間になります。最初と最後の行だけが必要です。これらの行を Python で取得する最も効率的な方法は何でしょうか?

注: これらのファイルの長さは比較的大きく、それぞれ約 100 万から 200 万行あり、数百のファイルに対してこれを行う必要があります。

4

12 に答える 12

91

ファイルの最初と最後の行の両方を読み取るには...

  • ファイルを開き、...
  • ...組み込みを使用して最初の行を読み取りreadline()、...
  • ... ファイルの最後までシーク (カーソルを移動) ...
  • ... EOL (改行) に遭遇するまで後退して ...
  • ...そこから最後の行を読んでください。
def readlastline(f):
    f.seek(-2, 2)              # Jump to the second last byte.
    while f.read(1) != b"\n":  # Until EOL is found ...
        f.seek(-2, 1)          # ... jump back, over the read byte plus one more.
    return f.read()            # Read all data from this point on.
    
with open(file, "rb") as f:
    first = f.readline()
    last = readlastline(f)

最後から2 番目のバイトに直接ジャンプして、末尾の改行文字が空行を返すのを防ぎます*。

現在のオフセットは、バイトが読み取られるたびに 1 ずつ進められるため、最近読み取られたバイトと次に読み取られるバイトを過ぎて、一度に 2 バイトずつ後退します。

whenceに渡されるパラメーターは、... からの相対位置バイトをシークする必要があることをfseek(offset, whence=0)示します。fseekoffset

print*およびを含むほとんどのアプリケーションのデフォルトの動作として予想されるように、echo書き込まれたすべての行に 1 を追加し、末尾の改行文字がない行には影響しません。


効率

それぞれ100万から200万行で、数百のファイルに対してこれを行う必要があります。

私はこの方法の時間を計り、それを上位の回答と比較しました。

10k iterations processing a file of 6k lines totalling 200kB: 1.62s vs 6.92s.
100 iterations processing a file of 6k lines totalling 1.3GB: 8.93s vs 86.95.

何百万行もあると、差がさらに大きくなります。

タイミングに使用される正確なコード:

with open(file, "rb") as f:
    first = f.readline()     # Read and store the first line.
    for last in f: pass      # Read all lines, keep final value.

修正

それ以降に提起されたコメントや問題に対処するための、より複雑で読みにくいバリエーション。

  • コメントによって発生した空のファイルを解析するときに空の文字列を返します。
  • commentによって発生した区切り文字が見つからない場合は、すべてのコンテンツを返します。
  • コメントによって発生する、テキスト モードをサポートするための相対オフセットは避けてください。
  • コメントで指摘されている UTF16/UTF32 ハック。

マルチバイト区切り文字のサポートも追加しますreadlast(b'X<br>Y', b'<br>', fixed=False)

テキストモードでは非相対オフセットが必要なため、大きなファイルの場合、このバリエーションは非常に遅いことに注意してください。必要に応じて変更するか、まったく使用しないでf.readlines()[-1]ください。テキスト モードで開いたファイルを使用する方がよいでしょう。

#!/bin/python3

from os import SEEK_END

def readlast(f, sep, fixed=True):
    r"""Read the last segment from a file-like object.

    :param f: File to read last line from.
    :type  f: file-like object
    :param sep: Segment separator (delimiter).
    :type  sep: bytes, str
    :param fixed: Treat data in ``f`` as a chain of fixed size blocks.
    :type  fixed: bool
    :returns: Last line of file.
    :rtype: bytes, str
    """
    bs   = len(sep)
    step = bs if fixed else 1
    if not bs:
        raise ValueError("Zero-length separator.")
    try:
        o = f.seek(0, SEEK_END)
        o = f.seek(o-bs-step)    # - Ignore trailing delimiter 'sep'.
        while f.read(bs) != sep: # - Until reaching 'sep': Read sep-sized block
            o = f.seek(o-step)   #  and then seek to the block to read next.
    except (OSError,ValueError): # - Beginning of file reached.
        f.seek(0)
    return f.read()

def test_readlast():
    from io import BytesIO, StringIO
    
    # Text mode.
    f = StringIO("first\nlast\n")
    assert readlast(f, "\n") == "last\n"
    
    # Bytes.
    f = BytesIO(b'first|last')
    assert readlast(f, b'|') == b'last'
    
    # Bytes, UTF-8.
    f = BytesIO("X\nY\n".encode("utf-8"))
    assert readlast(f, b'\n').decode() == "Y\n"
    
    # Bytes, UTF-16.
    f = BytesIO("X\nY\n".encode("utf-16"))
    assert readlast(f, b'\n\x00').decode('utf-16') == "Y\n"
  
    # Bytes, UTF-32.
    f = BytesIO("X\nY\n".encode("utf-32"))
    assert readlast(f, b'\n\x00\x00\x00').decode('utf-32') == "Y\n"
    
    # Multichar delimiter.
    f = StringIO("X<br>Y")
    assert readlast(f, "<br>", fixed=False) == "Y"
    
    # Make sure you use the correct delimiters.
    seps = { 'utf8': b'\n', 'utf16': b'\n\x00', 'utf32': b'\n\x00\x00\x00' }
    assert "\n".encode('utf8' )     == seps['utf8']
    assert "\n".encode('utf16')[2:] == seps['utf16']
    assert "\n".encode('utf32')[4:] == seps['utf32']
    
    # Edge cases.
    edges = (
        # Text , Match
        (""    , ""  ), # Empty file, empty string.
        ("X"   , "X" ), # No delimiter, full content.
        ("\n"  , "\n"),
        ("\n\n", "\n"),
        # UTF16/32 encoded U+270A (b"\n\x00\n'\n\x00"/utf16)
        (b'\n\xe2\x9c\x8a\n'.decode(), b'\xe2\x9c\x8a\n'.decode()),
    )
    for txt, match in edges:
        for enc,sep in seps.items():
            assert readlast(BytesIO(txt.encode(enc)), sep).decode(enc) == match

if __name__ == "__main__":
    import sys
    for path in sys.argv[1:]:
        with open(path) as f:
            print(f.readline()    , end="")
            print(readlast(f,"\n"), end="")
于 2013-09-03T23:29:19.150 に答える
66

ioモジュールのドキュメント

with open(fname, 'rb') as fh:
    first = next(fh).decode()

    fh.seek(-1024, 2)
    last = fh.readlines()[-1].decode()

ここでの変数値は1024です。これは平均文字列長を表します。たとえば、1024のみを選択します。平均線長の見積もりがある場合は、その値に2を掛けたものを使用できます。

行の長さの可能な上限についてはまったくわからないので、明らかな解決策はファイルをループすることです。

for line in fh:
    pass
last = line

使用できるバイナリフラグを気にする必要はありませんopen(fname)

ETA:作業するファイルがたくさんあるので、を使用して数十のファイルのサンプルを作成し、random.sampleそれらに対してこのコードを実行して、最後の行の長さを決定できます。事前に大きな値の位置シフト(たとえば1 MB)を使用します。これは、フルランの値を見積もるのに役立ちます。

于 2010-07-27T18:06:46.480 に答える
25

これは、あなたが望むことを行う SilentGhost の回答の修正版です。

with open(fname, 'rb') as fh:
    first = next(fh)
    offs = -100
    while True:
        fh.seek(offs, 2)
        lines = fh.readlines()
        if len(lines)>1:
            last = lines[-1]
            break
        offs *= 2
    print first
    print last

ここでは行の長さの上限は必要ありません。

于 2010-07-27T18:39:57.667 に答える
10

UNIXコマンドを使用できますか?head -1とを使用するとtail -n 1、おそらく最も効率的な方法だと思います。または、単純なものを使用しfid.readline()て最初の行とを取得することもできますfid.readlines()[-1]が、メモリが多すぎる可能性があります。

于 2010-07-27T18:07:27.540 に答える
4

最初にファイルを読み取りモードで開きます。次に readlines() メソッドを使用して行ごとに読み取ります。リストに格納されているすべての行。これで、リスト スライスを使用してファイルの最初と最後の行を取得できます。

    a=open('file.txt','rb')
    lines = a.readlines()
    if lines:
        first_line = lines[:1]
        last_line = lines[-1]
于 2013-09-06T04:35:31.107 に答える
2

これは、1行しかないファイルのコーナーケースを処理するための追加のロジックを持つ@Traspの回答の拡張です。継続的に更新されているファイルの最後の行を繰り返し読みたい場合は、このケースを処理すると便利です。これがないと、作成されたばかりで 1 行しかないファイルの最後の行を取得しようとすると、エラーIOError: [Errno 22] Invalid argumentが発生します。

def tail(filepath):
    with open(filepath, "rb") as f:
        first = f.readline()      # Read the first line.
        f.seek(-2, 2)             # Jump to the second last byte.
        while f.read(1) != b"\n": # Until EOL is found...
            try:
                f.seek(-2, 1)     # ...jump back the read byte plus one more.
            except IOError:
                f.seek(-1, 1)
                if f.tell() == 0:
                    break
        last = f.readline()       # Read last line.
    return last
于 2017-01-05T17:48:56.040 に答える
1

最初の行を取得するのは簡単です。最後の行については、行の長さのおおよその上限がわかっていると仮定して、os.lseekSEEK_ENDは、最後から2番目の行の終わりを見つけてから、最後の行をreadline()します。

于 2010-07-27T18:08:31.260 に答える
1
with open(filename, "rb") as f:#Needs to be in binary mode for the seek from the end to work
    first = f.readline()
    if f.read(1) == '':
        return first
    f.seek(-2, 2)  # Jump to the second last byte.
    while f.read(1) != b"\n":  # Until EOL is found...
        f.seek(-2, 1)  # ...jump back the read byte plus one more.
    last = f.readline()  # Read last line.
    return last

上記の回答は、ファイルに1行しかない場合を処理する上記の回答の修正版です

于 2018-07-29T08:50:56.020 に答える