I am developing an app with Spring Boot and Web3j, where I use a contract wrapper to interact with a smart contract. Here is the autogenerated code of a method for listening to an event called NewId:
public Observable<NewIdEventResponse> newIdEventObservable(DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
    final Event event = new Event("NewId",
            Arrays.<TypeReference<?>>asList(),
            Arrays.<TypeReference<?>>asList(new TypeReference<Bytes32>() {}, new TypeReference<Address>() {}));
    EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
    filter.addSingleTopic(EventEncoder.encode(event));
    return web3j.ethLogObservable(filter).map(new Func1<Log, NewIdEventResponse>() {
        @Override
        public NewIdEventResponse call(Log log) {
            EventValues eventValues = extractEventParameters(event, log);
            NewIdEventResponse typedResponse = new NewIdEventResponse();
            typedResponse.key = (Bytes32) eventValues.getNonIndexedValues().get(0);
            typedResponse.contractId = (Address) eventValues.getNonIndexedValues().get(1);
            return typedResponse;
        }
    });
}
I've created a subscription s to the observable, in which I print the name of the executing thread and increment a counter initialized to 0. The "main thread" (the Spring Boot request thread) waits for the first event, sleeps for 5 seconds, then calls s.unsubscribe() and prints the counter value. Here is the code:
//Invoke transactional contract method...
this.counter = 0;
CountDownLatch latch = new CountDownLatch(1);
log.warn("Counter value before subscription: " + counter);
Subscription s = contractWrapper.newIdEventObservable(DefaultBlockParameterName.LATEST, DefaultBlockParameterName.LATEST)
        .subscribe(evento -> {
                    log.warn("Event Received");
                    log.warn("Thread Name " + Thread.currentThread().getName());
                    latch.countDown();
                    this.testIncCounter();
                },
                Throwable::printStackTrace);
latch.await();
log.warn("Main Thread going to sleep for 5 seconds");
Thread.sleep(5000);
log.warn("Unsubscribing...");
s.unsubscribe();
log.warn("Counter value " + counter);
The testIncCounter method is synchronized:
private synchronized void testIncCounter() {
    counter++;
}
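To make clear what synchronized does and does not guarantee here, the following is a minimal standalone sketch with no Web3j involved. The class name CounterDemo and the method deliverTwice are hypothetical, and the two threads are only named after the ones in my logs to mimic one callback arriving on each. It shows that synchronization keeps the increments from racing but does not stop the callback from running twice:

```java
// Hypothetical stand-in for the service class: only the counter logic is reproduced.
class CounterDemo {
    private int counter = 0;

    // Same shape as the method in the question.
    private synchronized void testIncCounter() {
        counter++;
    }

    private synchronized int getCounter() {
        return counter;
    }

    // Simulates one event callback being invoked once on each of two threads.
    static int deliverTwice() {
        CounterDemo demo = new CounterDemo();
        Thread requestThread = new Thread(demo::testIncCounter, "http-nio-8090-exec-4");
        Thread poolThread = new Thread(demo::testIncCounter, "pool-17-thread-1");
        requestThread.start();
        poolThread.start();
        try {
            requestThread.join();
            poolThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return demo.getCounter();
    }

    public static void main(String[] args) {
        System.out.println("Counter value " + deliverTwice()); // prints "Counter value 2"
    }
}
```

So a final value of 2 is consistent with the callback really being invoked twice, rather than with a lost or duplicated increment.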
The problem is that the subscription code is executed twice, by different threads, as can be seen in the logs; the counter's final value is 2 when it should be 1. Here is the output:
2017-11-07 20:56:17.345 WARN 23533 --- [nio-8090-exec-4] com.ckgt.service.impl.CecaServiceImpl : Counter value before subscription: 0
2017-11-07 20:56:17.433 WARN 23533 --- [nio-8090-exec-4] com.ckgt.service.impl.CecaServiceImpl : Event Received
2017-11-07 20:56:17.433 WARN 23533 --- [nio-8090-exec-4] com.ckgt.service.impl.CecaServiceImpl : Thread Name http-nio-8090-exec-4
2017-11-07 20:56:17.449 WARN 23533 --- [nio-8090-exec-4] com.ckgt.service.impl.CecaServiceImpl : Main Thread going to sleep for 5 seconds
2017-11-07 20:56:17.491 WARN 23533 --- [ool-17-thread-1] com.ckgt.service.impl.CecaServiceImpl : Event Received
2017-11-07 20:56:17.491 WARN 23533 --- [ool-17-thread-1] com.ckgt.service.impl.CecaServiceImpl : Thread Name pool-17-thread-1
2017-11-07 20:56:22.450 WARN 23533 --- [nio-8090-exec-4] com.ckgt.service.impl.CecaServiceImpl : Unsubscribing...
2017-11-07 20:56:22.459 WARN 23533 --- [nio-8090-exec-4] com.ckgt.service.impl.CecaServiceImpl : Counter value 2
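As a possible stopgap while the cause is unknown, the callback could be made idempotent. The sketch below is only an illustration under two assumptions that I have not verified: that both callbacks carry the same underlying log, and that each event can be identified by something like a transaction hash plus log index. The class EventDeduplicator and its methods are hypothetical, and the generated NewIdEventResponse shown above exposes only key and contractId, so where that identity would come from is left open:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: counts each distinct event identity at most once.
class EventDeduplicator {
    // Thread-safe set of identities already handled.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();
    private final AtomicInteger counter = new AtomicInteger();

    // Returns true only the first time a given identity is seen.
    // The identity parts (transaction hash, log index) are assumptions.
    boolean onEvent(String transactionHash, long logIndex) {
        String key = transactionHash + ":" + logIndex;
        if (seen.add(key)) {
            counter.incrementAndGet();
            return true;
        }
        return false;
    }

    int count() {
        return counter.get();
    }

    // Delivers the same made-up identity twice and reports how many were counted.
    static int demo() {
        EventDeduplicator dedup = new EventDeduplicator();
        dedup.onEvent("0xabc", 0);
        dedup.onEvent("0xabc", 0);
        return dedup.count();
    }
}
```

This would only hide the symptom; it does not explain why a second thread receives the event.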
Is this a bug, or am I doing something wrong? Thank you very much in advance.