TePi

Reputation: 37

Download multiple files concurrently from FTP using FluentFTP with a maximum number of connections

I would like to download multiple files recursively from an FTP directory. To do this I'm using the FluentFTP library, and my code is the following:

private async Task downloadRecursively(string src, string dest, FtpClient ftp)
{

    foreach(var item in ftp.GetListing(src))
    {
        if (item.Type == FtpFileSystemObjectType.Directory)
        {
            if (item.Size != 0)
            {
                System.IO.Directory.CreateDirectory(Path.Combine(dest, item.Name));
                await downloadRecursively(Path.Combine(src, item.Name), Path.Combine(dest, item.Name), ftp);
            }
        }
        else if (item.Type == FtpFileSystemObjectType.File)
        {
            await ftp.DownloadFileAsync(Path.Combine(dest, item.Name), Path.Combine(src, item.Name));
        }
    }
}

I know you need one FtpClient per concurrent download, but how can I cap the number of connections at some maximum? I guess the idea is to create, connect, download and close for every file I find, while having only X files downloading at the same time. I'm also not sure whether I should create Tasks with async or use Threads, and my biggest problem is how to implement all of this.

The answer from @Bradley here seems pretty good, but that question reads the list of files to download from an external file, and it doesn't enforce a maximum number of concurrent downloads, so I'm not sure how to apply both requirements.
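For reference, the throttling requirement on its own can be sketched with a SemaphoreSlim gate, independently of how the file list is produced. This is a minimal sketch, not FluentFTP-specific: the helper name ForEachThrottledAsync is made up here, and the download delegate stands in for the per-file FtpClient work (create, connect, DownloadFileAsync, dispose).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Runs one task per item, but allows at most 'maxConcurrency' of them
// to be in flight at any moment, using a SemaphoreSlim as a gate.
// The 'download' delegate is a placeholder for the per-file FTP work.
static async Task ForEachThrottledAsync<T>(
    IEnumerable<T> items, int maxConcurrency, Func<T, Task> download)
{
    using var gate = new SemaphoreSlim(maxConcurrency);
    var tasks = items.Select(async item =>
    {
        await gate.WaitAsync();          // Wait for a free download slot
        try { await download(item); }
        finally { gate.Release(); }      // Free the slot for the next file
    }).ToArray();                        // Materialize so all wrappers start
    await Task.WhenAll(tasks);           // Propagates any download exceptions
}
```

Inside the delegate you would open one FtpClient per slot, download the file, and dispose the client, so at most maxConcurrency connections exist at once.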

Upvotes: 3

Views: 3699

Answers (3)

Theodor Zoulias

Reputation: 43545

Here is a TPL Dataflow approach. A BufferBlock&lt;FtpClient&gt; is used as a pool of FtpClient objects. The recursive enumeration takes a parameter of type IEnumerable&lt;string&gt; that holds the segments of one file path. These segments are combined differently when constructing the local and the remote file path. As a side effect of invoking the recursive enumeration, the paths of the remote files are sent to an ActionBlock&lt;IEnumerable&lt;string&gt;&gt;. This block handles the parallel downloading of the files, and its Completion property eventually contains all the exceptions that may have occurred during the whole operation.

public static Task FtpDownloadDeep(string ftpHost, string ftpRoot,
    string targetDirectory, string username = null, string password = null,
    int maximumConnections = 1)
{
    // Arguments validation omitted            
    if (!Directory.Exists(targetDirectory))
        throw new DirectoryNotFoundException(targetDirectory);
    var fsLocker = new object();

    var ftpClientPool = new BufferBlock<FtpClient>();

    async Task<TResult> UsingFtpAsync<TResult>(Func<FtpClient, Task<TResult>> action)
    {
        var client = await ftpClientPool.ReceiveAsync();
        try { return await action(client); }
        finally { ftpClientPool.Post(client); } // Return to the pool
    }

    var downloader = new ActionBlock<IEnumerable<string>>(async path =>
    {
        var remotePath = String.Join("/", path);
        var localPath = Path.Combine(path.Prepend(targetDirectory).ToArray());
        var localDir = Path.GetDirectoryName(localPath);
        lock (fsLocker) Directory.CreateDirectory(localDir);
        var status = await UsingFtpAsync(client =>
            client.DownloadFileAsync(localPath, remotePath));
        if (status == FtpStatus.Failed) throw new InvalidOperationException(
            $"Download of '{remotePath}' failed.");
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = maximumConnections,
        BoundedCapacity = maximumConnections,
    });

    async Task Recurse(IEnumerable<string> path)
    {
        if (downloader.Completion.IsCompleted) return; // The downloader has failed
        var listing = await UsingFtpAsync(client =>
            client.GetListingAsync(String.Join("/", path)));
        foreach (var item in listing)
        {
            if (item.Type == FtpFileSystemObjectType.Directory)
            {
                if (item.Size != 0) await Recurse(path.Append(item.Name));
            }
            else if (item.Type == FtpFileSystemObjectType.File)
            {
                var accepted = await downloader.SendAsync(path.Append(item.Name));
                if (!accepted) break; // The downloader has failed
            }
        }
    }

    // Move on to the thread pool, to avoid ConfigureAwait(false) everywhere
    return Task.Run(async () =>
    {
        // Fill the FtpClient pool
        for (int i = 0; i < maximumConnections; i++)
        {
            var client = new FtpClient(ftpHost);
            if (username != null && password != null)
                client.Credentials = new NetworkCredential(username, password);
            ftpClientPool.Post(client);
        }

        try
        {
            // Enumerate the files to download
            await Recurse(new[] { ftpRoot });
            downloader.Complete();
        }
        catch (Exception ex) { ((IDataflowBlock)downloader).Fault(ex); }

        try
        {
            // Await the downloader to complete
            await downloader.Completion;
        }
        catch (OperationCanceledException)
            when (downloader.Completion.IsCanceled) { throw; }
        catch { downloader.Completion.Wait(); } // Propagate AggregateException
        finally
        {
            // Clean up
            if (ftpClientPool.TryReceiveAll(out var clients))
                foreach (var client in clients) client.Dispose();
        }
    });
}

Usage example:

await FtpDownloadDeep("ftp://ftp.test.com", "", @"C:\FtpTest",
    "username", "password", maximumConnections: 10);

Note: The above implementation enumerates the remote directory lazily, following the tempo of the downloading process. If you prefer to enumerate it eagerly, gathering all info available about the remote listings ASAP, just remove the BoundedCapacity = maximumConnections configuration from the ActionBlock that downloads the files. Be aware that doing so could result in high memory consumption, in case the remote directory has a deep hierarchy of subfolders, containing cumulatively a huge number of small files.

Upvotes: 3

Martin Prikryl

Reputation: 202292

Use:

var clients = new ConcurrentBag<FtpClient>();

var opts = new ParallelOptions { MaxDegreeOfParallelism = maxConnections };
Parallel.ForEach(files, opts, file =>
{
    file = Path.GetFileName(file);

    string thread = $"Thread {Thread.CurrentThread.ManagedThreadId}";
    if (!clients.TryTake(out var client))
    {
        Console.WriteLine($"{thread} Opening connection...");
        client = new FtpClient(host, user, pass);
        client.Connect();
        Console.WriteLine($"{thread} Opened connection {client.GetHashCode()}.");
    }

    string remotePath = sourcePath + "/" + file;
    string localPath = Path.Combine(destPath, file);
    string desc =
        $"{thread}, Connection {client.GetHashCode()}, " +
        $"File {remotePath} => {localPath}";
    Console.WriteLine($"{desc} - Starting...");
    client.DownloadFile(localPath, remotePath);
    Console.WriteLine($"{desc} - Done.");

    clients.Add(client);
});

Console.WriteLine($"Closing {clients.Count} connections");
foreach (var client in clients)
{
    Console.WriteLine($"Closing connection {client.GetHashCode()}");
    client.Dispose();
}

Another approach is to start a fixed number of threads with one connection for each and have them pick files from a queue.
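The queue-of-files approach described above can be sketched roughly as follows. This is an illustrative outline, not taken from the linked article: the helper name DownloadWithWorkerQueue is made up, and the openConnection/downloadOne delegates are placeholders for the FtpClient setup and the per-file DownloadFile call.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// A fixed number of worker threads, each owning a single connection,
// consuming file names from a shared queue until it is drained.
static void DownloadWithWorkerQueue<TConnection>(
    IEnumerable<string> files, int workerCount,
    Func<TConnection> openConnection,
    Action<TConnection, string> downloadOne)
    where TConnection : IDisposable
{
    using var queue = new BlockingCollection<string>();
    foreach (var file in files) queue.Add(file);
    queue.CompleteAdding(); // Workers exit when the queue is empty

    var workers = new Thread[workerCount];
    for (int i = 0; i < workerCount; i++)
    {
        workers[i] = new Thread(() =>
        {
            using var connection = openConnection(); // One connection per worker
            foreach (var file in queue.GetConsumingEnumerable())
                downloadOne(connection, file);
        });
        workers[i].IsBackground = true;
        workers[i].Start();
    }
    foreach (var worker in workers) worker.Join();
}
```

This naturally bounds the connection count to workerCount and reuses each connection for many files, instead of opening one per download.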

For an example of an implementation, see my article for the WinSCP .NET assembly:
Automating transfers in parallel connections over SFTP/FTP protocol


A similar question about SFTP:
Processing SFTP files using C# Parallel.ForEach loop not processing downloads

Upvotes: 4

Enigmativity

Reputation: 117064

I'd split this into three parts.

  1. Recursively build a list of source and destination pairs.
  2. Create the directories required.
  3. Concurrently download the files.

It's the last part that is slow and should be done in parallel.

Here's the code:

private async Task DownloadRecursively(string src, string dest, FtpClient ftp)
{
    /* 1 */
    IEnumerable<(string source, string destination)> Recurse(string s, string d)
    {
        foreach (var item in ftp.GetListing(s))
        {
            if (item.Type == FtpFileSystemObjectType.Directory)
            {
                if (item.Size != 0)
                {
                    foreach(var pair in Recurse(Path.Combine(s, item.Name), Path.Combine(d, item.Name)))
                    {
                        yield return pair;
                    }
                }
            }
            else if (item.Type == FtpFileSystemObjectType.File)
            {
                yield return (Path.Combine(s, item.Name), Path.Combine(d, item.Name));
            }
        }
    }

    var pairs = Recurse(src, dest).ToArray();
    
    /* 2 */
    // The destinations are file paths, so create their parent directories
    foreach (var dir in pairs.Select(x => Path.GetDirectoryName(x.destination)).Distinct())
    {
        System.IO.Directory.CreateDirectory(dir);
    }

    /* 3 */
    var downloads =
        pairs
            .AsParallel()
            .Select(x => ftp.DownloadFileAsync(x.destination, x.source)) // local path first, then remote
            .ToArray();
    
    await Task.WhenAll(downloads);
}

The result should be clean, neat, and easy to reason about.

Upvotes: 0
