Reputation: 348
We have a prototype of Esper running, but its performance falls well short of what we need. I assume this is my fault rather than an inherent issue with Esper, so I am looking for help locating the bottleneck.
I am running one instance of the Esper service with the JVM memory settings -Xmx6G -Xms1G (I have tried various combinations of these values). The service can use 4 CPU cores. No other services are running during these tests, only Esper, Kafka and ZooKeeper.
I am using Akka Streams to stream events into Esper. The service is very simple: it reads from Kafka and sends each event to the Esper runtime, which has 3 EPStatements that are tested and working. A single listener is attached to all 3 statements and writes the matched events to Kafka.
Some things I've tried to isolate where the performance issue is:
Only number 4 above caused any significant observable performance benefit.
Below is an example query we're running through Esper. It is tested and works. I have read the performance tuning section of the documentation, and the query seems fine to me. All my queries follow a similar format:
select * from EsperEvent#time(5 minutes)
match_recognize (
partition by asset_id
measures A as event1, B as event2, C as event3
pattern (A Z* B Z* C)
interval 10 seconds or terminated
define
A as A.eventtype = 13 AND A.win_EventID = "4624" AND A.win_LogonType = "3",
B as B.eventtype = 13 AND B.win_EventID = "4672",
C as C.eventtype = 13 AND (C.win_EventID = "4697" OR C.win_EventID = "7045")
)
Some code:
Here is my akka stream:
kafkaConsumer
.via(parsing) // Parse the json event to a POJO for esper. Have tried without this step also, no performance impact
.via(esperFlow) // mapAsync call to sendEvent(...)
//Here I am using kafka to measure the flow throughput rate. This is where I establish my throughput rate, based on the rate messages are written to "esper_flow_through" topic.
.map(rec => new ProducerRecord[Array[Byte], String]("esper_flow_through", Serialization.write(rec)))
.runWith(sink)
esperFlow (Parallelism = 4 by default):
val esperFlow = Flow[EsperEvent]
.mapAsync(Parallelism)(event => Future {
engine.getEPRuntime.sendEvent(event)
event
})
Listener:
override def update(newEvents: Array[EventBean], oldEvents: Array[EventBean], statement: EPStatement, epServiceProvider: EPServiceProvider): Unit = Future {
logger.info(s"Received Listener updates: Query Name: ${statement.getName} ---- ${newEvents.map(_.getUnderlying)}, $oldEvents")
statement.getName match {
case "SERVICE_INSTALL" => serviceInstall.increment(newEvents.length)
case "ADMIN_GROUP" => adminGroup.increment(newEvents.length)
case "SMB_SHARE" => smbShare.increment(newEvents.length)
}
newEvents.map(_.getUnderlying.toString).toList
.foreach(queryMatch => {
val record: ProducerRecord[Array[Byte], String] = new ProducerRecord[Array[Byte], String]("esper_output", queryMatch)
producer.send(record)
})
}
Performance observations:
Profiling, nothing seems out of sorts here:
The rate seems very low, so I assume I am missing something in the Esper configuration.
Our target throughput is ~10k events per second. We are a long way from this, and we have a similar POC in Spark that gets closer to this target.
Update:
Following @user650839's comments, I was able to improve my throughput to a steady 1k events per second. Both of these queries produce the same throughput:
select * from EsperEvent(eventtype = 13 and win_EventID in ("4624", "4672", "4697", "7045"))#time(5 minutes)
match_recognize (
partition by asset_id
measures A as event1, B as event2, C as event3
pattern (A B C)
interval 10 seconds or terminated
define
A as A.eventtype = 13 AND A.win_EventID = "4624" AND A.win_LogonType = "3",
B as B.eventtype = 13 AND B.win_EventID = "4672",
C as C.eventtype = 13 AND (C.win_EventID = "4697" OR C.win_EventID = "7045"))
create context NetworkLogonThenInstallationOfANewService
start EsperEvent(eventtype = 13 AND win_EventID = "4624" AND win_LogonType = "3")
end pattern [
b=EsperEvent(eventtype = 13 AND win_EventID = "4672") ->
c=EsperEvent(eventtype = 13 AND (win_EventID = "4697" OR win_EventID = "7045"))
where timer:within(5 minutes)
]
context NetworkLogonThenInstallationOfANewService select * from EsperEvent output when terminated
However, 1k events per second is still too slow for our needs.
Upvotes: 0
Views: 446
Reputation: 2594
The match-recognize is ill-defined. An A-event, B-event or C-event can also be a Z-event, since Z is undefined and therefore matches anything. That makes a HUGE number of combinations possible. I think for 4 incoming events there are already something like 1*2*3*4 combinations that the match-recognize keeps track of! Match-recognize tracks all possible combinations, and when something matches it sorts and ranks the combinations and outputs all/any/some. Match-recognize may be a poor choice here, or you could define Z as something that doesn't also match A/B/C.
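To get a feel for how quickly the candidates pile up, here is a small sketch that counts the complete matches of (A Z* B Z* C) over one partition when Z matches every event. The single-character event labels and the `MatchCount` object are made up for illustration, and the count says nothing about how Esper actually stores or prunes partial matches; it only shows how many ways events can be assigned to A, B and C.

```scala
// Illustrative only: counts complete matches of the pattern (A Z* B Z* C)
// when Z is undefined and therefore matches every event.
object MatchCount {
  // Hypothetical labels: 'A', 'B' and 'C' stand for events that satisfy the
  // corresponding define clause; any other character satisfies none of them.
  def countMatches(events: String): Long = {
    var a = 0L   // ways to have matched "A" so far
    var ab = 0L  // ways to have matched "A Z* B" so far
    var abc = 0L // complete "A Z* B Z* C" matches
    events.foreach {
      case 'A' => a += 1    // every A can start a new match
      case 'B' => ab += a   // every earlier A can pair with this B
      case 'C' => abc += ab // every earlier A..B pair can end at this C
      case _   => ()        // other events can only ever be Z
    }
    abc
  }
}
```

With two events of each kind ("AABBCC") this already gives 8 matches, and three of each kind gives 27, so the number grows with the cube of the events in the window.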
Instead of match-recognize I would go with a context that initiates with an A-event and terminates with a C-event with "output when terminated".
Also, the way you designed the query, the time window will retain all events. You could do better.
select * from EsperEvent(eventtype = 13 and win_EventID in ("4624", "4672", "4697", "7045"))#time(5 minutes)
match_recognize (
.........
define
A as A.win_EventID = "4624" AND A.win_LogonType = "3",
B as B.win_EventID = "4672",
C as C.win_EventID = "4697" OR C.win_EventID = "7045"
)
Notice that the filter EsperEvent(eventtype=13 ....) discards events before they go into the time window. There is a performance tip in the docs about using filter criteria to remove unwanted events.
EDIT: A mistake is measuring IO throughput and Esper throughput as one. Remove the IO. Test Esper using Esper APIs with data your code produces. Once confident add the IO back.
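One way to do that is a tiny timing harness that pushes pre-built, in-memory events through a send function on the calling thread, with no Kafka and no Akka Streams involved. This is only a sketch: `ThroughputProbe` and `eventsPerSecond` are names I made up, and a serious measurement would also want a warm-up run before timing.

```scala
// Minimal sketch: measure how fast events can be pushed through `send`
// with all IO removed from the picture.
object ThroughputProbe {
  // Sends every event through `send` on the calling thread and returns
  // the observed rate in events per second.
  def eventsPerSecond[E](events: Seq[E])(send: E => Unit): Double = {
    val start = System.nanoTime()
    events.foreach(send)
    // Guard against a zero elapsed time on very small inputs.
    val elapsedNanos = math.max(System.nanoTime() - start, 1L)
    events.size.toDouble * 1e9 / elapsedNanos
  }
}
```

With the code from the question, the call would look something like ThroughputProbe.eventsPerSecond(sampleEvents)(e => engine.getEPRuntime.sendEvent(e)), where sampleEvents is an assumed in-memory collection of already parsed EsperEvent objects. It is also worth making the listener do nothing but count during this test, so the Kafka producer is out of the picture too.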
Upvotes: 2