Reputation: 71
My application handles multiple request but the rules in my knowledge session are executed by single thread only. For eg: Thread 1 and Thread 2 enters the knowledge session with the gap of 2millisec But Thread 1 executes its own rules and even the rules for Thread 2 is executed by Thread 1. Imagine if there are 1000 request which means the rules for each requests will be executed by only a 1 Thread?
Is there any way where in DROOLS where we can prevent this and ensure that the rules are executed by multiple Threads?
Below is a small sample test I tried:
Java Class:
import java.math.BigDecimal;
import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseFactory;
import org.drools.builder.KnowledgeBuilder;
import org.drools.builder.KnowledgeBuilderError;
import org.drools.builder.KnowledgeBuilderErrors;
import org.drools.builder.KnowledgeBuilderFactory;
import org.drools.builder.ResourceType;
import org.drools.io.ResourceFactory;
import org.drools.runtime.StatefulKnowledgeSession;
import org.drools.runtime.rule.WorkingMemoryEntryPoint;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class DJ_Test {
public static void main(String[] args) {
try {
System.out.println("In main");
// load up the knowledge base
KnowledgeBase kbase = readKnowledgeBase();
final StatefulKnowledgeSession ksession = kbase.newStatefulKnowledgeSession();
final WorkingMemoryEntryPoint entry =ksession.getWorkingMemoryEntryPoint("RequestStream");
final Object obj_1= new Object();
Thread t1 = new Thread(){
public void run(){System.out.println(Thread.currentThread().getName() + " is running");
entry.insert(obj_1);
ksession.fireAllRules();
System.out.println(Thread.currentThread().getName() + " is terminated");
}
};
final Object obj_2= new Object();
Thread t2 = new Thread(){
public void run(){
try{
Thread.sleep(8000);
}catch(Exception e){
}
System.out.println(Thread.currentThread().getName() + " is running");
entry.insert(obj_2);
ksession.fireAllRules();
System.out.println(Thread.currentThread().getName() + " is terminated");
}
};
t1.start();
t2.start();
} catch (Throwable t) {
t.printStackTrace();
}
}
private static KnowledgeBase readKnowledgeBase() throws Exception {
/* KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
kbuilder.add(ResourceFactory.newClassPathResource("rulesFlow.bpmn"), ResourceType.BPMN2);
kbuilder.add(ResourceFactory.newClassPathResource("KansasSalesTax.drl"), ResourceType.DRL);
kbuilder.add(ResourceFactory.newClassPathResource("MissouriSalesTax.drl"), ResourceType.DRL);
kbuilder.add(ResourceFactory.newClassPathResource("SalesTax.drl"), ResourceType.DRL);
KnowledgeBuilderErrors errors = kbuilder.getErrors();
if (errors.size() > 0) {
for (KnowledgeBuilderError error: errors) {
System.err.println(error);
}
throw new IllegalArgumentException("Could not parse knowledge.");
}
KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase();
kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
return kbase;*/
ClassPathXmlApplicationContext serviceContext = new ClassPathXmlApplicationContext( "droolsContext.xml" );
return (KnowledgeBase) serviceContext.getBean("kbase1");
}
public static class DJ_Message {
public static final int thread_1 = 1;
public static final int thread_2 = 2;
private String message;
private int status;
public String getMessage() {
return this.message;
}
public void setMessage(String message) {
this.message = message;
}
public int getStatus() {
return this.status;
}
public void setStatus(int status) {
this.status = status;
}
}
}
DRL file
package com.sample
import com.sample.DroolsTest.Message;
//global CepService cepService;
declare Object
@role( event )
end
rule "rule_1"
salience 100
when
$o : Object() from entry-point RequestStream
then
System.out.println( "Rule 1 fired by " + Thread.currentThread().getName() );
Thread.sleep(5000);
end
rule "rule_2"
salience 80
when
$o : Object() from entry-point RequestStream
then
System.out.println( "Rule 2 fired by " + Thread.currentThread().getName() );
Thread.sleep(5000);
end
rule "rule_3"
salience 60
when
$o : Object() from entry-point RequestStream
then
System.out.println( "Rule 3 fired by " + Thread.currentThread().getName() );
//cepService.executingThread1();
end
rule "4"
when
Message( status == Message.GOODBYE, myMessage : message )
then
System.out.println( myMessage );
//cepService.executingThread2();
end
Upvotes: 2
Views: 6200
Reputation: 1019
You can use Stateful knowledge session in multi threaded environment. Before your app starts you have to serialize "KnowledgeBase" into a file/db". Later each thread will not create its own copy of "KnowledgeBase" but will de serialize "KnowledgeBase" from the "file/db".
If we don't serialize/deserialize "KnowledgeBase", each thread when required will try to load rules and create its own "KnowledgeBase", finally at one point if threads increases your application might end up throwing "java.lan.OutOfMemory permgen" space error. As each thread will try to create its own copy of knowledgebase by loading classes again and again into the memory.
Upvotes: 2
Reputation: 1546
StatefulKnowledgeSession
s are not thread-safe according to the API docs. If you absolutely need to execute rules on multiple threads, reformulate your problem using StatelessKnowledgeSession
s instead.
Upvotes: 1