Reputation: 2156
I am writing a web server and am trying to make it as efficient as possible by minimizing file system calls. The problem is that the methods that return streams, such as java.nio.file.Files.list, return a Stream of Paths, and I would like to have a Stream of BasicFileAttributes, so that I can return the creation time and update time for each Path (for example, when returning results for an LDP Container).
Of course, a simple solution would be to map each element of the Stream with a function that takes the path and returns a file attribute, (p: Path) => Files.getAttributeView..., but that sounds like it would make a call to the FS for each Path. That seems like a waste, because to get the file information the JDK can't have been far from the attribute info.
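For reference, the per-path approach described above could look roughly like the sketch below. It uses Files.readAttributes rather than Files.getAttributeView, and the class and method names (PerPathAttributes, listWithAttributes) are made up for illustration. Each readAttributes call is a separate request to the file system provider, which is exactly the cost the question is trying to avoid.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper class, not part of the JDK.
final class PerPathAttributes {

    // Lists the direct children of dir, then reads the attributes one path at a time.
    static List<Map.Entry<Path, BasicFileAttributes>> listWithAttributes(Path dir) throws IOException {
        try (Stream<Path> children = Files.list(dir)) {
            return children
                .map(p -> {
                    try {
                        // One extra attribute lookup per listed path.
                        return Map.entry(p, Files.readAttributes(p, BasicFileAttributes.class));
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                })
                .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        for (var e : listWithAttributes(Path.of("."))) {
            System.out.println(e.getKey()
                + " created=" + e.getValue().creationTime()
                + " modified=" + e.getValue().lastModifiedTime());
        }
    }
}
```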
I actually came across this mail from 2009 on the OpenJDK mailing list, which states that they had discussed adding an API that would return a pair of a Path and its attributes...
I found a non-public class in the JDK, java.nio.file.FileTreeWalker, which has an API that would allow one to fetch the attributes: FileTreeWalker.Event. That actually makes use of a sun.nio.fs.BasicFileAttributesHolder, which allows a Path to keep a cache of the attributes. But it's not public, and it is not clear where it works.
There is of course also the whole FileVisitor API, which has methods that are passed both a Path and its BasicFileAttributes, as shown here:
public FileVisitResult visitFile(Path file, BasicFileAttributes attr) {...}
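As a rough sketch of how that callback hands over both values, the following collects every visited file together with its attributes using Files.walkFileTree. The class and method names (VisitorCollect, collect) are hypothetical. Note that this version is eager: it walks the whole tree before returning, so it gives no back pressure, and visitFile is only called for non-directory entries.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper class, for illustration only.
final class VisitorCollect {

    // Walks the tree under root and records each file with the attributes
    // that the walker passes to the callback.
    static List<Map.Entry<Path, BasicFileAttributes>> collect(Path root) throws IOException {
        List<Map.Entry<Path, BasicFileAttributes>> found = new ArrayList<>();
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
                found.add(Map.entry(file, attrs));
                return FileVisitResult.CONTINUE;
            }
        });
        return found;
    }

    public static void main(String[] args) throws IOException {
        for (var e : collect(Path.of("."))) {
            System.out.println(e.getKey() + " modified=" + e.getValue().lastModifiedTime());
        }
    }
}
```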
So I am looking for a way to turn that into a Stream that respects the principle of back pressure from the Reactive Manifesto, as promoted by Akka, without hogging too many resources. I checked the open source Alpakka File project, but that also streams the Files methods that return Paths ...
Upvotes: 5
Views: 840
Reputation: 2156
Starting from DuncG's answer, I got the following to work in Scala 3 as a pretty generic Akka Stream class. This is actually very neat, as it takes the side effect of the Files.find function and immediately encapsulates it back into a clean functional reactive stream.
class DirectoryList(
    dir: Path,
    matcher: (Path, BasicFileAttributes) => Boolean = (p,a) => true,
    maxDepth: Int = 1
) extends GraphStage[SourceShape[(Path,BasicFileAttributes)]]:
  import scala.jdk.FunctionConverters.*
  import scala.jdk.OptionConverters.*
  val out: Outlet[(Path,BasicFileAttributes)] = Outlet("PathAttributeSource")
  override val shape = SourceShape(out)
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private var next: (Path,BasicFileAttributes) = _
      def append(path: Path, att: BasicFileAttributes): Boolean =
        val matched = matcher(path,att)
        if matched then next = (path,att)
        matched
      private val pathStream = Files.find(dir, maxDepth, append.asJava)
      private val sit = pathStream.iterator()
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          if sit.hasNext then
            sit.next()
            push(out,next)
          else
            pathStream.close()
            complete(out)
        }
        override def onDownstreamFinish(cause: Throwable): Unit =
          pathStream.close()
          super.onDownstreamFinish(cause)
      })
    }
end DirectoryList
This can then be used as follows:
val sourceGraph = DirectoryList(Path.of("."), maxDepth = 10)
val result = Source.fromGraph(sourceGraph).map { (p: Path, att: BasicFileAttributes) =>
  println(s"received <$p> : dir=${att.isDirectory}")
}.run()
The full source code is here on GitHub and an initial test here. Perhaps one could improve it by batching the results, so that a certain number of path/attribute pairs are passed along in bulk.
Upvotes: 1
Reputation: 15156
You can access file attributes together with their path by using Files.find, which accepts a BiPredicate<Path, BasicFileAttributes>, and storing the attributes as it tests each path.
The side-effect action inside the BiPredicate enables operations on both objects without needing to touch the file system again for each item in the path. Given your predicate condition yourPred, the side-effect predicate below collects the attributes for you to retrieve inside the stream processing:
public static void main(String[] args) throws IOException {
    Path dir = Path.of(args[0]);
    // Use `ConcurrentHashMap` if using `stream.parallel()`
    HashMap<Path,BasicFileAttributes> attrs = new HashMap<>();
    BiPredicate<Path, BasicFileAttributes> yourPred = (p,a) -> true;
    BiPredicate<Path, BasicFileAttributes> predicate = (p,a) -> {
        return yourPred.test(p, a)
            // && p.getNameCount() == dir.getNameCount()+1 // Simulates Files.list
            && attrs.put(p, a) == null;
    };
    try(var stream = Files.find(dir, Integer.MAX_VALUE, predicate)) {
        stream.forEach(p-> System.out.println(p.toString()+" => "+attrs.get(p)));
        // Or: if you put all your handling code in the predicate, use stream.count();
    }
}
To simulate the effect of Files.list, use a one-level find scanner:
BiPredicate<Path, BasicFileAttributes> yourPred = (p,a) -> p.getNameCount() == dir.getNameCount()+1;
For a large folder scan, you should clean up the attrs map as you go by inserting attrs.remove(p); after consuming the path.
Edit
The answer above can be refactored into a 3-line call returning a stream of Map.Entry<Path, BasicFileAttributes>, or it's easy to add a class/record to hold the Path/BasicFileAttributes pair and return Stream<PathInfo> instead:
/**
 * Call Files.find() returning a stream with both Path+BasicFileAttributes
 * as type Map.Entry<Path, BasicFileAttributes>
 * <p>Could declare a specific record to replace Map.Entry as:
 *    record PathInfo(Path path, BasicFileAttributes attr) { };
 */
public static Stream<Map.Entry<Path, BasicFileAttributes>>
find(Path dir, int maxDepth, BiPredicate<Path, BasicFileAttributes> matcher, FileVisitOption... options) throws IOException {
    HashMap<Path,BasicFileAttributes> attrs = new HashMap<>();
    BiPredicate<Path, BasicFileAttributes> predicate = (p,a) -> (matcher == null || matcher.test(p, a)) && attrs.put(p, a) == null;
    return Files.find(dir, maxDepth, predicate, options).map(p -> Map.entry(p, attrs.remove(p)));
}
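The record variant mentioned in the Javadoc could be sketched as follows. This is only an illustration: the wrapper class name PathInfoFinder is made up, records need Java 16 or later, and the HashMap means it is only suitable for sequential streams. The caller still has to close the returned stream, since it wraps the one from Files.find.

```java
import java.io.IOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.stream.Stream;

// Hypothetical wrapper class; only Files.find is a real JDK API here.
final class PathInfoFinder {

    // Holder for a path and the attributes seen while matching it.
    record PathInfo(Path path, BasicFileAttributes attr) {}

    static Stream<PathInfo> find(Path dir, int maxDepth,
                                 BiPredicate<Path, BasicFileAttributes> matcher,
                                 FileVisitOption... options) throws IOException {
        Map<Path, BasicFileAttributes> attrs = new HashMap<>();
        // Side effect: remember the attributes of every path that matches.
        BiPredicate<Path, BasicFileAttributes> capturing =
            (p, a) -> (matcher == null || matcher.test(p, a)) && attrs.put(p, a) == null;
        // Remove each entry as it is consumed so the map stays small.
        return Files.find(dir, maxDepth, capturing, options)
            .map(p -> new PathInfo(p, attrs.remove(p)));
    }

    public static void main(String[] args) throws IOException {
        try (Stream<PathInfo> infos = find(Path.of("."), 1, (p, a) -> a.isRegularFile())) {
            infos.forEach(i -> System.out.println(i.path() + " => " + i.attr().lastModifiedTime()));
        }
    }
}
```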
Upvotes: 2
Reputation: 1760
If there's some way of directly getting those attributes, I don't know.
On converting the FileVisitor API to reactive streams:
The mechanism of reactive streams backpressure is a pull-push model, where a demand is first signaled by downstream (the pull part) and then the upstream is allowed to send no more items than the demand signaled (the push part).
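To make the pull-push model concrete, here is a minimal single-threaded sketch using the JDK's java.util.concurrent.Flow interfaces instead of Akka Streams. The class names (DemandDemo, IteratorSubscription, CollectingSubscriber) are invented for this example, and it is not a spec-complete Publisher: there is no synchronization and error handling is minimal. It only shows that items are emitted strictly in response to request(n).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical demo classes; only the Flow interfaces come from the JDK.
final class DemandDemo {

    // Emits items from an iterator only while there is outstanding demand.
    static final class IteratorSubscription<T> implements Flow.Subscription {
        private final Iterator<T> items;
        private final Flow.Subscriber<? super T> subscriber;
        private long demand;
        private boolean emitting;
        private boolean done;

        IteratorSubscription(Iterator<T> items, Flow.Subscriber<? super T> subscriber) {
            this.items = items;
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            if (done) return;
            if (n <= 0) {
                done = true;
                subscriber.onError(new IllegalArgumentException("demand must be positive"));
                return;
            }
            demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // saturate on overflow
            if (emitting) return; // re-entrant request from onNext; the loop below sees it
            emitting = true;
            try {
                while (demand > 0 && !done) {
                    if (items.hasNext()) {
                        demand--;
                        subscriber.onNext(items.next()); // the push part, bounded by demand
                    } else {
                        done = true;
                        subscriber.onComplete();
                    }
                }
            } finally {
                emitting = false;
            }
        }

        @Override
        public void cancel() {
            done = true;
        }
    }

    // Records what it receives; demand is signaled from outside via subscription.request.
    static final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> received = new ArrayList<>();
        boolean completed;
        Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(T item) { received.add(item); }
        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        CollectingSubscriber<String> sub = new CollectingSubscriber<>();
        sub.onSubscribe(new IteratorSubscription<>(List.of("a", "b", "c").iterator(), sub));
        sub.subscription.request(2); // pull: ask for two items
        System.out.println(sub.received + " completed=" + sub.completed); // [a, b] completed=false
        sub.subscription.request(5); // more demand than items left
        System.out.println(sub.received + " completed=" + sub.completed); // [a, b, c] completed=true
    }
}
```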
The problem with the FileVisitor API is that there's no way to directly hook up with such a control flow mechanism. Once you set it off, it just goes on calling your callback without caring much about anything else.
There's no clean way of bridging this, but one way you could do it is to use Source.queue (https://doc.akka.io/docs/akka/current/stream/operators/Source/queue.html) to isolate that API from the rest of your stream, like this:
val queue =
  Source.queue[BasicFileAttributes](bufferSize, OverflowStrategy.backpressure)
    //the rest of your Akka Streams pipeline
    .run(system);
This will materialize to a queue that you can now pass to your FileVisitor. Anything you offer to that queue will go down the stream. If there's no demand when you offer and the queue is full, the Future returned by offer will not complete until there's space in the queue. So in the API you could simply do:
//inside the FileVisitor API callback
Await.result(queue.offer(attrs), Duration.Inf)
And that would block the callback thread when the stream is backpressured. Ugly but isolated.
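The same blocking idea can be sketched without Akka, using a bounded java.util.concurrent.BlockingQueue in place of Source.queue. This is an analogy rather than the Akka API: the class name BoundedVisitorBridge, the Item record, and the END marker are all invented for this example. The walker runs on its own thread, and put blocks that thread whenever the consumer falls behind.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical bridge class; a plain BlockingQueue stands in for Akka's Source.queue.
final class BoundedVisitorBridge {

    record Item(Path path, BasicFileAttributes attrs) {}

    // Made-up end-of-walk marker, compared by identity.
    private static final Item END = new Item(null, null);

    // Starts a walker thread that feeds the queue and blocks while it is full.
    static Thread walkInto(Path root, BlockingQueue<Item> queue) {
        Thread walker = new Thread(() -> {
            try {
                Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
                    @Override
                    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
                        try {
                            queue.put(new Item(file, attrs)); // blocks the callback when full
                            return FileVisitResult.CONTINUE;
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return FileVisitResult.TERMINATE;
                        }
                    }
                });
            } catch (IOException e) {
                // Sketch only: a real implementation would propagate the failure downstream.
                e.printStackTrace();
            }
            try {
                queue.put(END); // signal the end of the walk unless interrupted
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "file-walker");
        walker.setDaemon(true);
        walker.start();
        return walker;
    }

    // Consumer side: takes items until the end marker arrives.
    static List<Item> drain(BlockingQueue<Item> queue) throws InterruptedException {
        List<Item> items = new ArrayList<>();
        for (Item i = queue.take(); i != END; i = queue.take()) {
            items.add(i);
        }
        return items;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Item> queue = new ArrayBlockingQueue<>(16);
        walkInto(Path.of("."), queue);
        for (Item i : drain(queue)) {
            System.out.println(i.path() + " size=" + i.attrs().size());
        }
    }
}
```

A small queue capacity keeps memory bounded at the cost of parking the walker thread more often, which is the same trade-off as the bufferSize passed to Source.queue.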
Upvotes: 2