Reputation: 21
I am having a problem in understanding a recent phenomenon that broke our code in production. This piece of code
with self.s3.get_stream(bucket=self.import_bucket, key_name=self.in_file, mode="rb") as file_obj:
# Check file for any amount data, return false if there is no Data
if not self.check_file_for_data(file_obj):
return False
my_producer = iter(self.line_producer(file_obj))
self.header = next(my_producer)
def check_file_for_data(self, file_obj):
try:
next(islice(file_obj, 1, 2))
except StopIteration:
# File has no Data other than column names.
return False
except Exception as e:
# unknown problem caused in reading file.
self.log.error("Error in reading file: {0} for post processing. error message: {1}".format(self.in_file, e))
else:
# File has readable data other than Column names.
return True
def line_producer(self, file_obj):
# Added recently to solve this problem, but was working without this, till dec
file_obj.seek(0)
self.log.info("LINE PRODUCTION : Started")
csv_reader_obj = csv.reader(file_obj)
header = next(csv_reader_obj)
yield header
for index, row in enumerate(csv_reader_obj, 1):
while self.topic_queue.qsize() > 20000:
pass
packet = ([index, row, header])
try:
# Block at most for 500 sec till an Empty slot is found
self.topic_queue.put(obj=packet, block=True, timeout= 500)
except Exception as e:
self.log.info("LINE PRODUCTION : FAILED")
type_, value_, traceback_ = sys.exc_info()
self.log.exception("traceback :{} ||type: {} ||value: {}".format(traceback.extract_tb(traceback_),
type_,
value_))
raise ValueError("PROBLEM IN LOADING PACKET TO TASK QUEUE. "
"\n \t PACKET -> {} \n \t INDEX -> {}".format(packet, index))
if index % 100000 == 0:
self.log.info("produced -> {}".format(index))
self.log.info("LINE PRODUCTION : FINISHED")
yield index
This is working well until December. But started to break recently. After debuggin, what I have understood was.
self.header = next(my_producer)
this line is returning 2nd line in the file instead of Header. I have to reset the object to get the header like this.
file_obj.seek(0)
can any of you please let me know if there is something that I doing wrong here. Also why only break now??
Upvotes: 0
Views: 72
Reputation: 6282
To me it seems that check_file_for_data
always consumes two lines from the file when you do
next(islice(file_obj, 1, 2))
One line is skipped and the next returned.
I don't really see how your snippet could have ever worked without seeking back to the start of the file in line_producer
.
Upvotes: 1