Reputation: 1535
We have a multithreaded program that does the following:
thread_1 listens to the hard disk to detect newly created files, using the WatchService API in Java 7. When another program creates a new file, thread_1 detects it and puts it into a PriorityBlockingQueue, for example:
priorityBlockingQueue.add(FileObject)
FileObjComparator is a custom class that implements Comparator. It sorts by creation time, using the fileCreatedTime field of FileObject, which I set from the system time when the file is detected:
public int compare(FileObject o1, FileObject o2) {
return o1.getFileCreatedTime().compareTo(o2.getFileCreatedTime());
}
priorityBlockingQueue is initialized as:
DataFileQueue.priorityBlockingQueue = new PriorityBlockingQueue<FileObject>(100000, new FileObjComparator());
and Thread_2 processes the next-to-last file in this priorityBlockingQueue:
if (priorityBlockingQueue.size() > 1)
    process(priorityBlockingQueue.poll());
The two threads run in parallel, but when I process a number of large files, Thread_2 sometimes processes a file while it is still being written. I detected this by comparing the file content with the result of processing.
The program runs on CentOS 6.2, and the hard disk partition is mounted in async mode. Thanks for any help.
Upvotes: 0
Views: 1308
Reputation: 310883
Your Comparator should order by last modified time, not creation time. I don't see how you can know, for example, that two files opened in the order A, B will be completely written in the same order, unless you know for a fact that file production is strictly sequential. You haven't said so.
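A minimal sketch of ordering by last modified time, under assumptions: TimedFile and OLDEST_FIRST are hypothetical names standing in for the question's FileObject and FileObjComparator. The timestamp is read once when the entry is created, because a key that changes while the entry sits in a PriorityBlockingQueue would break the queue's ordering.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;

// Hypothetical queue entry: a path plus a snapshot of its last-modified time.
final class TimedFile {
    final Path path;
    final long lastModifiedMillis;

    private TimedFile(Path path, long lastModifiedMillis) {
        this.path = path;
        this.lastModifiedMillis = lastModifiedMillis;
    }

    // Reads the last-modified time once, at enqueue time.
    static TimedFile snapshot(Path path) throws IOException {
        return new TimedFile(path, Files.getLastModifiedTime(path).toMillis());
    }

    // Oldest modification first, so poll() returns the file that was finished earliest.
    static final Comparator<TimedFile> OLDEST_FIRST = new Comparator<TimedFile>() {
        @Override
        public int compare(TimedFile a, TimedFile b) {
            return Long.compare(a.lastModifiedMillis, b.lastModifiedMillis);
        }
    };
}
```

It would be used as new PriorityBlockingQueue<TimedFile>(100000, TimedFile.OLDEST_FIRST). Note that this only changes the order; it does not by itself tell you that a file is complete.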
Upvotes: 0
Reputation: 11958
EDIT a more detailed answer.
The problem is ...
You wrote:
It is sorted by created time and fileCreatedTime field in FileObject I get from system time when detect this file: ....
thread_1 is a listener of hard disk to detect a new file created. We use WatchService api in Java 7. When a new file is created by another program. ... thread_1 detects and get it and put it to a PriorityBlockingQueue ex:
For example:
Open a file manager and start downloading a file of about 60 MB. Note the creation time. After about 3 minutes, look at the modification time.
The creation time is the wrong moment at which to put a new file into the PriorityBlockingQueue.
thread_1 has to wait until writing to the file has finished. Only then can it put the file into the PriorityBlockingQueue.
How can I detect that the write to a file has completed?
3 not too complicated options
What would you prefer ?
I would prefer solution c.
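A file opened for writing cannot be moved. After the 3rd party program closes the file, it can be moved. (This holds on Windows; on Linux a rename usually succeeds even while the file is still open, so verify the behaviour on your system.)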
The necessary steps.
Solution b is more complicated.
thread_1 puts each incoming file name and its size into a control array and samples the size 3 to 5 times (every 5 seconds or more).
Array
(filenamexyz.dat, size1, size2, size3, ...).
(filenameabc.dat, size1, size2, size3, ...).
(filenamefgh.dat, size1, size2, size3, ...).
....
If, for a given file name, all 5 compared sizes are the same, the 3rd party program has finished writing to this file.
Now it can be put into the PriorityBlockingQueue.
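The size comparison described above could be sketched as follows. This is only an illustration under assumptions: SizeStabilityTracker and poll are hypothetical names, the caller is expected to invoke poll once per sampling interval (for example every 5 seconds), and a writer that pauses for longer than the whole sampling window would still be misjudged as finished.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: reports a file as stable once its size has stayed
// unchanged for a given number of consecutive polls.
class SizeStabilityTracker {
    private final int requiredUnchanged;
    // Per file: {last observed size, number of consecutive unchanged polls}.
    private final Map<Path, long[]> state = new HashMap<Path, long[]>();

    SizeStabilityTracker(int requiredUnchanged) {
        this.requiredUnchanged = requiredUnchanged;
    }

    // Call once per sampling interval; returns true when the file looks finished.
    boolean poll(Path file) throws IOException {
        long size = Files.size(file);
        long[] seen = state.get(file);
        if (seen == null || seen[0] != size) {
            // First sighting, or the file grew: restart the count.
            state.put(file, new long[] { size, 0 });
            return false;
        }
        seen[1]++;
        if (seen[1] >= requiredUnchanged) {
            state.remove(file); // done tracking; the caller can enqueue the file
            return true;
        }
        return false;
    }
}
```

thread_1 would keep a newly detected file out of the queue until poll returns true for it.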
Let's look step by step
We assume thread_2 starts when list.size is 2.
thread_2 starts with the next file in the list, FILE_3.
NOW THE TROUBLE STARTS
Suppose the file the 3rd party program is writing is larger and takes longer to write, and thread_2 has already finished reading the smaller FILE_4.
thread_2 then takes the next file out of the list, FILE_5, whether or not the file is ready to read.
FILE_5 is the file the 3rd party program is still writing, and it is also the file thread_2 is reading and processing. thread_2 reads only the bytes the 3rd party program has written so far.
Upvotes: -2
Reputation: 116878
If you really are processing the 2nd to last file then I'm surprised that its size is growing, unless multiple processes or threads are generating the input files. Make sure that the other process that is creating the files flushes and closes each file before writing the next one.
You could read the file in blocks and then go back after a period of time to see if any additional data was added to the file, and process it at that time, using a RandomAccessFile. If you are reading a file line by line you would need to do your own pagination, unfortunately. If the file is line based then you should make sure that the file ends with a line termination character.
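A rough sketch of going back for additional data with RandomAccessFile, assuming the file only ever grows by appending. IncrementalReader, readNew and MAX_CHUNK are hypothetical names, and the chunk limit is an arbitrary choice for the sketch.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Path;

// Hypothetical helper: remembers how far it has read and returns only the
// bytes appended since the previous call.
class IncrementalReader {
    private static final int MAX_CHUNK = 1 << 20; // read at most 1 MiB per call (arbitrary)
    private final Path file;
    private long offset = 0;

    IncrementalReader(Path file) {
        this.file = file;
    }

    // Returns newly appended bytes, or an empty array if nothing was added.
    byte[] readNew() throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            long length = raf.length();
            if (length <= offset) {
                return new byte[0];
            }
            byte[] chunk = new byte[(int) Math.min(length - offset, MAX_CHUNK)];
            raf.seek(offset);
            raf.readFully(chunk);
            offset += chunk.length;
            return chunk;
        }
    }
}
```

The caller would process each chunk and call readNew again later; for line-based files it still has to carry over a trailing partial line itself.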
Another thing you can try is to delay the processing of the file a bit to let the file system flush its buffers. Ugly and unreliable but maybe necessary.
If you can adjust the output process then you could end the file with a magic string and then not process the file until the magic string is seen.
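A sketch of the magic-string check, assuming the writer appends an agreed marker as the very last bytes of the file. EndMarker and endsWith are hypothetical names, and the marker value is whatever you and the output process agree on.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Path;
import java.util.Arrays;

// Hypothetical helper: checks whether a file currently ends with a marker.
class EndMarker {
    static boolean endsWith(Path file, byte[] marker) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            long length = raf.length();
            if (length < marker.length) {
                return false; // too short to contain the marker yet
            }
            byte[] tail = new byte[marker.length];
            raf.seek(length - marker.length);
            raf.readFully(tail);
            return Arrays.equals(tail, marker);
        }
    }
}
```

The marker should be something that cannot occur at the end of a partially written file, otherwise the check can fire too early.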
You could have the process that writes the file also write the size of the file into a separate file with a ".size" extension (or something). The size file would help you verify that you are reading the correct number of characters.
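A sketch of the size-file check, under assumptions that are not in the original: the sidecar is named by appending ".size" to the data file's name, it contains the expected length in bytes as decimal text, and it is written only after the data file is closed. SizeSidecar and isComplete are hypothetical names.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: compares a data file's size with the size recorded
// in its "<name>.size" sidecar file.
class SizeSidecar {
    static boolean isComplete(Path dataFile) throws IOException {
        Path sidecar = dataFile.resolveSibling(dataFile.getFileName() + ".size");
        if (!Files.exists(sidecar)) {
            return false; // writer has not announced the final size yet
        }
        String text = new String(Files.readAllBytes(sidecar), StandardCharsets.US_ASCII).trim();
        long expected;
        try {
            expected = Long.parseLong(text);
        } catch (NumberFormatException e) {
            return false; // sidecar itself is missing content or half-written
        }
        return Files.size(dataFile) == expected;
    }
}
```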
Another thing to try is to call Runtime.getRuntime().exec("/bin/sync");
before you start reading from a file, to synchronize the file system, if you are running on a Unix-like system. The problem is that support for this is highly OS dependent. It can also be a real performance killer. Here's the man page on my Mac:
The sync utility can be called to ensure that all disk writes have been completed
Upvotes: 2
Reputation: 310
You can try using semaphores to organize access to each file, so that no file is written to by more than one thread at a time. I think each file object should have its own semaphore, and each thread should try to acquire the semaphore before writing to the file.
Upvotes: 1