Reputation: 75
I'm working on a Java application where I need to periodically check a MongoDB collection (CacheBilgi) and enqueue new data into a concurrent queue (cacheInfoQueue). Then, I need to process this queue with a maximum of two concurrent tasks at a time. Each time the method checks the queue, it should be able to process new incoming data as well.
Each task consists of two main processing functions (fundCardCacheEvictionPolicy and fundCardCacheGeneration), both of which should utilize a shared thread pool efficiently, ensuring that idle threads can assist other tasks when available. Finally, once all processing is complete, the thread pool should shut down, but it should be recreated when the queue is filled again.
Currently, I'm using ForkJoinPool in multiple places, which might be inefficient:
Questions:
Here is my current implementation:
@Service
@RequiredArgsConstructor
public class xxx extends BaseService {
private MongoCollection<Document> cacheInfoColl;
private MongoCollection<Document> serviceInfoColl;
private final ConcurrentLinkedQueue<CacheBilgi> cacheInfoQueue = new ConcurrentLinkedQueue<>();
@PostConstruct
private void init() {
this.cacheInfoColl = this.mongoTemplate.getCollection(Cons.CACHE_BILGI);
this.serviceInfoColl = this.mongoTemplate.getCollection(Cons.SERVIS_BILGI);
}
@Scheduled(fixedDelay = 5000)
public void fundCardCacheProgress() {
Query query = new Query();
query.addCriteria(Criteria.where("is_cached").is(Boolean.FALSE));
query.addCriteria(Criteria.where("is_locked").is(Boolean.FALSE));
List<CacheBilgi> cacheList = mongoTemplate.find(query, CacheBilgi.class, Cons.CACHE_BILGI);
for(CacheBilgi cacheData : cacheList) {
cacheInfoColl.updateOne(Filters.eq("_id", cacheData.getId()),
Updates.set("is_locked", Boolean.TRUE));
cacheInfoQueue.offer(cacheData);
}
checkCacheListQueue();
}
public void checkCacheListQueue() {
while(!cacheInfoQueue.isEmpty()) {
int threadCountForList = cacheInfoQueue.size() >= 2 ? 2 : 1;
ForkJoinPool customThreadPoolForList = new ForkJoinPool(threadCountForList);
try {
customThreadPoolForList.submit(() -> {
CacheBilgi cacheData = cacheInfoQueue.poll();
Set<String> updatedCodes = ConcurrentHashMap.newKeySet();
fundCardCacheEvictionPolicy(cacheData, updatedCodes);
fundCardCacheGeneration(updatedCodes);
cacheInfoColl.updateOne(Filters.eq("_id", cacheData.getId()),
Updates.set("is_cached", true));
}).get();
} catch (Exception e) {
e.printStackTrace();
} finally {
customThreadPoolForList.shutdown();
}
}
}
public void fundCardCacheEvictionPolicy(CacheBilgi cacheData, Set<String> updatedCodes) {
int threadCount = (int) Math.max(1, Runtime.getRuntime().availableProcessors() * 0.85);
ForkJoinPool customThreadPool = new ForkJoinPool(threadCount);
try {
customThreadPool.submit(() -> {
String servis = cacheData.getServis();
List<String> codes = cacheData.getCode();
codes.parallelStream().forEach(code -> {
switch (servis) {
//..
}
});
}).get();
} catch (Exception e) {
e.printStackTrace();
} finally {
customThreadPool.shutdown();
}
}
public void fundCardCacheGeneration(Set<String> codes){
int threadCount = (int) Math.max(1, Runtime.getRuntime().availableProcessors() * 0.85);
ForkJoinPool customThreadPool = new ForkJoinPool(threadCount);
try {
customThreadPool.submit(() ->
codes.parallelStream().forEach(code -> {
//..
})
).get();
} catch (Exception e) {
e.printStackTrace();
} finally {
customThreadPool.shutdown();
}
}
}
Upvotes: 0
Views: 40