Reputation: 1
I'm having a hard time trying to figure out the reason I keep seeing:
`HibernateOptimisticLockingFailureException: FlowExecution: optimistic locking failed; nested exception is org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect)`
I have a service that uses Quartz Scheduler to fire jobs. In my context these jobs are called `Flows`, and each flow may be composed of several `Tasks`. Flows and tasks are `Executables`, and information about their actual `Executions` is stored as `FlowExecutions` and `TaskExecutions`. The service uses a `FlowService` to start the flows.
UPD: There's a Quartz job, `ExecutorJob`, responsible for firing my flows/tasks. When it's triggered, it uses the `FlowService` to start whatever it is supposed to. So I'm wondering whether the Quartz thread might not be creating a new Hibernate session each time it uses a service, and whether that is the cause of the problem. I haven't changed the scope of the `FlowService`, so it's a singleton. How does GORM manage the session it uses?
UPD2: I tried using a `persistenceContextInterceptor` in `ExecutorJob` to make sure each use of the service gets a new session, but it didn't solve the problem. Simplified code for `ExecutorJob` added.
I couldn't reproduce the issue locally, but it happens frequently in production, specifically when there are a lot of flows to initiate.
I've tried synchronizing the `execute` methods for tasks and flows, but it didn't work. I'll try a pessimistic lock now, but my guess is that it won't solve the issue, since the application logs suggest that no two threads are updating the same row. Below is a simplified version of the code that mimics how the project is structured.

    // ------------------
    // DOMAIN CLASSES
    // ------------------
    abstract class Executable {
        static hasMany = [flowTasks: FlowTask]
        static transients = ['executions']
        List<Execution> getExecutions() {
            this.id ? Execution.findAllByExecutable(this) : []
        }
        void addToExecutions(Execution execution) {
            execution.executable = this
            execution.save()
        }
        abstract List<Execution> execute(Map params)
    }
    class Flow extends Executable {
        SortedSet<FlowTask> tasks
        static hasMany = [tasks: FlowTask]
        private static final Object lockExecute = new Object()
        private static final Object lockExecuteTask = new Object()
        List<FlowExecution> execute(Map params) {
            synchronized (lockExecute) {
                List<Map> multiParams = multiplyParams(params)
                multiParams.collect { Map param ->
                    FlowExecution flowExecution = new FlowExecution()
                    addToExecutions(flowExecution)
                    flowExecution.save()
                    this.attach()
                    save()
                    executeTasks(firstTasks(param), flowExecution, param)
                }
            }
        }
        List<Map> multiplyParams(Map params) {
            // creates a list of params for the executions that must be started
            [params]
        }
        Set<FlowTask> firstTasks(Map params) {
            // finds the first tasks to be executed for the flow
            tasks.findAll { true }
        }
        private FlowExecution executeTasks(Set<FlowTask> tasks, FlowExecution flowExecution, Map params) {
            synchronized (lockExecuteTask) {
                tasks.each { FlowTask flowTask ->
                    try {
                        List<Execution> executions = flowTask.execute(params)
                        executions.each { Execution execution ->
                            flowExecution.addToExecutions(execution)
                        }
                        flowExecution.attach()
                    } catch (Exception e) {
                        // log error executing task
                        throw e
                    }
                }
                this.attach()
                try {
                    save(flush: true)
                } catch (HibernateOptimisticLockingFailureException e) {
                    // log error saving flow
                    throw e
                }
                flowExecution
            }
        }
    }
    class Task extends Executable {
        private static final Object lockExecute = new Object()
        private static final Object lockGetExecution = new Object()
        TaskExecution execute(TaskExecution execution) {
            taskService.start(execution)
            execution
        }
        List<TaskExecution> execute(Map params) {
            synchronized (lockExecute) {
                List<Map> multiExecParams = multiplyParams(params)
                multiExecParams.collect { Map param ->
                    TaskExecution execution = getExecution(param)
                    execute(execution)
                }
            }
        }
        TaskExecution getExecution(Map params) {
            synchronized (lockGetExecution) {
                TaskExecution execution = new TaskExecution(executable: this)
                execution.setParameters(params)
                addToExecutions(execution)
                execution.attach()
                execution.flowExecution?.attach()
                this.attach()
                try {
                    save(flush: true)
                } catch (HibernateOptimisticLockingFailureException e) {
                    // log error saving task
                    throw e
                }
                execution
            }
        }
        List<Map> multiplyParams(Map params) {
            // creates a list of params for the tasks that must be started
            [params]
        }
    }
    class FlowTask {
        static belongsTo = [flow: Flow, executable: Executable]
        List<Execution> execute(Map params) {
            executable.execute(params)
        }
    }
    abstract class Execution {
        Map parameterData = [:]
        static belongsTo = [executable: Executable, flowExecution: FlowExecution]
        static transients = ['parameters', 'taskExecutions']
        void setParameters(Map params) {
            params.each { key, value ->
                parameterData[key] = JsonParser.toJson(value)
            }
        }
    }
    class TaskExecution extends Execution {
    }
    class FlowExecution extends Execution {
        List<Execution> executions
        static transients = ['executions']
        FlowExecution() {
            executions = []
        }
        Set<TaskExecution> getTaskExecutions() {
            executions?.collect { Execution execution ->
                return execution.taskExecution
            }?.flatten()?.toSet()
        }
        void addToExecutions(Execution execution) {
            executions.add(execution)
            execution.flowExecution = this
            execution.save()
        }
        def onLoad() {
            try {
                executions = this.id ? Execution.findAllByFlowExecution(this) : []
            } catch (Exception e) {
                log.error(e)
                []
            }
        }
    }
    // -----------------
    // SERVICE CLASSES
    // -----------------
    class FlowService {
        Map start(long flowId, Map params) {
            // Flow.lock(id) loads the row with a pessimistic lock
            Flow flow = Flow.lock(flowId)
            startFlow(flow, params)
        }
        private Map startFlow(Flow flow, Map params) {
            List<FlowExecution> flowExecutions = flow.execute(params)
            [data: [success: true], status: HTTP_OK]
        }
    }
    //--------------------------------------
    // Quartz job
    //--------------------------------------
    class ExecutorJob implements InterruptableJob {
        def grailsApplication = Holders.getGrailsApplication()
        static triggers = {}
        private Thread thread
        void execute(JobExecutionContext context) throws JobExecutionException {
            thread = Thread.currentThread()
            synchronized (LockContainer.taskLock) {
                Map params = context.mergedJobDataMap
                def persistenceInterceptor = persistenceInterceptorInstance
                try {
                    persistenceInterceptor.init()
                    Long executableId = params.executableId as Long
                    def service = (Executable.get(executableId) instanceof Flow) ? flowServiceInstance : taskServiceInstance
                    service.start(executableId, params)
                } catch (Exception e) {
                    // log error
                } finally {
                    persistenceInterceptor.flush()
                    persistenceInterceptor.destroy()
                }
            }
        }
        PersistenceContextInterceptor getPersistenceInterceptorInstance() {
            grailsApplication.mainContext.getBean('persistenceInterceptor')
        }
        FlowService getFlowServiceInstance() {
            grailsApplication.mainContext.getBean('flowService')
        }
        TaskService getTaskServiceInstance() {
            grailsApplication.mainContext.getBean('taskService')
        }
        @Override
        void interrupt() throws UnableToInterruptJobException {
            thread?.interrupt()
        }
    }
Does anyone know something that might help?
Upvotes: 0
Views: 512
Reputation: 2354
Well, it's hard to tell what is going wrong. However, I guess this error is thrown when you have an object in the session that has already been saved or updated by some other transaction. When Hibernate then tries to save this object, it fails with the "Row was updated or deleted by another transaction" error.
I guess you can try calling `refresh()` before you save your object and see how it goes.
http://grails.github.io/grails-doc/2.3.4/ref/Domain%20Classes/refresh.html
def b = Book.get(1)
…
b.refresh()
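To make the failure mode concrete, here is a minimal plain-Groovy sketch of the version check behind optimistic locking. It does not use Hibernate or GORM: `VersionedRow`, `StaleRowException`, and `update` are hypothetical names made up for illustration, and in the real stack the check is done by the versioned SQL `UPDATE` that Hibernate issues.

```groovy
// Hypothetical stand-in for a database row with a version column.
class VersionedRow {
    long version = 0
    String status = 'NEW'
}

// Hypothetical exception playing the role of StaleObjectStateException.
class StaleRowException extends RuntimeException {
    StaleRowException(String message) { super(message) }
}

// Applies the update only if the caller still holds the current version,
// mirroring "UPDATE ... WHERE id = ? AND version = ?".
void update(VersionedRow row, long versionSeenByCaller, String newStatus) {
    if (row.version != versionSeenByCaller) {
        throw new StaleRowException(
            'expected version ' + versionSeenByCaller + ', found ' + row.version)
    }
    row.status = newStatus
    row.version++
}

def row = new VersionedRow()
long seenByA = row.version   // first "session" reads the row
long seenByB = row.version   // second "session" reads the same row

update(row, seenByA, 'RUNNING')      // first writer succeeds and bumps the version

boolean stale = false
try {
    update(row, seenByB, 'DONE')     // second writer still holds the old version
} catch (StaleRowException e) {
    stale = true
}
assert stale

// Re-reading the current version before writing lets the update go through.
update(row, row.version, 'DONE')
assert row.status == 'DONE'
```

The last step is roughly what `refresh()` buys you: the instance picks up the current version before the write. Note that `refresh()` reloads the instance from the database, so any unsaved in-memory changes would need to be reapplied afterwards.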
Upvotes: 0