AJ-ELS

Reputation: 11

Siddhi (wso2sp) - DuplicateDefinitionException when performing multiple update or insert operations on an rdbms store

I have a Siddhi app (wso2sp 4.4.0) that fails to deploy as soon as I add a second insert operation on one store. Having one update or insert into query followed by a second query that only updates the same store works fine, but I need the second one to be an update or insert into as well. What am I doing wrong?

Context: I want to have multiple Kafka topic sources, each updating different fields of the same table depending on its payload (hence the 'set' clause). Because any of the events could arrive first, they all need to be update or insert into operations, not simply update. Plain updates don't cause errors, but adding a second update or insert into does.

[2022-01-21 14:46:02,582] ERROR {org.wso2.carbon.stream.processor.core.internal.StreamProcessorDeployer} - org.wso2.siddhi.query.api.exception.DuplicateDefinitionException: Error on 'HumanTasks' @ Line: 37. 
Position: 35, near 'update or insert into humanTaskEvents
set humanTasksTable.taskCompletionDate = taskCompletionDate, humanTasksTable.taskCompletionDateEpochValue = taskCompletionDateEpochValue, humanTasksTable.taskCompletionAction = taskCompletionAction
on humanTaskEvents.taskId == taskId'. Error between @ Line: 35. Position: 0 and @ Line: 37. Position: 35. Different definition same as output 'define stream humanTaskEvents (taskId string, taskCompletionDate string, taskCompletionDateEpochValue string, taskCompletionAction string)' already exist as '@Store( type = "rdbms", datasource = "eventProcessRDS")define table humanTaskEvents (taskId string, taskName string, correlationId string, identifierName string, identifierValue string, groupName string, readOnlyHumanTask string, task string, timeoutTask string, taskCreationDate string, taskCompletionDate string, taskCreationDateEpochValue string, taskCompletionDateEpochValue string, taskCompletionAction string)' org.wso2.carbon.stream.processor.core.internal.exception.SiddhiAppDeploymentException: org.wso2.siddhi.query.api.exception.DuplicateDefinitionException: Error on 'HumanTasks' @ Line: 37. Position: 35, near 'update or insert into humanTaskEvents
set humanTasksTable.taskCompletionDate = taskCompletionDate, humanTasksTable.taskCompletionDateEpochValue = taskCompletionDateEpochValue, humanTasksTable.taskCompletionAction = taskCompletionAction
on humanTaskEvents.taskId == taskId'. Error between @ Line: 35. Position: 0 and @ Line: 37. Position: 35. Different definition same as output 'define stream humanTaskEvents (taskId string, taskCompletionDate string, taskCompletionDateEpochValue string, taskCompletionAction string)' already exist as '@Store( type = "rdbms", datasource = "eventProcessRDS")define table humanTaskEvents (taskId string, taskName string, correlationId string, identifierName string, identifierValue string, groupName string, readOnlyHumanTask string, task string, timeoutTask string, taskCreationDate string, taskCompletionDate string, taskCreationDateEpochValue string, taskCompletionDateEpochValue string, taskCompletionAction string)'
        at org.wso2.carbon.stream.processor.core.internal.StreamProcessorDeployer.deploySiddhiQLFile(StreamProcessorDeployer.java:106)
        at org.wso2.carbon.stream.processor.core.internal.StreamProcessorDeployer.deploy(StreamProcessorDeployer.java:330)
        at org.wso2.carbon.deployment.engine.internal.DeploymentEngine.lambda$deployArtifacts$0(DeploymentEngine.java:291)
        at java.util.ArrayList.forEach(ArrayList.java:1257)
        at org.wso2.carbon.deployment.engine.internal.DeploymentEngine.deployArtifacts(DeploymentEngine.java:282)
        at org.wso2.carbon.deployment.engine.internal.RepositoryScanner.sweep(RepositoryScanner.java:112)
        at org.wso2.carbon.deployment.engine.internal.RepositoryScanner.scan(RepositoryScanner.java:68)
        at org.wso2.carbon.deployment.engine.internal.DeploymentEngine.start(DeploymentEngine.java:121)
        at org.wso2.carbon.deployment.engine.internal.DeploymentEngineListenerComponent.onAllRequiredCapabilitiesAvailable(DeploymentEngineListenerComponent.java:216)
        at org.wso2.carbon.kernel.internal.startupresolver.StartupComponentManager.lambda$notifySatisfiableComponents$7(StartupComponentManager.java:266)
        at java.util.ArrayList.forEach(ArrayList.java:1257)
        at org.wso2.carbon.kernel.internal.startupresolver.StartupComponentManager.notifySatisfiableComponents(StartupComponentManager.java:252)
        at org.wso2.carbon.kernel.internal.startupresolver.StartupOrderResolver$1.run(StartupOrderResolver.java:204)
        at java.util.TimerThread.mainLoop(Timer.java:555)
        at java.util.TimerThread.run(Timer.java:505)
Caused by: org.wso2.siddhi.query.api.exception.DuplicateDefinitionException: Error on 'HumanTasks' @ Line: 37. Position: 35, near 'update or insert into humanTaskEvents
set humanTasksTable.taskCompletionDate = taskCompletionDate, humanTasksTable.taskCompletionDateEpochValue = taskCompletionDateEpochValue, humanTasksTable.taskCompletionAction = taskCompletionAction
on humanTaskEvents.taskId == taskId'. Error between @ Line: 35. Position: 0 and @ Line: 37. Position: 35. Different definition same as output 'define stream humanTaskEvents (taskId string, taskCompletionDate string, taskCompletionDateEpochValue string, taskCompletionAction string)' already exist as '@Store( type = "rdbms", datasource = "eventProcessRDS")define table humanTaskEvents (taskId string, taskName string, correlationId string, identifierName string, identifierValue string, groupName string, readOnlyHumanTask string, task string, timeoutTask string, taskCreationDate string, taskCompletionDate string, taskCreationDateEpochValue string, taskCompletionDateEpochValue string, taskCompletionAction string)'
        at org.wso2.siddhi.core.util.parser.QueryParser.parse(QueryParser.java:225)
        at org.wso2.siddhi.core.util.parser.SiddhiAppParser.parse(SiddhiAppParser.java:245)
        at org.wso2.siddhi.core.SiddhiManager.createSiddhiAppRuntime(SiddhiManager.java:65)
        at org.wso2.siddhi.core.SiddhiManager.createSiddhiAppRuntime(SiddhiManager.java:74)
        at org.wso2.carbon.stream.processor.core.internal.StreamProcessorService.deploySiddhiApp(StreamProcessorService.java:100)
        at org.wso2.carbon.stream.processor.core.internal.StreamProcessorDeployer.deploySiddhiQLFile(StreamProcessorDeployer.java:94)
        ... 14 more
Caused by: org.wso2.siddhi.query.api.exception.DuplicateDefinitionException: Error between @ Line: 35. Position: 0 and @ Line: 37. Position: 35. Different definition same as output 'define stream humanTaskEvents (taskId string, taskCompletionDate string, taskCompletionDateEpochValue string, taskCompletionAction string)' already exist as '@Store( type = "rdbms", datasource = "eventProcessRDS")define table humanTaskEvents (taskId string, taskName string, correlationId string, identifierName string, identifierValue string, groupName string, readOnlyHumanTask string, task string, timeoutTask string, taskCreationDate string, taskCompletionDate string, taskCreationDateEpochValue string, taskCompletionDateEpochValue string, taskCompletionAction string)'
        at org.wso2.siddhi.core.util.parser.helper.DefinitionParserHelper.validateOutputStream(DefinitionParserHelper.java:156)
        at org.wso2.siddhi.core.util.parser.OutputParser.constructOutputCallback(OutputParser.java:199)
        at org.wso2.siddhi.core.util.parser.QueryParser.parse(QueryParser.java:196)
        ... 19 more

And the code:

@App:name("HumanTasks")
@App:description("testing")


@Store(type='rdbms', datasource='eventProcessRDS')
define table humanTaskEvents (taskId String, taskName String, correlationId String, identifierName String, identifierValue String, groupName String, readOnlyHumanTask String, task String, timeoutTask String, taskCreationDate String, taskCompletionDate String, taskCreationDateEpochValue String, taskCompletionDateEpochValue String, taskCompletionAction String);

@source(ref='sourceKafkaHumanTaskCreationEvent',
    @map(type = 'json', fail.on.missing.attribute= "false",
        @attributes(taskId = "taskId", taskName = "taskName", identifierName = "identifierName", correlationId = "correlationId", identifierValue = "identifierValue", groupName = "groupName", readOnlyHumanTask = "readOnlyUserTask", task = "task", timeoutTask = "timeoutTask", taskCreationDate = "taskCreationDate", taskCompletionDate = "taskCompletionDate", taskCreationDateEpochValue = "taskCreationDateEpochValue", taskCompletionDateEpochValue = "taskCompletionDateEpochValue", taskCompletionAction = "taskCompletionAction")))
define stream humanTaskCreation (taskId String, taskName String, correlationId String, identifierName String, identifierValue String, groupName String, readOnlyHumanTask String, task String, timeoutTask String, taskCreationDate String, taskCompletionDate String, taskCreationDateEpochValue String, taskCompletionDateEpochValue String, taskCompletionAction String);

@source(ref='sourceKafkaHumanTaskCompletionEvent',
    @map(type = 'json', fail.on.missing.attribute= "false",
        @attributes(taskId = "taskId", taskCompletionDate = "taskCompletionDate", taskCompletionDateEpochValue = "taskCompletionDateEpochValue", taskCompletionAction = "taskCompletionAction")))
define stream humanTaskCompletion (taskId String, taskCompletionDate String, taskCompletionDateEpochValue String, taskCompletionAction String);


------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------


from humanTaskCreation
select *
update or insert into humanTaskEvents
set humanTasksTable.taskName = taskName, humanTasksTable.correlationId = correlationId, humanTasksTable.identifierName = identifierName, humanTasksTable.identifierValue = identifierValue, humanTasksTable.groupName = groupName, humanTasksTable.readOnlyHumanTask = readOnlyHumanTask, humanTasksTable.task = task, humanTasksTable.timeoutTask = timeoutTask, humanTasksTable.taskCreationDate = taskCreationDate, humanTasksTable.taskCreationDateEpochValue = taskCreationDateEpochValue
on humanTaskEvents.taskId == taskId;

from humanTaskCompletion
select taskId, taskCompletionDate, taskCompletionDateEpochValue, taskCompletionAction
update or insert into humanTaskEvents
set humanTasksTable.taskCompletionDate = taskCompletionDate, humanTasksTable.taskCompletionDateEpochValue = taskCompletionDateEpochValue, humanTasksTable.taskCompletionAction = taskCompletionAction
on humanTaskEvents.taskId == taskId;

Upvotes: 0

Views: 91

Answers (1)

Anusha Jayasundara

Reputation: 168

If you are going to use update or insert into, you have to provide a value for every column of the humanTaskEvents table. If the update finds no matching row, Siddhi will try to insert the event into the table, and if you have not set all the columns it cannot insert a partial event.
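
As a rough sketch of what that could look like for the completion query: the select below emits all 14 table columns in the order of the table definition, so the query output matches the humanTaskEvents definition instead of the 4-attribute definition the error message complains about. The '' placeholders for the columns that the completion event does not carry are an assumption made for illustration (pick whatever default suits your data), and the set clause is written against humanTaskEvents, assuming the humanTasksTable prefix in the question is a leftover from an earlier table name.

```
-- Sketch only, not a verified fix: '' placeholders are assumed defaults.
from humanTaskCompletion
select taskId,
       '' as taskName,
       '' as correlationId,
       '' as identifierName,
       '' as identifierValue,
       '' as groupName,
       '' as readOnlyHumanTask,
       '' as task,
       '' as timeoutTask,
       '' as taskCreationDate,
       taskCompletionDate,
       '' as taskCreationDateEpochValue,
       taskCompletionDateEpochValue,
       taskCompletionAction
update or insert into humanTaskEvents
-- Only the completion columns are overwritten when the row already exists.
set humanTaskEvents.taskCompletionDate = taskCompletionDate,
    humanTaskEvents.taskCompletionDateEpochValue = taskCompletionDateEpochValue,
    humanTaskEvents.taskCompletionAction = taskCompletionAction
on humanTaskEvents.taskId == taskId;
```

With this shape, a completion event that arrives first should insert a row holding the placeholders, and the later creation event should fill in the creation columns through its own set clause without touching the completion columns.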

Upvotes: 0
