Paulo Cezar
Paulo Cezar

Reputation: 1

Can't figure out cause of StaleObjectStateException

I'm having a hard time trying to figure out the reason I keep seeing:

`HibernateOptimisticLockingFailureException: FlowExecution: optimistic locking failed; nested exception is org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect)`

I have a service that uses Quartz Scheduler to fire jobs, in my context these jobs are called Flows and each flow might be composed from several Tasks, flows and tasks are Executables and info about their actual Executions are stored as FlowExecutions and TaskExecutions. The service uses a FlowService to start the flows.

UPD: There's a Quartz Job, "ExecutorJob" responsible for firing my flows/tasks. When it's triggered it uses the FlowService to start whatever it is supposed to. So I'm wondering if it's possible that the quartz thread is not creating new hibernate sessions each time it uses a service and that's the cause of the problem. I haven't changed the scope of the FlowService so it's a singleton, how GORM manages the session used by it?

UPD2: Tried using a persistenceContextInterceptor on ExecutorJob to make sure each use of the service uses a new session but it didn't solved the problem. Simplified code for ExecutorJob added.

I couldn't reproduce the issue locally, but it's happening frequently on production, more specifically when there's a lot of flows to initiate. I've tried synchronizing the execute methods for tasks and flows but it didn't worked, I'll try to use pessimistic lock now but my guess is that it won't solve the issue since checking the application logs it seems like there aren't two threads updating the same row. Following I've tried to show a simplified version of code mimicking how the project is structured.

// ------------------
// DOMAIN CLASSES
// ------------------
abstract class Executable {
    static hasMany = [flowTasks: FlowTask]
    static transients = ['executions']

    List<Execution> getExecutions() {
        this.id ? Execution.findAllByExecutable(this) : []
    }

    void addToExecutions(Execution execution) {
        execution.executable = this
        execution.save()
    }

    abstract List<Execution> execute(Map params)
}

class Flow extends Executable {
    SortedSet<FlowTask> tasks
    static hasMany = [tasks: FlowTask]

    private static final Object lockExecute = new Object()
    private static final Object lockExecuteTask = new Object()

    List<FlowExecution> execute(Map params) {
        synchronized (lockExecute) {
            List<Map> multiParams = multiplyParams(params)
            multiParams.collect { Map param ->
                FlowExecution flowExecution = new FlowExecution()
                addToExecutions(flowExecution)
                flowExecution.save()
                this.attach()
                save()
                executeTasks(firstTasks(param), flowExecution, param)
            }
        }
    }

    List<Map> multiplyParams(Map params) {
        // creates a list of params for the executions that must be started
        [params]
    }

    Set<FlowTask> firstTasks(Map params) {
        // finds the first tasks to be executed for the flow
        tasks.findAdll { true }
    }

    private FlowExecution executeTasks(Set<FlowTask> tasks, FlowExecution flowExecution, Map params) {
        synchronized (lockExecuteTask) {
            tasks.each { FlowTask flowTask ->
                try {
                    List<Execution> executions = flowTask.execute(params)
                    executions.each { Execution execution ->
                        flowExecution.addToExecutions(execution)
                    }
                    flowExecution.attach()
                } catch {
                    // log error executing task
                    throw e
                }            
            }

            this.attach()
            try {
                save(flush: true)
            } catch (HibernateOptimisticLockingFailureException e) {
                // log error saving flow
                throw e
            }

            flowExecution
        }
    }

}

class Task extends Executable {
    private static final Object lockExecute = new Object()
    private static final Object lockGetExecution = new Object()

    TaskExecution execute(TaskExecution execution) {
        taskService.start(execution)
        execution
    }

    List<TaskExecution> execute(Map params) {
        synchronized (lockExecute) {
            List<Map> multiExecParams = multiplyParams(params)
            multiExecParams.collect { Map param ->
                TaskExecution execution = getExecution(param)
                execute(execution)
            }
        }
    }

