Reputation: 521
Every interval I retrieve tweets with a certain query. These tweets have to be passed to services which calculate and manipulate those tweets. So these services are subscribed to my publisher. So publisher.hasSubscribers() returns true. But the submit or offer function does not invoke the onNext of my subscriber. So as a "fix", I cycle through my subscribers and invoke it myself. But that shouldn't be the case.
This is the constructor of my publisher.
public TwitterStreamer(Executor executor, int maxBufferCapacity, long period, TimeUnit unit, String searchQuery){
super(executor, maxBufferCapacity);
this.searchQuery = searchQuery;
scheduler = new ScheduledThreadPoolExecutor(1);
this.tweetGetter = scheduler.scheduleAtFixedRate(
() -> {
List<String> tweets = getTweets(searchQuery);
/* this.lastCall = LocalDateTime.now();
for(Flow.Subscriber sub : this.getSubscribers()){
sub.onNext(tweets);
}*/
this.submit(tweets);
if(tweets.size() >= 20) this.close();
}, 0, period, unit);
}
This is my subscriber
package myFlowAPI;
import Interfaces.IProcess;
import Services.LogToFileService;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
public class MySubscriber implements Flow.Subscriber<List<String>> {
private Flow.Subscription subscription;
private AtomicInteger count;
private IProcess processor;
private String name;
private int DEMAND = 0;
public MySubscriber(String name, IProcess processor){
this.name = name;
this.processor = processor;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
}
@Override
public void onNext(List<String> item) {
Object result = this.processor.process(item);
this.readResult(result);
switch (this.processor.getClass().getSimpleName()){
case "CalculateTweetStatsService":
if((Integer) result >= 20){
this.subscription.cancel();
}
break;
}
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error is thrown " + throwable.getMessage());
}
@Override
public void onComplete() {
if(this.processor instanceof LogToFileService){
((LogToFileService) processor).closeResource();
}
System.out.println("complete");
}
private void readResult(Object result){
System.out.println("Result of " + this.processor.getClass().getSimpleName() + " processor is " + result.toString());
}
}
This is the main where I subscribe to the publisher
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
String searchQuery;
try{
searchQuery = args[0] != null ? args[0] : "#capgemini50";
}catch (ArrayIndexOutOfBoundsException ex){
searchQuery = "#capgemini50";
}
TwitterStreamer streamer = new TwitterStreamer(executor, 5, 15L, SECONDS, searchQuery);
MySubscriber subscriber1 = new MySubscriber("LogFileSubscriber", new LogToFileService("./tweetsLogger.txt"));
MySubscriber subscriber2 = new MySubscriber("TotalTweetSubscriber",new CalculateTweetStatsService());
streamer.subscribe(subscriber1);
streamer.subscribe(subscriber2);
}
Upvotes: 6
Views: 850
Reputation: 72864
You need the subscriber to explicitly request data e.g. upon subscription (see https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.Subscription.html#request-long-):
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}
Same upon processing in onNext() to request the next item.
Upvotes: 9