Reputation: 37
I would like to download multiple download files recursively from a FTP Directory, to do this I'm using FluentFTP library and my code is this one:
private async Task downloadRecursively(string src, string dest, FtpClient ftp)
{
foreach(var item in ftp.GetListing(src))
{
if (item.Type == FtpFileSystemObjectType.Directory)
{
if (item.Size != 0)
{
System.IO.Directory.CreateDirectory(Path.Combine(dest, item.Name));
downloadRecursively(Path.Combine(src, item.Name), Path.Combine(dest, item.Name), ftp);
}
}
else if (item.Type == FtpFileSystemObjectType.File)
{
await ftp.DownloadFileAsync(Path.Combine(dest, item.Name), Path.Combine(src, item.Name));
}
}
}
I know you need one FtpClient per download you want, but how can I make to use a certain number of connections as maximum, I guess that the idea is to create, connect, download and close per every file I find but just having a X number of downloading files at the same time. Also I'm not sure if I should create Task with async, Threads and my biggest problem, how to implement all of this.
Answer from @Bradley here seems pretty good, but the question does read every file thas has to download from an external file and it doesn't have a maximum concurrent download value so I'm not sure how to apply these both requirements.
Upvotes: 3
Views: 3699
Reputation: 43545
Here is a TPL Dataflow approach. A BufferBlock<FtpClient>
is used as a pool of FtpClient
objects. The recursive enumeration takes a parameter of type IEnumerable<string>
that holds the segments of one filepath. These segments are combined differently when constructing the local and the remote filepath. As a side effect of invoking the recursive enumeration, the paths of the remote files are sent to an ActionBlock<IEnumerable<string>>
. This block handles the parallel downloading of the files. Its Completion
property contains eventually all the exceptions that may have occurred during the whole operation.
public static Task FtpDownloadDeep(string ftpHost, string ftpRoot,
string targetDirectory, string username = null, string password = null,
int maximumConnections = 1)
{
// Arguments validation omitted
if (!Directory.Exists(targetDirectory))
throw new DirectoryNotFoundException(targetDirectory);
var fsLocker = new object();
var ftpClientPool = new BufferBlock<FtpClient>();
async Task<TResult> UsingFtpAsync<TResult>(Func<FtpClient, Task<TResult>> action)
{
var client = await ftpClientPool.ReceiveAsync();
try { return await action(client); }
finally { ftpClientPool.Post(client); } // Return to the pool
}
var downloader = new ActionBlock<IEnumerable<string>>(async path =>
{
var remotePath = String.Join("/", path);
var localPath = Path.Combine(path.Prepend(targetDirectory).ToArray());
var localDir = Path.GetDirectoryName(localPath);
lock (fsLocker) Directory.CreateDirectory(localDir);
var status = await UsingFtpAsync(client =>
client.DownloadFileAsync(localPath, remotePath));
if (status == FtpStatus.Failed) throw new InvalidOperationException(
$"Download of '{remotePath}' failed.");
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = maximumConnections,
BoundedCapacity = maximumConnections,
});
async Task Recurse(IEnumerable<string> path)
{
if (downloader.Completion.IsCompleted) return; // The downloader has failed
var listing = await UsingFtpAsync(client =>
client.GetListingAsync(String.Join("/", path)));
foreach (var item in listing)
{
if (item.Type == FtpFileSystemObjectType.Directory)
{
if (item.Size != 0) await Recurse(path.Append(item.Name));
}
else if (item.Type == FtpFileSystemObjectType.File)
{
var accepted = await downloader.SendAsync(path.Append(item.Name));
if (!accepted) break; // The downloader has failed
}
}
}
// Move on to the thread pool, to avoid ConfigureAwait(false) everywhere
return Task.Run(async () =>
{
// Fill the FtpClient pool
for (int i = 0; i < maximumConnections; i++)
{
var client = new FtpClient(ftpHost);
if (username != null && password != null)
client.Credentials = new NetworkCredential(username, password);
ftpClientPool.Post(client);
}
try
{
// Enumerate the files to download
await Recurse(new[] { ftpRoot });
downloader.Complete();
}
catch (Exception ex) { ((IDataflowBlock)downloader).Fault(ex); }
try
{
// Await the downloader to complete
await downloader.Completion;
}
catch (OperationCanceledException)
when (downloader.Completion.IsCanceled) { throw; }
catch { downloader.Completion.Wait(); } // Propagate AggregateException
finally
{
// Clean up
if (ftpClientPool.TryReceiveAll(out var clients))
foreach (var client in clients) client.Dispose();
}
});
}
Usage example:
await FtpDownloadDeep("ftp://ftp.test.com", "", @"C:\FtpTest",
"username", "password", maximumConnections: 10);
Note: The above implementation enumerates the remote directory lazily, following the tempo of the downloading process. If you prefer to enumerate it eagerly, gathering all info available about the remote listings ASAP, just remove the BoundedCapacity = maximumConnections
configuration from the ActionBlock
that downloads the files. Be aware that doing so could result in high memory consumption, in case the remote directory has a deep hierarchy of subfolders, containing cumulatively a huge number of small files.
Upvotes: 3
Reputation: 202292
Use:
ConcurrentBag
class to implement a connection pool;Parallel
class to parallelize the operation;ParallelOptions.MaxDegreeOfParallelism
to limit number of the concurrent threads.var clients = new ConcurrentBag<FtpClient>();
var opts = new ParallelOptions { MaxDegreeOfParallelism = maxConnections };
Parallel.ForEach(files, opts, file =>
{
file = Path.GetFileName(file);
string thread = $"Thread {Thread.CurrentThread.ManagedThreadId}";
if (!clients.TryTake(out var client))
{
Console.WriteLine($"{thread} Opening connection...");
client = new FtpClient(host, user, pass);
client.Connect();
Console.WriteLine($"{thread} Opened connection {client.GetHashCode()}.");
}
string remotePath = sourcePath + "/" + file;
string localPath = Path.Combine(destPath, file);
string desc =
$"{thread}, Connection {client.GetHashCode()}, " +
$"File {remotePath} => {localPath}";
Console.WriteLine($"{desc} - Starting...");
client.DownloadFile(localPath, remotePath);
Console.WriteLine($"{desc} - Done.");
clients.Add(client);
});
Console.WriteLine($"Closing {clients.Count} connections");
foreach (var client in clients)
{
Console.WriteLine($"Closing connection {client.GetHashCode()}");
client.Dispose();
}
Another approach is to start a fixed number of threads with one connection for each and have them pick files from a queue.
For an example of an implementation, see my article for WinSCP .NET assembly:
Automating transfers in parallel connections over SFTP/FTP protocol
A similar question about SFTP:
Processing SFTP files using C# Parallel.ForEach loop not processing downloads
Upvotes: 4
Reputation: 117064
I'd split this into three parts.
It's the last part that is slow and should be done in parallel.
Here's the code:
private async Task DownloadRecursively(string src, string dest, FtpClient ftp)
{
/* 1 */
IEnumerable<(string source, string destination)> Recurse(string s, string d)
{
foreach (var item in ftp.GetListing(s))
{
if (item.Type == FtpFileSystemObjectType.Directory)
{
if (item.Size != 0)
{
foreach(var pair in Recurse(Path.Combine(s, item.Name), Path.Combine(d, item.Name)))
{
yield return pair;
}
}
}
else if (item.Type == FtpFileSystemObjectType.File)
{
yield return (Path.Combine(s, item.Name), Path.Combine(d, item.Name));
}
}
}
var pairs = Recurse(src, dest).ToArray();
/* 2 */
foreach (var d in pairs.Select(x => x.destination).Distinct())
{
System.IO.Directory.CreateDirectory(d);
}
/* 3 */
var downloads =
pairs
.AsParallel()
.Select(x => ftp.DownloadFileAsync(x.source, x.destination))
.ToArray();
await Task.WhenAll(downloads);
}
It should be clean, neat, and easy to reason about code.
Upvotes: 0