    TaskExecution getExecution(Map params) {
        synchronized (lockGetExecution) {
            TaskExecution execution = new TaskExecution(executable: this)
            execution.setParameters(params)
            addToExecutions(execution)

            execution.attach()
            execution.flowExecution?.attach()
            this.attach()
            try {
                save(flush: true)
            } catch (HibernateOptimisticLockingFailureException e) {
                // log error saving task
                throw e
            }

            execution
        }
    }

    List<Map> multiplyParams(Map params) {
        // creates a list of params for the tasks that must be started
        [params]
    }

}

class FlowTask {
    static belongsTo = [flow: Flow, executable: Executable]

    List<Execution> execute(Map params) {
        executable.execute(params)
    }
}

abstract class Execution {
    Map parameterData = [:]
    static belongsTo = [executable: Executable, flowExecution: FlowExecution]
    static transients = ['parameters', 'taskExecutions']   
    void setParameters(Map params) {
        params.each { key, value ->
            parameterData[key] = JsonParser.toJson(value)
        }
    }
}

class TaskExecution extends Execution {
}

class FlowExecution extends Execution {
    List<Execution> executions
    static transients = ['executions']

    FlowExecution() {
        executions = []
    }

    Set<TaskExecution> getTaskExecutions() {
        executions?.collect { Execution execution ->
            return execution.taskExecution
        }?.flatten()?.toSet()
    }

    void addToExecutions(Execution execution){
        executions.add(execution)
        execution.flowExecution = this
        execution.save()
    }

    def onLoad() {
        try {
            executions = this.id ? Execution.findAllByFlowExecution(this) : []
        } catch (Exception e){
            log.error(e)
            []
        }
    }
}

// -----------------
// SERVICE CLASSES
// -----------------
class FlowService {

    Map start(long flowId, Map params) {
        Flow flow = Flow.lock(flowId)

        startFlow(flow, params)
    }

    private Map startFlow(Flow flow, Map params) {
        List<RunningFlow> runningFlows = flow.execute(params) 

        [data: [success: true], status: HTTP_OK]
    }
}

//--------------------------------------
// Quartz job
//--------------------------------------
class ExecutorJob implements InterruptableJob {

    def grailsApplication = Holders.getGrailsApplication()

    static triggers = {}

    private Thread thread

    void execute(JobExecutionContext context) throws JobExecutionException {
        thread = Thread.currentThread()
        synchronized (LockContainer.taskLock) {
            Map params = context.mergedJobDataMap
            def persistenceInterceptor = persistenceInterceptorInstance

            try {
                persistenceInterceptor.init()

                Long executableId = params.executableId as Long

                def service = (Executable.get(executableId) instanceof Flow) ? flowServiceInstance : taskServiceInstance
                service.start(executableId, params)
            } catch (Exception e) {
                // log error
            } finally {
                persistenceInterceptor.flush()
                persistenceInterceptor.destroy()
            }
        }
    }

    PersistenceContextInterceptor getPersistenceInterceptorInstance() {
        grailsApplication.mainContext.getBean('persistenceInterceptor')
    }

    FluxoService getFlowServiceInstance() {
        grailsApplication.mainContext.getBean('flowService')
    }

    TarefaService getTaskServiceInstance() {
        grailsApplication.mainContext.getBean('taskService')
    }

    @Override
    void interrupt() throws UnableToInterruptJobException {
        thread?.interrupt()
    }    
}

Anyone knows something that might help?

Upvotes: 0

Views: 512

Answers (1)

Lalit Agarwal
Lalit Agarwal

Reputation: 2354

Well, It's hard to understand what is going wrong. However, I guess this error gets thrown when you have an object in session which has already been saved or updated by some other transaction. Again, when hibernate tries to save this object it gives the Row was updated error by another transaction error.

I guess you can try refresh before you save your object and see how it goes.

http://grails.github.io/grails-doc/2.3.4/ref/Domain%20Classes/refresh.html

def b = Book.get(1)
…
b.refresh()

Upvotes: 0

Related Questions