Will Munn

Reputation: 7967

How to optimise reading multiple files in Python

I have a program in Python 2.7 that needs to read all the files in a directory and aggregate their contents. Right now I'm doing this in a single thread, one file after the other, like so:

def read_file(path):
    with open(path, 'r') as f:
        return f.read()

files = map(read_file, paths)

However, I'd like to optimise this so that I don't have to wait for each file to finish before reading the next. I've been searching for how to do this in parallel and came up with several solutions, including ones using multiprocessing and Thread with Queue; the fastest of these was the following:

from threading import Thread
import Queue

def add_to_queue(q, f):
    q.put(read_file(f))

q = Queue.Queue()
files = []
# Start one daemon thread per file.
for f in paths:
    t = Thread(target=add_to_queue, args=(q, f))
    t.daemon = True
    t.start()
# Collect one result per file (in completion order, not path order).
for f in paths:
    files.append(q.get())
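A multiprocessing-based variant along the same lines might look roughly like this (multiprocessing.pool.ThreadPool ships with the 2.7 standard library; the pool size of 8 is an arbitrary choice):

from multiprocessing.pool import ThreadPool

# A bounded pool of worker threads; map() blocks until all
# files are read and preserves the order of paths.
pool = ThreadPool(processes=8)
files = pool.map(read_file, paths)
pool.close()
pool.join()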

However, after trying many options, reading the files one after the other on a single thread still seems to be the fastest way. Am I missing something here? What's the most efficient way to do this?

Upvotes: 1

Views: 955

Answers (3)

preston datum

Reputation: 1

Use a thread to populate a data lake in a DB, then refresh and query it with breakpoints that log key EDA (exploratory data analysis) output. Speeding up the processing and aggregating the files as a whole may make cleaning more time-consuming. I just did this with BSON files and the FP-growth algorithm:

class FPTreeNode():
    def __init__(self, item=None, support=1):
        # 'Value' of the item
        self.item = item
        # Number of times the item occurs in a
        # transaction
        self.support = support
        # Child nodes in the FP Growth Tree
        self.children = {}


class FPGrowth():
    def __init__(self, min_sup=0.3):
        self.min_sup = min_sup
        # The root of the initial FP Growth Tree
        self.tree_root = None
        # Prefixes of itemsets in the FP Growth Tree
        self.prefixes = {}
        self.frequent_itemsets = []

    # Count the number of transactions that contain item.
    def _calculate_support(self, item, transactions):
        count = 0
        for transaction in transactions:
            if item in transaction:
                count += 1
        support = count
        return support

    # Returns a list of frequent items. An item is determined to be
    # frequent if at least min_sup transactions contain it.
    def _get_frequent_items(self, transactions):
        # Get all unique items in the transactions
        unique_items = set(
            item for transaction in transactions for item in transaction)
        items = []
        for item in unique_items:
            sup = self._calculate_support(item, transactions)
            if sup >= self.min_sup:
                items.append([item, sup])
        # Sort by support - Highest to lowest
        items.sort(key=lambda item: item[1], reverse=True)
        frequent_items = [[el[0]] for el in items]
        # Only return the items
        return frequent_items

    # Recursive method which adds nodes to the tree.
    def _insert_tree(self, node, children):
        if not children:
            return
        # Create new node as the first item in children list
        child_item = children[0]
        child = FPTreeNode(item=child_item)
        # If parent already contains item => increase the support
        if child_item in node.children:
            node.children[child.item].support += 1
        else:
            node.children[child.item] = child

        # Execute _insert_tree on the rest of the children list
        # from the new node
        self._insert_tree(node.children[child.item], children[1:])

    def _construct_tree(self, transactions, frequent_items=None):
        if not frequent_items:
            # Get frequent items sorted by support
            frequent_items = self._get_frequent_items(transactions)
        unique_frequent_items = list(
            set(item for itemset in frequent_items for item in itemset))
        # Construct the root of the FP Growth tree
        root = FPTreeNode()
        for transaction in transactions:
            # Remove items that are not frequent according to
            # unique_frequent_items
            transaction = [item for item in transaction
                           if item in unique_frequent_items]
            # Sort by the support order of frequent_items.
            transaction.sort(key=lambda item: frequent_items.index([item]))
            self._insert_tree(root, transaction)

        return root

    # Recursive method which prints the FP Growth Tree
    def print_tree(self, node=None, indent_times=0):
        if not node:
            node = self.tree_root
        indent = "    " * indent_times
        print ("%s%s:%s" % (indent, node.item, node.support))
        for child_key in node.children:
            child = node.children[child_key]
            self.print_tree(child, indent_times + 1)

    # Makes sure that the first item in itemset
    # is a child of node and that every following item
    # in itemset is reachable via that path
    def _is_prefix(self, itemset, node):
        for item in itemset:
            if not item in node.children:
                return False
            node = node.children[item]
        return True

    # Recursive method that adds prefixes to the itemset by
    # traversing the FP Growth Tree
    def _determine_prefixes(self, itemset, node, prefixes=None):
        if not prefixes:
            prefixes = []

        # If the current node is a prefix to the itemset
        # add the current prefixes value as prefix to the itemset
        if self._is_prefix(itemset, node):
            itemset_key = self._get_itemset_key(itemset)
            if not itemset_key in self.prefixes:
                self.prefixes[itemset_key] = []
            self.prefixes[itemset_key] += [
                {"prefix": prefixes,
                 "support": node.children[itemset[0]].support}]

        for child_key in node.children:
            child = node.children[child_key]
            # Recursive call with child as the new node, adding the
            # child item as a potential prefix.
            self._determine_prefixes(itemset, child,
                                     prefixes + [child.item])

    # Determines the hashmap key format for self.prefixes.
    # Itemsets with more than one string are joined by '-'.
    def _get_itemset_key(self, itemset):
        if len(itemset) > 1:
            itemset_key = "-".join(itemset)
        else:
            itemset_key = str(itemset[0])
        return itemset_key

    def _determine_frequent_itemsets(self, conditional_database, suffix):
        # Calculate new frequent items from the conditional database
        # of suffix.
        frequent_items = self._get_frequent_items(conditional_database)

        cond_tree = None

        if suffix:
            cond_tree = self._construct_tree(conditional_database,
                                             frequent_items)
            # Output new frequent itemsets as the suffix appended to
            # the frequent items.
            self.frequent_itemsets += [el + suffix
                                       for el in frequent_items]

        # Find larger frequent itemsets by finding prefixes of the
        # frequent items in the FP Growth Tree for the conditional
        # database.
        self.prefixes = {}
        for itemset in frequent_items:
            # If no suffix (first run)
            if not cond_tree:
                cond_tree = self.tree_root
            # Determine prefixes to itemset
            self._determine_prefixes(itemset, cond_tree)
            conditional_database = []
            itemset_key = self._get_itemset_key(itemset)
            # Build new conditional database
            if itemset_key in self.prefixes:
                for el in self.prefixes[itemset_key]:
                    # If support = 4 => add 4 of the corresponding 
                    # prefix set
                    for _ in range(el["support"]):
                        conditional_database.append(el["prefix"])
                # Create new suffix
                new_suffix = itemset + suffix if suffix else itemset
                self._determine_frequent_itemsets(conditional_database, suffix=new_suffix)

    def find_frequent_itemsets(self, transactions, suffix=None,
                               show_tree=False):
        self.transactions = transactions

        # Build the FP Growth Tree
        self.tree_root = self._construct_tree(transactions)
        if show_tree:
            print ("FP-Growth Tree:")
            self.print_tree(self.tree_root)

        self._determine_frequent_itemsets(transactions, suffix=None)

        return self.frequent_itemsets

def main():
    # Hypothetical usage example: toy transactions, with min_sup
    # treated as an absolute support count.
    transactions = [["A", "B", "C"], ["A", "B"], ["A", "C"], ["B", "C"]]
    fp = FPGrowth(min_sup=2)
    print(fp.find_frequent_itemsets(transactions, show_tree=True))


if __name__ == "__main__":
    main()

Upvotes: 0

aghast

Reputation: 15320

Assuming that you really are reading multiple files from a single disk, your operation is going to be I/O bound rather than CPU bound.

No amount of multiprocessing, queueing, multithreading, bending, folding, twisting, hoop-jumping, or other gimcrackery will make the disk spindle turn faster, or the heads move more swiftly across the cylinders.

In order to get more performance from this, either look into increasing I/O performance, or look at a different approach to your solution. (Could you redesign the architecture in such a way that different pieces were served by different servers, or there were fewer pieces, or ...?)

Depending on the sizes and number of the files, you might consider: using an SSD drive, using multiple drives, using a RAID controller, using a SAN, or using a server cluster.
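One rough way to confirm the bottleneck, reusing read_file and paths from the question (the pool size of 8 is an arbitrary choice), is to time both approaches and compare the throughput to what the drive can deliver:

import time
from multiprocessing.pool import ThreadPool

def timed(label, fn):
    # Run fn once and report elapsed time and rough throughput.
    # Note: the second run may benefit from the OS page cache.
    start = time.time()
    results = fn()
    elapsed = time.time() - start
    total = sum(len(r) for r in results)
    print("%s: %.2fs (%.1f MB/s)" % (label, elapsed, total / elapsed / 1e6))

timed("sequential", lambda: map(read_file, paths))
timed("threaded", lambda: ThreadPool(8).map(read_file, paths))

If both numbers sit near the disk's sequential read speed, adding threads cannot help.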

Upvotes: 1

brc

Reputation: 269

Reading from disk is a sequential operation: no matter how many processes you have, essentially only one of them will be reading at a time. Concurrency is useful if you need to perform other operations while files are being read, such as searching the contents of already-loaded files for a specific string or regex.
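For example, one reader thread can keep the disk busy while the main thread scans each file as soon as it arrives. A rough sketch, reusing read_file and paths from the question (the ERROR pattern is just a placeholder):

import re
import Queue
from threading import Thread

def reader(q):
    # Read files sequentially and hand each one over as it completes.
    for path in paths:
        q.put(read_file(path))
    q.put(None)  # sentinel: no more files

q = Queue.Queue(maxsize=4)  # small buffer keeps reading ahead of scanning
Thread(target=reader, args=(q,)).start()

pattern = re.compile(r"ERROR")  # placeholder pattern, for illustration
matches = []
while True:
    contents = q.get()
    if contents is None:
        break
    matches.extend(pattern.findall(contents))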

Upvotes: 0
