Reputation: 2382
I'm using Stream.generate to get data from Instagram. Since Instagram limits calls per hour, I want generate to run no more often than once every 2 seconds.
I chose this title because I moved from ScheduledExecutorService.scheduleAtFixedRate and that is what I was searching for. I do realise that stream intermediate operations are lazy and cannot be called on a schedule. If you have a better idea for the title, let me know.
So again, I want a delay of at least 2 seconds between generations.
My attempt, which doesn't take into account the time consumed by the operations after generate (which might take longer than 2 s):
Stream.generate(() -> {
    List<MediaFeedData> feedDataList = null;
    while (feedDataList == null) {
        try {
            Thread.sleep(2000);
            feedDataList = newData();
        } catch (InstagramException e) {
            notifyError(e.getMessage());
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    return feedDataList;
})
Upvotes: 2
Views: 1845
Reputation: 298529
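As far as I understand, your question is about solving two problems: waiting at a fixed rate rather than with a fixed delay, so that time spent in the later operations counts toward the 2 seconds, and creating a stream that can end on a condition, unlike the infinite stream from Stream.generate.
You can solve the first task by using deadline-based waiting and the second by implementing a Spliterator: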
Stream<List<MediaFeedData>> stream = StreamSupport.stream(
    new Spliterators.AbstractSpliterator<List<MediaFeedData>>(Long.MAX_VALUE, 0) {
        long lastTime = System.currentTimeMillis();
        @Override
        public boolean tryAdvance(Consumer<? super List<MediaFeedData>> action) {
            if (quitCondition()) return false;
            List<MediaFeedData> feedDataList = null;
            while (feedDataList == null) {
                lastTime += TimeUnit.SECONDS.toMillis(2);
                while (System.currentTimeMillis() < lastTime)
                    LockSupport.parkUntil(lastTime);
                try {
                    feedDataList = newData();
                } catch (InstagramException e) {
                    notifyError(e.getMessage());
                    if (QUIT_ON_EXCEPTION) return false;
                }
            }
            action.accept(feedDataList);
            return true;
        }
    }, false);
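For experimenting without the Instagram client, here is a minimal, self-contained sketch of the same Spliterator idea. The names PacedStreams, paced, periodMillis and quit are hypothetical and not part of the answer above. It is also a deliberate variant: the code above advances the deadline by a fixed 2 seconds per call (fixed rate, so calls can bunch up after a slow consumer), while this sketch measures the next deadline from the moment of the current call, which enforces a minimum gap between calls. Unlike the code above, the first call here is not delayed.

```java
import java.util.Spliterators;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class PacedStreams {
    private PacedStreams() {}

    /**
     * Hypothetical helper: a sequential stream that calls source no more often
     * than once per periodMillis and ends as soon as quit returns true.
     */
    static <T> Stream<T> paced(Supplier<T> source, long periodMillis, BooleanSupplier quit) {
        return StreamSupport.stream(
            new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, 0) {
                // Earliest wall-clock time at which the next call may happen.
                private long nextAllowed = System.currentTimeMillis();

                @Override
                public boolean tryAdvance(Consumer<? super T> action) {
                    if (quit.getAsBoolean()) return false;
                    // parkUntil may return early, so re-check the clock in a loop.
                    while (System.currentTimeMillis() < nextAllowed) {
                        LockSupport.parkUntil(nextAllowed);
                    }
                    // Time spent downstream after this point counts toward the gap.
                    nextAllowed = System.currentTimeMillis() + periodMillis;
                    action.accept(source.get());
                    return true;
                }
            }, false);
    }
}
```

Under these assumptions, a call such as PacedStreams.paced(() -> newData(), 2000, () -> quitCondition()) would stand in for the stream above, provided newData() is wrapped so that it does not throw a checked exception.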
Upvotes: 3
Reputation: 109613
Make a Timer and a semaphore. The timer releases the semaphore every 2 seconds, and in the stream you acquire the semaphore on every call.
This keeps the waits to the specified minimum (2 s) and, funnily, would even work with .parallel().
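private final Semaphore tickingSemaphore = new Semaphore(1, true);
In its own thread:
Stream.generate(() -> {
    // acquire() throws the checked InterruptedException, which a Supplier lambda cannot propagate
    tickingSemaphore.acquireUninterruptibly();
    ...
});
In the timer:
tickingSemaphore.release();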
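The pieces above could be wired together roughly as follows. This is only a sketch: the class TickGate and its method gate are hypothetical names, and a ScheduledExecutorService is swapped in for the Timer. It also adds one thing the description above does not mention: the ticker releases a permit only when none is stored, so a slow consumer cannot save up many ticks and then fire a burst of calls. Two calls can still be closer together than one period if a stored permit is picked up just before the next tick, so this bounds the average rate rather than the gap between individual calls.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class TickGate implements AutoCloseable {
    // Starts with one permit so the very first call goes through immediately.
    private final Semaphore permits = new Semaphore(1, true);
    private final ScheduledExecutorService ticker =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "tick-gate");
            t.setDaemon(true); // do not keep the JVM alive just for ticking
            return t;
        });

    TickGate(long period, TimeUnit unit) {
        ticker.scheduleAtFixedRate(() -> {
            // Only this thread releases, so at most one permit is ever stored.
            if (permits.availablePermits() == 0) {
                permits.release();
            }
        }, period, period, unit);
    }

    /** Wraps a supplier so that every call first waits for a tick. */
    <T> Supplier<T> gate(Supplier<T> source) {
        return () -> {
            permits.acquireUninterruptibly();
            return source.get();
        };
    }

    @Override
    public void close() {
        ticker.shutdownNow();
    }
}
```

A gated supplier can then be passed to Stream.generate, for example Stream.generate(gate.gate(() -> fetch())), where fetch() is a placeholder for the rate-limited call.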
Upvotes: 1
Reputation: 6149
A solution would be to decouple the generator from the Stream, for example using a BlockingQueue:
final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(100);
ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1);
scheduler.scheduleAtFixedRate(() -> {
    // Generate new data every 2s, regardless of their processing rate
    ThreadLocalRandom random = ThreadLocalRandom.current();
    queue.offer(random.nextInt(10));
}, 0, 2, TimeUnit.SECONDS);
Stream.generate(() -> {
    try {
        // Accept new data if ready, or wait for some more to be generated
        return queue.take();
    } catch (InterruptedException e) {}
    return -1;
}).forEach(System.out::println);
If the data processing takes more than 2 s, new data will be enqueued and wait to be consumed (up to the queue's capacity of 100; once it is full, offer returns false and the new element is dropped). If it takes less than 2 s, the take method in the generator will wait for new data to be produced by the scheduler.
This way, you are guaranteed to make fewer than N calls per hour to Instagram!
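The example above runs forever and never shuts the scheduler down. A finite variant might look like the sketch below; the class QueueFedStream, the method collect and the counter standing in for the remote call are all hypothetical. It limits the stream to a fixed number of elements, stops the scheduler afterwards, and restores the interrupt flag instead of returning -1.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class QueueFedStream {
    private QueueFedStream() {}

    /** Collects count elements, produced at most once per periodMillis. */
    static List<Integer> collect(int count, long periodMillis) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(100);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger counter = new AtomicInteger(); // stand-in for the remote call

        scheduler.scheduleAtFixedRate(() -> {
            // offer() does not block: if the queue is full the element is dropped.
            queue.offer(counter.getAndIncrement());
        }, 0, periodMillis, TimeUnit.MILLISECONDS);

        try {
            return Stream.<Integer>generate(() -> {
                try {
                    // Blocks until the scheduler has produced the next element.
                    return queue.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible
                    throw new IllegalStateException("interrupted while waiting for data", e);
                }
            }).limit(count).collect(Collectors.toList());
        } finally {
            scheduler.shutdownNow(); // stop producing once the stream is done
        }
    }
}
```

With a sequential stream and a single producer, the elements arrive in the order they were enqueued, so QueueFedStream.collect(3, 2000) would return the first three values after roughly four seconds.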
Upvotes: 4