SriK
SriK

Reputation: 21

cant understand python iterator behavior on file object

I am having a problem in understanding a recent phenomenon that broke our code in production. This piece of code

  1. reads a csv file from S3 bucket
  2. checks if there is data in file (other than header)
  3. If there is data, iterates over the file and does some processing.

        with self.s3.get_stream(bucket=self.import_bucket, key_name=self.in_file, mode="rb") as file_obj:
            # Check file for any amount data, return false if there is no Data
            if not self.check_file_for_data(file_obj):
                return False
            my_producer = iter(self.line_producer(file_obj))
            self.header = next(my_producer)

def check_file_for_data(self, file_obj):
    try:
        next(islice(file_obj, 1, 2))
    except StopIteration:
        # File has no Data other than column names.
        return False
    except Exception as e:
        # unknown problem caused in reading file.
        self.log.error("Error in reading file: {0} for post processing. error message: {1}".format(self.in_file, e))
    else:
        # File has readable data other than Column names.
        return True

def line_producer(self, file_obj):
    # Added recently to solve this problem, but was working without this, till dec
    file_obj.seek(0)
    self.log.info("LINE PRODUCTION : Started")
    csv_reader_obj = csv.reader(file_obj)
    header = next(csv_reader_obj)
    yield header
    for index, row in enumerate(csv_reader_obj, 1):
        while self.topic_queue.qsize() > 20000:
            pass
        packet = ([index, row, header])
        try:
            # Block at most for 500 sec till an Empty slot is found
            self.topic_queue.put(obj=packet, block=True, timeout= 500)
        except Exception as e:
            self.log.info("LINE PRODUCTION : FAILED")
            type_, value_, traceback_ = sys.exc_info()
            self.log.exception("traceback :{} ||type: {} ||value: {}".format(traceback.extract_tb(traceback_),
                                                                             type_,
                                                                             value_))
            raise ValueError("PROBLEM IN LOADING PACKET TO TASK QUEUE. "
                             "\n  \t PACKET -> {} \n  \t INDEX -> {}".format(packet, index))
        if index % 100000 == 0:
            self.log.info("produced -> {}".format(index))
    self.log.info("LINE PRODUCTION : FINISHED")
    yield index

This is working well until December. But started to break recently. After debuggin, what I have understood was.

self.header = next(my_producer)

this line is returning 2nd line in the file instead of Header. I have to reset the object to get the header like this.

    file_obj.seek(0)

can any of you please let me know if there is something that I doing wrong here. Also why only break now??

Upvotes: 0

Views: 72

Answers (1)

dorian
dorian

Reputation: 6282

To me it seems that check_file_for_data always consumes two lines from the file when you do

next(islice(file_obj, 1, 2))

One line is skipped and the next returned.

I don't really see how your snippet could have ever worked without seeking back to the start of the file in line_producer.

Upvotes: 1

Related Questions