Reputation: 11
In my application scenario, I created 100 independent DRL rules.
Some of these DRL rules are time-consuming because LHS needs to rely on RESTful APIs to query the factual basis (for example, the query order amount interface and the query historical order details interface).
I wonder if there is any way to execute LHS logic for DRL rules in parallel to improve performance?
For example, in the following four DRL rules, LHS will call the MyThreadUtil#sleep method. Is there any way to execute them in parallel with multiple threads?
package multi.thread;
import com.hp.util.MyThreadUtil;
rule "multi_thread_1"
no-loop true
when
$sleepTime1 : Integer() from MyThreadUtil.sleep();
then
System.out.println($sleepTime1);
end
rule "multi_thread_2"
no-loop true
when
$sleepTime2 : Integer() from MyThreadUtil.sleep();
then
System.out.println($sleepTime2);
end
rule "multi_thread_3"
no-loop true
when
$sleepTime3 : Integer() from MyThreadUtil.sleep();
then
System.out.println($sleepTime3);
end
rule "multi_thread_4"
no-loop true
when
$sleepTime4 : Integer() from MyThreadUtil.sleep();
then
System.out.println($sleepTime4);
end
And I execute it using the following Java code.
KieServices kieServices = KieServices.Factory.get();
KieContainer kieContainer = kieServices.newKieClasspathContainer();
KieSession session = kieContainer.newKieSession();
session.fireAllRules(new RuleNameStartsWithAgendaFilter("multi_thread_"));
session.dispose();
Thanx.
By reading the official website document(https://docs.drools.org/7.73.0.Final/drools-docs/html_single/#rule-base-configuration-con_decision-engine
), The multithreadEvaluation configuration is enabled, The debug source code went to the org.tools.core.common.CompositeDefaultAgenda#parallelFire method and found that there are still no multithreaded parallel execution rules.
Upvotes: 1
Views: 594
Reputation: 2398
multithreadEvaluation configuration is enabled, (...) and found that there are still no multithreaded parallel execution rules.
That is because multithreadEvaluation
works when a KB can be partitioned, while your KB example rules clearly is not. Being multithreadEvaluation
an advanced feature I would advise to use it only when you can be sure you KB can actually be partitioned.
Otherwise, by description of your original question, this does not seems to be a concerns strictly related to Drools --otherwise I would have expected an example KB when clearly the semantic partitioning of events from different, slow, sources would have had an impact on other rules.
To address the need of several, slow, sources need to fetch data in parallel, in order to later make available to some rules the data once it has arrived, you might consider adopting a reactive framework to integrate with your application, in addition to Drools. One such reactive framework I personally like is Mutiny.
For example, you might consider Merging several sources:
Multi<Tuple2> first = Multi.createFrom().ticks().every(Duration.ofMillis(10))
.onItem().transform(l -> new Tuple2("Stream 1 - ", l));
Multi<Tuple2> second = Multi.createFrom().ticks().every(Duration.ofMillis(15))
.onItem().transform(l -> new Tuple2("Stream 2 - ", l));
Cancellable cancellable = Multi.createBy().merging().streams(first, second)
.subscribe().with(s -> System.out.println("Received: " + s));
Don't forget to subscribe the streams on the executor thread!
For example, here is snippet of code which fetches different, heterogeneous type of resources from a Kubernetes cluster, and then make it available to a Drools session:
private Multi<KubernetesResource> mutinyFabric8KubernetesClient(Class<? extends KubernetesResource> resourceType,
Function<KubernetesClient, List<? extends KubernetesResource>> blockingFn) {
return Multi.createFrom().<KubernetesResource>items(() -> {
var res = blockingFn.apply(client);
LOG.debug("Fetched {} {}s", res.size(), resourceType.getName());
return res.stream();
})
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool());
}
public Collection<KubernetesResource> levelTrigger() {
Multi<KubernetesResource> deployments = mutinyFabric8KubernetesClient(Deployment.class, c -> c.apps().deployments().list().getItems());
Multi<KubernetesResource> statefulSets = mutinyFabric8KubernetesClient(StatefulSet.class, c -> c.apps().statefulSets().list().getItems());
Multi<KubernetesResource> pods = mutinyFabric8KubernetesClient(Pod.class, c -> c.pods().inAnyNamespace().list().getItems());
Multi<KubernetesResource> persistentVolumeClaims = mutinyFabric8KubernetesClient(PersistentVolumeClaim.class, c -> c.persistentVolumeClaims().list().getItems());
Multi<KubernetesResource> services = mutinyFabric8KubernetesClient(Service.class, c -> c.services().list().getItems());
Multi<KubernetesResource> configMaps = mutinyFabric8KubernetesClient(ConfigMap.class, c -> c.configMaps().inAnyNamespace().list().getItems());
return Multi.createBy().merging().streams(deployments, statefulSets, pods, persistentVolumeClaims, services, configMaps)
.collect()
.asList()
.await()
.atMost(Duration.ofSeconds(10));
}
multithreadEvaluation
only if KB can be partitionedUpvotes: 0