tnk_peka

Reputation: 1535

How to detect that I am reading from a file when write is not completed?

We have a multithreaded program that does the following:

thread_1 watches a directory on the hard disk for newly created files, using the WatchService API in Java 7. When another program creates a new file, thread_1 detects it and puts it into a PriorityBlockingQueue, e.g.:

priorityBlockingQueue.add(fileObject);

FileObjComparator is a custom class that implements Comparator. It sorts by creation time; the fileCreatedTime field in FileObject is taken from the system time at the moment the file is detected:

    public int compare(FileObject o1, FileObject o2) {
        return o1.getFileCreatedTime().compareTo(o2.getFileCreatedTime());
    }

priorityBlockingQueue is initialized as:

DataFileQueue.priorityBlockingQueue = new PriorityBlockingQueue<FileObject>(100000, new FileObjComparator());

Thread_2 then takes files from priorityBlockingQueue, but only while more than one file is queued, so that the most recently detected file is left alone:

if (priorityBlockingQueue.size() > 1)
    process(priorityBlockingQueue.poll());

The two threads run in parallel, but when I process a number of large files, Thread_2 sometimes processes a file while it is still being written. I detected this by rechecking the file content against the result of processing.

This program runs on CentOS 6.2, and the hard disk partition is mounted in async mode. Thanks for any help.

Upvotes: 0

Views: 1308

Answers (4)

user207421

Reputation: 310883

Your Comparator should order by last modified time, not creation time. I don't see how you can know, for example, that two files opened in order A, B will be completely written in the same order, unless you know for a fact that file production is strictly sequential. You haven't said so.
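
A minimal sketch of ordering by last modified time. The question's FileObject class is not shown, so `TimedFile` here is a hypothetical stand-in. It snapshots the timestamp when the entry is created, on the assumption that the ordering key should not change while the entry sits in the queue:

```java
import java.io.File;
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical stand-in for the question's FileObject.
final class TimedFile {
    final File file;
    final long lastModified; // snapshot in milliseconds since the epoch

    TimedFile(File file) {
        // File.lastModified() returns 0L if the file does not exist.
        this(file, file.lastModified());
    }

    TimedFile(File file, long lastModified) {
        this.file = file;
        this.lastModified = lastModified;
    }

    // Oldest modification time first.
    static final Comparator<TimedFile> BY_LAST_MODIFIED = new Comparator<TimedFile>() {
        @Override
        public int compare(TimedFile a, TimedFile b) {
            return Long.compare(a.lastModified, b.lastModified);
        }
    };

    static PriorityBlockingQueue<TimedFile> newQueue() {
        return new PriorityBlockingQueue<TimedFile>(100, BY_LAST_MODIFIED);
    }
}
```

Because the timestamp is a snapshot, this only gives a meaningful order if the entry is created after the file has stopped changing.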

Upvotes: 0

moskito-x

Reputation: 11958

EDIT: a more detailed answer.

The problem is ...

You wrote:

It is sorted by created time and fileCreatedTime field in FileObject I get from system time when detect this file: ....

thread_1 is a listener of hard disk to detect a new file created. We use WatchService api in Java 7. When a new file is created by another program. ... thread_1 detects and get it and put it to a PriorityBlockingQueue ex:

  • The creation time and the time at which writing finishes can be very different (depending on the file size).

For example:

Open a file manager. Start downloading a file of about 60 MB. Note the creation time. After about 3 minutes, look at the time of the last modification.

When detecting a new file, the creation time is the wrong moment to "put it to a PriorityBlockingQueue".

thread_1 has to wait until the file has been completely written. Only then can it put the file into the PriorityBlockingQueue.

How can I detect that the write is completed on a file?

Three not too complicated options:

  • a.) Compare the time the file was created with the time it became ready, or
  • b.) Observe the file size: it grows steadily while the file is being written and stops growing once the file is finished, or
  • c.) Try to move it to a temp folder.

What would you prefer?

I would prefer solution c.

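A file that is open for writing cannot be moved on systems that lock open files (Windows, for example). After the 3rd party program closes the file, it can be moved. On Linux a rename normally succeeds even while the writer still has the file open, so check that this test behaves as expected on your system.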

The necessary steps:

  • thread_1 watches for files created by the 3rd party program.
  • thread_1 tries to move each file to an xyztmp folder (every 10 or 20 or ... seconds).
  • thread_1 looks for newly arrived files in the xyztmp folder and puts them into the PriorityBlockingQueue.
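
The move attempt in these steps could be sketched roughly as follows. `StagingMover` and `tryMove` are hypothetical names, and the sketch assumes the staging folder is on the same file system as the watched folder so that an atomic rename is possible. It also assumes the platform refuses to move a file that is still open for writing; on Linux a rename can succeed regardless, so treat this as an illustration rather than a guaranteed check:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class StagingMover {
    // Tries to move 'source' into 'stagingDir'.
    // Returns the new path on success, or null if the move failed
    // (file still locked, file missing, ...), so the caller can retry later.
    static Path tryMove(Path source, Path stagingDir) {
        try {
            Files.createDirectories(stagingDir);
            Path target = stagingDir.resolve(source.getFileName());
            return Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            return null;
        }
    }
}
```

thread_1 would call `tryMove` every 10 or 20 seconds for each pending file and enqueue only the paths for which it returns a non-null result.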

Solution b. is more complicated.

thread_1 puts the incoming file names and their sizes into a control array and compares the sizes 3-5 times (every 5 seconds or more).

Array

(filenamexyz.dat, size1, size2, size3, ...).
(filenameabc.dat, size1, size2, size3, ...).
(filenamefgh.dat, size1, size2, size3, ...).
....

If, for a file identified by name, all 5 compared sizes are the same, the 3rd party program has finished writing to this file.

Now it can be put into the PriorityBlockingQueue.

Let's look at it step by step

We assume thread_2 starts when list.size is 2!
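
A rough sketch of the size comparison in solution b. `SizeStabilityTracker` is a hypothetical helper, not an existing API. It keeps only the last size and a counter per file name instead of the whole list of sizes, and it is meant to be used from a single thread (thread_1), which would pass in `File.length()` every 5 seconds or so:

```java
import java.util.HashMap;
import java.util.Map;

final class SizeStabilityTracker {
    private final int requiredStablePolls;
    // file name -> { last observed size, number of consecutive unchanged observations }
    private final Map<String, long[]> state = new HashMap<String, long[]>();

    SizeStabilityTracker(int requiredStablePolls) {
        this.requiredStablePolls = requiredStablePolls;
    }

    // Records one size observation for the file.
    // Returns true once the size has matched the previous observation
    // 'requiredStablePolls' times in a row; the entry is then forgotten.
    boolean observe(String fileName, long size) {
        long[] s = state.get(fileName);
        if (s == null || s[0] != size) {
            // First sighting, or the file is still growing: restart the count.
            state.put(fileName, new long[] { size, 0 });
            return false;
        }
        s[1]++;
        if (s[1] >= requiredStablePolls) {
            state.remove(fileName);
            return true;
        }
        return false;
    }
}
```

A writer that pauses for longer than the whole observation window would still be misjudged as finished, so the interval and count have to be chosen with the 3rd party program's behaviour in mind.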

  • 3rd party program starts writing files one by one.
  • 3rd party program starts writing FILE_1.
  • thread_1 detects created FILE_1, put it in the list.
  • 3rd party program finished writing FILE_1.
  • 3rd party program starts writing FILE_2.
  • thread_1 detects created FILE_2, put it in the list.
  • if (priorityBlockingQueue.size() > 1) is TRUE.
  • thread_2 starts reading and processing the first file in the list, FILE_1.

  • 3rd party program finished writing FILE_2.
  • 3rd party program starts writing FILE_3.
  • thread_1 detects created FILE_3, put it in the list.
  • thread_2 finished processing FILE_1.
  • thread_2 starts with the next file in the list, FILE_2.

  • 3rd party program finished writing FILE_3.
  • 3rd party program starts writing FILE_4.
  • thread_1 detects created FILE_4, put it in the list.
  • thread_2 finished processing FILE_2.
  • thread_2 starts with the next file in the list, FILE_3.

    NOW THE TROUBLE STARTS


  • 3rd party program finished writing FILE_4.
  • 3rd party program starts writing FILE_5. (FILE_5 is larger than FILE_4.)
  • thread_1 detects created FILE_5, put it in the list.
  • thread_2 finished processing FILE_3.
  • thread_2 starts with the next file in the list, FILE_4.
  • thread_2 finished processing FILE_4.
  • thread_2 starts with the next file in the list, FILE_5.
  • thread_2 finished processing FILE_5.
  • 3rd party program finished writing FILE_5.

If the file that the 3rd party program is writing is larger and takes longer to write, thread_2 finishes reading the smaller FILE_4 first.

thread_2 then takes the next file out of the list, FILE_5, whether or not the file is ready to read.

FILE_5 is the file the 3rd party program is still writing, and FILE_5 is also the file thread_2 is reading and processing. The bytes thread_2 reads are only the bytes the 3rd party program has written up to that moment.

Upvotes: -2

Gray

Reputation: 116878

If you really are processing the second-to-last file, then I'm surprised that its size is growing, unless multiple processes or threads are generating the input files. Make sure that the other process that is creating the files flushes and closes each file before writing the next one.

  • You could read the file in blocks using a RandomAccessFile, then go back after a period of time to see whether any additional data was added to the file, and process it at that time. If you are reading the file line by line, you would unfortunately need to do your own pagination. If the file is line based, then you should make sure that a line termination character closes the file.

  • Another thing you can try is to delay the processing of the file a bit to let the file system flush its buffers. Ugly and unreliable but maybe necessary.

  • If you can adjust the output process then you could end the file with a magic string and then not process the file until the magic string is seen.

  • You could have the process that writes the file also write the size of the file into a separate file with a ".size" extension (or something). The size file would help you verify that you are reading the correct number of characters.

  • Another thing to try is to call Runtime.getRuntime().exec("/bin/sync"); before you start reading from a file, to synchronize the file system, if you are running on a Unix-like system. The problem is that support for this is highly OS dependent. It also can be a real performance killer. Here's the man page on my Mac:

    The sync utility can be called to ensure that all disk writes have been completed
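
As an illustration of the magic-string idea from the list above, here is a minimal sketch that checks the tail of a file with RandomAccessFile. The marker `#EOF#` and the names `TrailerCheck` and `endsWithMarker` are made up for this example; the writer and the reader would have to agree on whatever marker is actually used:

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class TrailerCheck {
    // Hypothetical end-of-file marker written last by the producing process.
    static final byte[] MARKER = "#EOF#".getBytes(StandardCharsets.US_ASCII);

    // Returns true if the last bytes of the file are exactly MARKER.
    static boolean endsWithMarker(String path) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(path, "r")) {
            long length = raf.length();
            if (length < MARKER.length) {
                return false; // too short to contain the marker
            }
            byte[] tail = new byte[MARKER.length];
            raf.seek(length - MARKER.length);
            raf.readFully(tail);
            return Arrays.equals(tail, MARKER);
        }
    }
}
```

The reading thread would skip (and later retry) any file for which `endsWithMarker` returns false.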

Upvotes: 2

Nullbeans

Reputation: 310

You can try using semaphores to coordinate access to each file, so that no file is written to by more than one thread at a time. I think each file object should have its own semaphore, and each thread should try to acquire the semaphore before writing to the file.
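
A small sketch of one semaphore per file. `FileLocks` and `forFile` are hypothetical names. A java.util.concurrent.Semaphore only coordinates threads inside one JVM, so this assumes the writer and the reader run in the same process, which is not the case in the question as described:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;

final class FileLocks {
    // One binary semaphore per file path, created on first use.
    private static final ConcurrentMap<String, Semaphore> LOCKS =
            new ConcurrentHashMap<String, Semaphore>();

    // Returns the semaphore guarding the given path; the same path
    // always yields the same Semaphore instance.
    static Semaphore forFile(String path) {
        Semaphore fresh = new Semaphore(1);
        Semaphore existing = LOCKS.putIfAbsent(path, fresh);
        return existing != null ? existing : fresh;
    }
}
```

A thread would call `forFile(path).acquire()` before touching the file and `release()` in a `finally` block afterwards.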

Upvotes: 1
