user3675188

Reputation: 7409

How to limit the number of concurrent threads

In VIDEO_URL, there are thousands of videos to be downloaded. I want to use threads to do the job, but limit them to at most ten at a time. How could I rewrite the following code to achieve this?

@workers = []
VIDEO_URL.each do |video|
  @workers << Thread.new { dl_video(video) }
end
@workers.each { |t| t.join }
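One stdlib-only way to cap concurrency (a sketch, using a `SizedQueue` as a counting semaphore; `video_urls` and the body of the worker are stand-ins for `VIDEO_URL` and `dl_video`):

```ruby
MAX_THREADS = 10
video_urls = (1..30).to_a             # stand-ins for VIDEO_URL entries

slots = SizedQueue.new(MAX_THREADS)   # acts as a counting semaphore
workers = []
done = Queue.new                      # thread-safe collector for results

video_urls.each do |video|
  slots << true                       # blocks while 10 threads are running
  workers << Thread.new do
    begin
      done << video                   # stand-in for dl_video(video)
    ensure
      slots.pop                       # free the slot when the thread finishes
    end
  end
end
workers.each(&:join)
```

`slots << true` blocks once the queue holds `MAX_THREADS` tokens, so at most ten download threads ever run at once.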

Update

The thread pool gem does not seem to block once there are more than 10 worker threads. Is blocking I/O making the thread pool ineffective?

If I download videos without the thread pool, it works well.

But if I download videos with the thread pool, the videos are not downloaded. The main thread is supposed to block when there are 10 workers, but it doesn't. (Each video should take at least a minute to download.)

MAX_WORKERS = 10
@pool = Thread.pool(MAX_WORKERS)

def dl_video(video)
  File.open(video["title"], "wb") do |saved_file|
    @pool.process{
      saved_file.write open(video["link"], :allow_redirections => :safe).read
      # saved_file.write(HTTParty.get(video["link"]).parsed_response)
    }
  end
end
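The pool never blocks here because `pool.process` only enqueues the block and returns immediately; the `File.open` block then exits and closes the file before any worker writes to it, and nothing ever calls `pool.shutdown` to wait for the tasks. A stdlib-only sketch of the fixed structure (a plain `Queue`-based worker pool rather than the gem; the hash entries stand in for the real video records):

```ruby
MAX_WORKERS = 10

# Simulated work items; the real entries would carry "title" and "link" keys.
videos = (1..25).map { |i| { "title" => "video#{i}" } }

queue = Queue.new
videos.each { |v| queue << v }

results = Queue.new                      # thread-safe collector

workers = MAX_WORKERS.times.map do
  Thread.new do
    begin
      # Non-blocking pop raises ThreadError when the queue is empty.
      while (video = queue.pop(true))
        # In the real code, open the file *inside* the worker, so it is
        # still open while the download is being written:
        #   File.open(video["title"], "wb") { |f| f.write(...) }
        results << video["title"]
      end
    rescue ThreadError
      # Queue drained; worker exits.
    end
  end
end

workers.each(&:join)                     # block until all downloads finish
```

The key changes are that the file handling lives inside the pooled task and that the main thread explicitly waits for the workers.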

Upvotes: 3

Views: 1342

Answers (4)

SHS

Reputation: 7744

A simple solution (without involving any new gems) would be to start 10 threads that each repeatedly pop a URL off your array and process it.

[].tap do |threads|
  urls = VIDEO_URLS.clone
  semaphore = Mutex.new
  number_of_threads = 10

  number_of_threads.times do
    threads << Thread.new do
      loop do
        # Pop inside the lock; checking emptiness first would race with
        # other threads, so instead stop when pop returns nil.
        url = semaphore.synchronize { urls.pop }
        break if url.nil?
        download_video(url)
      end
    end
  end
end.each(&:join)

Another solution is to split your array into slices (of 10 or fewer); there are different ways to do this. Each thread then processes one slice. The code is somewhat longer overall, but it lets you drop the Mutex if you want.

[].tap do |threads|
  slices = [] # split VIDEO_URLS into the required slices; left up to you.
  slices.each do |urls|
    threads << Thread.new do
      urls.each { |url| download_video(url) }
    end
  end
end.each(&:join)
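One hedged way to fill in the `slices` step the snippet leaves open, assuming roughly equal groups are acceptable (`urls` stands in for `VIDEO_URLS`):

```ruby
number_of_threads = 10
urls = (1..23).to_a                            # stand-ins for VIDEO_URLS entries

# Ceiling division gives at most number_of_threads roughly equal slices.
slice_size = (urls.size.to_f / number_of_threads).ceil
slices = urls.each_slice(slice_size).to_a      # here: 8 slices of up to 3 URLs
```

Every URL lands in exactly one slice, so each thread gets a disjoint chunk and no locking is needed.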

Upvotes: 1

jbr

Reputation: 6258

What you want is called a thread pool. There is an extension for Ruby's threads, which includes this functionality.

Untested snippet directly adapted from the library's example:

require 'thread/pool'

# Create a thread pool with up to 10 simultaneously running threads
pool = Thread.pool(10)

VIDEO_URL.each do |video|
  # Add each download task to the thread pool
  pool.process do
    dl_video(video)
  end
end

# Block and wait for the thread pool to run out of tasks
pool.shutdown

Upvotes: 2

pangpang

Reputation: 8821

You can use each_slice.

@workers = []
VIDEO_URL.each_slice(10) do |batch|
  batch.each do |video|
    @workers << Thread.new { dl_video(video) }
  end
  @workers.each { |t| t.join }
  @workers = []
end

Upvotes: 0

sawa

Reputation: 168121

What you are trying to implement is a frequently used pattern, and it is called thread pool.

I haven't tried it, but perhaps the threadpool gem or something similar is worth looking into:

require "threadpool"

pool = ThreadPool.new(10)
VIDEO_URL.each{|video| pool.process{dl_video(video)}}

Upvotes: 2
