Brana

Reputation: 1239

Processing Large Files in Python [ 1000 GB or More]

Let's say I have a text file of 1000 GB. I need to find how many times a phrase occurs in the text.

Is there any faster way to do this than the one I am using below? How long would it take to complete the task?

phrase = "how fast it is"
count = 0
with open('bigfile.txt') as f:
    for line in f:
        count += line.count(phrase)

If I am right, and I do not have this file in memory, I would need to wait for the PC to load the file from disk each time I do the search. At 250 MB/sec, reading a 1000 GB file should take at least 1,000,000 MB / 250 MB/sec = 4000 seconds.

Upvotes: 18

Views: 19710

Answers (8)

Lie Ryan

Reputation: 64827

Have you considered indexing your file? The way a search engine works is by creating a mapping from words to their locations in the file. Say you have this file:

Foo bar baz dar. Dar bar haa.

You create an index that looks like this:

{
    "foo": {0},
    "bar": {4, 21},
    "baz": {8},
    "dar": {12, 17},
    "haa": {25},
}

A hashtable index can be looked up in O(1); so it's freaking fast.

When someone searches for the query "bar baz", you first break the query into its constituent words: ["bar", "baz"]. Looking these up gives {4, 21} and {8}, which you then use to jump right to the places where the queried text could possibly exist.
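For illustration, here is a minimal in-memory sketch of that idea (the build_index/count_phrase helpers and the use of character offsets are my own choices, not something this answer prescribes): the index records where each word starts, and the phrase is then verified only at the offsets of its first word.

import re
from collections import defaultdict

def build_index(text):
    # Map each lowercased word to the set of character offsets where it starts.
    index = defaultdict(set)
    for match in re.finditer(r'\w+', text):
        index[match.group().lower()].add(match.start())
    return index

def count_phrase(text, index, phrase):
    # Look up candidate offsets in the index, then verify the full phrase there.
    words = phrase.lower().split()
    if not words or words[0] not in index:
        return 0
    candidates = index[words[0]]
    return sum(1 for pos in candidates
               if text[pos:pos + len(phrase)].lower() == phrase.lower())

text = "Foo bar baz dar. Dar bar haa."
index = build_index(text)
print(count_phrase(text, index, "bar baz"))   # -> 1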

There are out-of-the-box solutions for indexed search engines as well, for example Solr or ElasticSearch.

Upvotes: 4

woot

Reputation: 7606

Here is a third, longer method that uses a database. The database is sure to be larger than the text. I am not sure whether the indexes are optimal, and some space savings could come from playing with them a little (maybe WORD, and POS, WORD, are better, or perhaps WORD, POS is just fine; it needs a little experimenting).

This may not perform well on 200 OK's test though, because it is a lot of repeating text, but it might perform well on more unique data.

First create a database by scanning the words, etc:

import sqlite3
import re

INPUT_FILENAME = 'bigfile.txt'
DB_NAME = 'words.db'
FLUSH_X_WORDS=10000


conn = sqlite3.connect(DB_NAME)
cursor = conn.cursor()


cursor.execute("""
CREATE TABLE IF NOT EXISTS WORDS (
     POS INTEGER
    ,WORD TEXT
    ,PRIMARY KEY( POS, WORD )
) WITHOUT ROWID
""")

cursor.execute("""
DROP INDEX IF EXISTS I_WORDS_WORD_POS
""")

cursor.execute("""
DROP INDEX IF EXISTS I_WORDS_POS_WORD
""")


cursor.execute("""
DELETE FROM WORDS
""")

conn.commit()

def flush_words(words):
    for word in words.keys():
        for pos in words[word]:
            cursor.execute('INSERT INTO WORDS (POS, WORD) VALUES( ?, ? )', (pos, word.lower()) )

    conn.commit()

words = dict()
pos = 0
recomp = re.compile(r'\w+')
with open(INPUT_FILENAME, 'r') as f:
    for line in f:

        for word in [x.lower() for x in recomp.findall(line) if x]:
            pos += 1
            if word in words:
                words[word].append(pos)
            else:
                words[word] = [pos]
        if pos % FLUSH_X_WORDS == 0:
            flush_words(words)
            words = dict()
    if len(words) > 0:
        flush_words(words)
        words = dict()


cursor.execute("""
CREATE UNIQUE INDEX I_WORDS_WORD_POS ON WORDS ( WORD, POS )
""")

cursor.execute("""
CREATE UNIQUE INDEX I_WORDS_POS_WORD ON WORDS ( POS, WORD )
""")

cursor.execute("""
VACUUM
""")

cursor.execute("""
ANALYZE WORDS
""")

Then search the database by generating SQL:

import sqlite3
import re

SEARCH_PHRASE = 'how fast it is'
DB_NAME = 'words.db'


conn = sqlite3.connect(DB_NAME)
cursor = conn.cursor()

recomp = re.compile(r'\w+')

search_list = [x.lower() for x in recomp.findall(SEARCH_PHRASE) if x]

from_clause = 'FROM\n'
where_clause = 'WHERE\n'
num = 0
fsep = '     '
wsep = '     '
for word in search_list:
    num += 1
    from_clause += '{fsep}words w{num}\n'.format(fsep=fsep,num=num)
    where_clause += "{wsep} w{num}.word = '{word}'\n".format(wsep=wsep, num=num, word=word)
    if num > 1:
        where_clause += "  AND w{num}.pos = w{lastnum}.pos + 1\n".format(num=str(num),lastnum=str(num-1))

    fsep = '    ,'
    wsep = '  AND'


sql = """{select}{fromc}{where}""".format(select='SELECT COUNT(*)\n',fromc=from_clause, where=where_clause)

res = cursor.execute( sql )

print res.fetchone()[0] 

Upvotes: 2

Yann Vernier

Reputation: 15877

We're talking about a simple count of a specific substring within a rather large data stream. The task is almost certainly I/O bound, but very easily parallelised. The first layer is the raw read speed; we can choose to reduce the amount read by using compression, or distribute the transfer rate by storing the data in multiple places. Then we have the search itself; substring searches are a well-known problem, again I/O limited. If the data set comes from a single disk, pretty much any optimisation is moot, as there's no way the disk beats a single core in speed.

Assuming we do have chunks, which might for instance be the separate blocks of a bzip2 file (if we use a threaded decompressor), stripes in a RAID, or distributed nodes, we have much to gain from processing them individually. Each chunk is searched for the needle, then joints can be formed by taking len(needle)-1 bytes from the end of one chunk and the beginning of the next, and searching within those.
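As a rough illustration of that joint check, here is a single-threaded sketch (the chunk size is an arbitrary placeholder, and it deliberately leaves out the parallel machinery described here): each chunk is counted on its own, and the len(needle)-1 bytes on either side of every border are re-searched so straddling matches are not missed.

def count_in_chunks(filename, needle, chunksize=1 << 20):
    # Count occurrences of needle, including matches that span chunk borders.
    border = len(needle) - 1
    total = 0
    tail = b''   # last len(needle)-1 bytes of the previous chunk
    with open(filename, 'rb') as f:
        while True:
            chunk = f.read(chunksize)
            if not chunk:
                break
            total += chunk.count(needle)
            if tail:
                # The joint: end of the previous chunk plus start of this one.
                total += (tail + chunk[:border]).count(needle)
            tail = chunk[-border:] if border else b''
    return total

# e.g. count_in_chunks('bigfile.txt', b'how fast it is')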

A quick benchmark demonstrates that the regular expression state machines operate faster than the usual in operator:

>>> timeit.timeit("x.search(s)", "s='a'*500000; import re; x=re.compile('foobar')", number=20000)
17.146117210388184
>>> timeit.timeit("'foobar' in s", "s='a'*500000", number=20000)
24.263535976409912
>>> timeit.timeit("n in s", "s='a'*500000; n='foobar'", number=20000)
21.562405109405518

Another step of optimization we can perform, given that we have the data in a file, is to mmap it instead of using the usual read operations. This permits the operating system to use the disk buffers directly. It also allows the kernel to satisfy multiple read requests in arbitrary order without making extra system calls, which lets us exploit things like an underlying RAID when operating in multiple threads.

Here's a quickly tossed together prototype. A few things could obviously be improved, such as distributing the chunk processes if we have a multinode cluster, doing the tail+head check by passing one to the neighboring worker (an order which is not known in this implementation) instead of sending both to a special worker, and implementing an interthread limited queue (pipe) class instead of matching semaphores. It would probably also make sense to move the worker threads outside of the main thread function, since the main thread keeps altering its locals.

from mmap import mmap, ALLOCATIONGRANULARITY, ACCESS_READ
from re import compile, escape
from threading import Semaphore, Thread
from collections import deque

def search(needle, filename):
    # Might want chunksize=RAID block size, threads
    chunksize=ALLOCATIONGRANULARITY*1024
    threads=32
    # Read chunk allowance
    allocchunks=Semaphore(threads)  # should maybe be larger
    chunkqueue=deque()   # Chunks mapped, read by workers
    chunksready=Semaphore(0)
    headtails=Semaphore(0)   # edges between chunks into special worker
    headtailq=deque()
    sumq=deque()     # worker final results

    # Note: although we do push and pop at differing ends of the
    # queues, we do not actually need to preserve ordering. 

    def headtailthread():
        # Since head+tail is 2*len(needle)-2 long, 
        # it cannot contain more than one needle
        htsum=0
        matcher=compile(escape(needle))
        heads={}
        tails={}
        while True:
            headtails.acquire()
            try:
                pos,head,tail=headtailq.popleft()
            except IndexError:
                break  # semaphore signaled without data, end of stream
            try:
                prevtail=tails.pop(pos-chunksize)
                if matcher.search(prevtail+head):
                    htsum+=1
            except KeyError:
                heads[pos]=head
            try:
                nexthead=heads.pop(pos+chunksize)
                if matcher.search(tail+nexthead):
                    htsum+=1
            except KeyError:
                tails[pos]=tail
        # No need to check spill tail and head as they are shorter than needle
        sumq.append(htsum)

    def chunkthread():
        threadsum=0
        # escape special characters to achieve fixed string search
        matcher=compile(escape(needle))
        borderlen=len(needle)-1
        while True:
            chunksready.acquire()
            try:
                pos,chunk=chunkqueue.popleft()
            except IndexError:   # End of stream
                break
            # Let the re module do the heavy lifting
            threadsum+=len(matcher.findall(chunk))
            if borderlen>0:
                # Extract the end pieces for checking borders
                head=chunk[:borderlen]
                tail=chunk[-borderlen:]
                headtailq.append((pos,head,tail))
                headtails.release()
            chunk.close()
            allocchunks.release()  # let main thread allocate another chunk
        sumq.append(threadsum)

    with open(filename,'rb') as infile:
        htt=Thread(target=headtailthread)
        htt.start()
        chunkthreads=[]
        for i in range(threads):
            t=Thread(target=chunkthread)
            t.start()
            chunkthreads.append(t)
        pos=0
        fileno=infile.fileno()
        infile.seek(0,2)
        filesize=infile.tell()   # need the size to avoid mapping past the end
        while pos<filesize:
            allocchunks.acquire()
            # mmap raises ValueError if the mapping extends past the end of file
            chunk=mmap(fileno, min(chunksize, filesize-pos), access=ACCESS_READ, offset=pos)
            chunkqueue.append((pos,chunk))
            chunksready.release()
            pos+=chunksize
        # File ended, finish all chunks
        for t in chunkthreads:
            chunksready.release()   # wake thread so it finishes
        for t in chunkthreads:
            t.join()    # wait for thread to finish
        headtails.release()     # post event to finish border checker
        htt.join()
        # All threads finished, collect our sum
        return sum(sumq)

if __name__=="__main__":
    from sys import argv
    print "Found string %d times"%search(*argv[1:])

Also, modifying the whole thing to use some mapreduce routine (map chunks to counts, heads and tails, reduce by summing counts and checking tail+head parts) is left as an exercise.

Edit: Since it seems this search will be repeated with varying needles, an index would be much faster, being able to skip searching sections that are known not to match. One possibility is making a map of which blocks contain any occurrence of various n-grams (accounting for the block borders by allowing the n-gram to overlap into the next); those maps can then be combined to find more complex conditions, before the blocks of original data need to be loaded. There are certainly databases that do this; look for full text search engines.
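A rough sketch of such a block filter (the 3-gram length, the block size and the helper names are arbitrary choices for illustration; a real index would live in a database or a full text engine, as noted above): map every n-gram to the blocks it starts in, letting each block overlap the next by N-1 bytes, and then only scan blocks that contain all of the needle's n-grams.

from collections import defaultdict

N = 3                    # n-gram length; arbitrary for this sketch
BLOCKSIZE = 1 << 20      # 1 MiB blocks; also arbitrary

def build_block_index(filename):
    # Map each n-gram to the set of block numbers it starts in. Each block is
    # indexed together with the first N-1 bytes of the next block, so n-grams
    # straddling a border are attributed to the earlier block.
    index = defaultdict(set)
    with open(filename, 'rb') as f:
        data = f.read(BLOCKSIZE)
        blockno = 0
        while data:
            nxt = f.read(BLOCKSIZE)
            window = data + nxt[:N - 1]
            for i in range(len(window) - N + 1):
                index[window[i:i + N]].add(blockno)
            data = nxt
            blockno += 1
    return index

def candidate_blocks(index, needle):
    # Blocks that could contain the needle entirely; matches that straddle a
    # block border still need the joint check described earlier in this answer.
    grams = [needle[i:i + N] for i in range(len(needle) - N + 1)]
    if not grams:
        return set()
    return set.intersection(*(index.get(g, set()) for g in grams))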

Upvotes: 2

Ashwini Chaudhary

Reputation: 250881

I used file.read() to read the data in chunks; in the examples below the chunk sizes are 100 MB, 500 MB, 1 GB and 2 GB respectively. The size of my text file is 2.1 GB.

Code:

from functools import partial

def read_in_chunks(size_in_bytes):
    s = 'Lets say i have a text file of 1000 GB'
    with open('data.txt', 'r+b') as f:
        prev = ''
        count = 0
        f_read  = partial(f.read, size_in_bytes)
        for text in iter(f_read, ''):
            if not text.endswith('\n'):
                # if file contains a partial line at the end, then don't
                # use it when counting the substring count. 
                text, rest = text.rsplit('\n', 1)
                # pre-pend the previous partial line if any.
                text =  prev + text
                prev = rest
            else:
                # if the text ends with a '\n' then simple pre-pend the
                # previous partial line. 
                text =  prev + text
                prev = ''
            count += text.count(s)
        count += prev.count(s)
        print count

Timings:

read_in_chunks(104857600)
$ time python so.py
10000000

real    0m1.649s
user    0m0.977s
sys     0m0.669s

read_in_chunks(524288000)
$ time python so.py
10000000

real    0m1.558s
user    0m0.893s
sys     0m0.646s

read_in_chunks(1073741824)
$ time python so.py
10000000

real    0m1.242s
user    0m0.689s
sys     0m0.549s


read_in_chunks(2147483648)
$ time python so.py
10000000

real    0m0.844s
user    0m0.415s
sys     0m0.408s

On the other hand the simple loop version takes around 6 seconds on my system:

def simple_loop():

    s = 'Lets say i have a text file of 1000 GB'
    with open('data.txt') as f:
        print sum(line.count(s) for line in f)

$ time python so.py
10000000

real    0m5.993s
user    0m5.679s
sys     0m0.313s

Results of @SlaterTyranus's grep version on my file:

$ time grep -o 'Lets say i have a text file of 1000 GB' data.txt|wc -l
10000000

real    0m11.975s
user    0m11.779s
sys     0m0.568s

Results of @woot's solution:

$ time cat data.txt | parallel --block 10M --pipe grep -o 'Lets\ say\ i\ have\ a\ text\ file\ of\ 1000\ GB' | wc -l
10000000

real    0m5.955s
user    0m14.825s
sys     0m5.766s

I got the best timing when I used 100 MB as the block size:

$ time cat data.txt | parallel --block 100M --pipe grep -o 'Lets\ say\ i\ have\ a\ text\ file\ of\ 1000\ GB' | wc -l
10000000

real    0m4.632s
user    0m13.466s
sys     0m3.290s

Results of woot's second solution:

$ time python woot_thread.py # CHUNK_SIZE = 1073741824
10000000

real    0m1.006s
user    0m0.509s
sys     0m2.171s
$ time python woot_thread.py  #CHUNK_SIZE = 2147483648
10000000

real    0m1.009s
user    0m0.495s
sys     0m2.144s

System Specs: Core i5-4670, 7200 RPM HDD

Upvotes: 29

woot

Reputation: 7606

Here is a Python attempt... You might need to play with THREADS and CHUNK_SIZE. It's also a bunch of code written in a short time, so I might not have thought of everything. I do overlap my buffer, though, to catch the matches that fall in between chunks, and I extend the last chunk to include the remainder of the file.

import os
import threading

INPUTFILE ='bigfile.txt'
SEARCH_STRING='how fast it is'
THREADS = 8  # Set to 2 times number of cores, assuming hyperthreading
CHUNK_SIZE = 32768

FILESIZE = os.path.getsize(INPUTFILE)
SLICE_SIZE = FILESIZE / THREADS



class myThread (threading.Thread):
    def __init__(self, filehandle, seekspot):
        threading.Thread.__init__(self)
        self.filehandle = filehandle
        self.seekspot = seekspot
        self.cnt = 0
    def run(self):
        self.filehandle.seek( self.seekspot )

        p = self.seekspot
        # The last slice absorbs the remainder of the file; every other slice
        # is extended by len(SEARCH_STRING)-1 bytes so matches that straddle
        # a slice boundary are still counted.
        if FILESIZE - self.seekspot < 2 * SLICE_SIZE:
            readend = FILESIZE
        else:
            readend = self.seekspot + SLICE_SIZE + len(SEARCH_STRING) - 1
        overlap = ''
        while p < readend:
            if readend - p < CHUNK_SIZE:
                buffer = overlap + self.filehandle.read(readend - p)
            else:
                buffer = overlap + self.filehandle.read(CHUNK_SIZE)
            if buffer:
                self.cnt += buffer.count(SEARCH_STRING)
            # Carry the last len(SEARCH_STRING)-1 characters into the next
            # chunk so matches spanning two chunks are found as well.
            overlap = buffer[len(buffer)-len(SEARCH_STRING)+1:]
            p += CHUNK_SIZE

filehandles = []
threads = []
for fh_idx in range(0,THREADS):
    filehandles.append(open(INPUTFILE,'rb'))
    seekspot = fh_idx * SLICE_SIZE
    threads.append(myThread(filehandles[fh_idx],seekspot ) )
    threads[fh_idx].start()

totalcount = 0 
for fh_idx in range(0,THREADS):
    threads[fh_idx].join()
    totalcount += threads[fh_idx].cnt

print totalcount

Upvotes: 8

woot

Reputation: 7606

Have you looked at using parallel / grep?

cat bigfile.txt | parallel --block 10M --pipe grep -o 'how\ fast\ it\ is' | wc -l

Upvotes: 7

Dan Hogan

Reputation: 2442

I concede that grep will be faster. I assume this is a large string-based file.

But you could do something like this if you really wanted to.

import os
import re
import mmap

fileName = 'bigfile.txt'
phrase = re.compile("how fast it is")

with open(fileName, 'r') as fHandle:
    data = mmap.mmap(fHandle.fileno(), os.path.getsize(fileName), access=mmap.ACCESS_READ)
    # findall scans the whole mapped file; re.match would only look at the start
    matches = phrase.findall(data)
    print('matches = {0}'.format(len(matches)))

Upvotes: 1

Slater Victoroff

Reputation: 21914

I'm going to suggest doing this with grep instead of Python. It will be faster, and generally, if you're dealing with 1000 GB of text on your local machine, you've done something wrong; but all judgements aside, grep comes with a couple of options that will make your life easier.

grep -o '<your_phrase>' bigfile.txt|wc -l

Specifically, since -o prints each match on its own line, piping the output to wc -l counts the total number of occurrences, including multiple occurrences on a single line.

If you only need the number of lines that contain the phrase, you could instead do something like this:

grep -c '<your_phrase>' bigfile.txt

Upvotes: 2
