Sat

Reputation: 4178

Why is Observable functionality getting executed twice for a single call?

Complete structure of the program

Annotation:

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface UserAnnotation {
}

Then I created an interceptor:

public class UserInterceptor implements MethodInterceptor {

    private static final Logger logger = LoggerFactory.getLogger(UserInterceptor.class);

    @Inject
    UserService userService; // this is not working

    public Object invoke(MethodInvocation invocation) throws Throwable {
        logger.info("UserInterceptor : Interceptor Invoked");
        Object result = invocation.proceed();
        Observable<List<User>> observable = (Observable<List<User>>) result;
        observable.flatMap(Observable::from).subscribe(object -> {
            User user = (User) object;
            SampleSender sender = new SampleSender();
            sender.setBoolean(user.isBoolean());
            logger.info("Pushing Data into Sender");
            userService.insert(String.join("_", "key", "value"), sender);
        });
        return result;
    }
}

Then I created a Guice module as follows:

public class UserModule extends AbstractModule {
    @Override
    protected void configure() {
        UserInterceptor interceptor = new UserInterceptor();
        requestInjection(interceptor);
        bindInterceptor(Matchers.any(), Matchers.annotatedWith(UserAnnotation.class), interceptor);
    }
}

The class in which I use the annotation:

// This class has many other methods and is already used by other services; this is a simplified sample.
class UserClassForInterceptor {

      @Inject
      AnotherClass anotherClass;
      // userMethod() is not new; it already existed.
      // I added the annotation because I want some extra work
      // to run after this method finishes.
      @UserAnnotation
      public Observable<List<User>> userMethod() {
            logger.info("This is printing only once");
            return anotherClass.getUser().flatMap(user -> {
                logger.info("This is also printing twice");
                // This log line appears twice, so this lambda is being executed twice.
            });
      }
}

public class AnotherClass {
      public Observable<User> getUser() {
            Observable<Sample> observableSample = methodReturnsObservableSample();
            logger.info("Getting this logger only once");
            return observableSample.map(response -> {
                logger.info("This logger is printing twice");
                // the code that produces the User from the response goes here
            });
      }
}

If I remove the annotation, the loggers inside the observable print only once, but when I use the annotation they print twice. I don't understand why it behaves like this.

I have a RestModule in which I bind UserClassForInterceptor as follows:

public final class RestModule extends JerseyServletModule {
    // other classes binding
    bind(UserClassForInterceptor.class).in(Scopes.SINGLETON);
    // other classes binding
    install(new UserModule());
}

I also have a bootstrap class in which I install RestModule:

public class Bootstrap extends ServerBootstrap {
   binder.install(new RestModule());
}

Usage:

@Path("service/sample")
public class SampleRS {
    @Inject
    UserClassForInterceptor userClassForInterceptor;

    public void someMethod() {
        userClassForInterceptor.userMethod();
    }
}

Upvotes: 0

Answers (2)

Sat

Reputation: 4178

This code also worked for me. It attaches the extra step with doOnNext instead of subscribing inside the interceptor:

public Object invoke(MethodInvocation invocation) throws Throwable {
    Object result = null;
    try{
        logger.debug("Interceptor Invoked");
        result = invocation.proceed();
        Observable<List<User>> observable = (Observable<List<User>>)result;

        return observable
                .doOnNext(this::updateUser);
    }
    catch(Exception ex){
        logger.error("Error: ",ex);
    }
    return result;
}

private void updateUser(List<User> users) {
    if(CollectionUtils.isNotEmpty(users)) {
        for(User user: users) {
            SampleSender sender = new SampleSender();
            sender.setBoolean(user.isBoolean());
            logger.info("Pushing Data into Sender");
            userService.insert(String.join("_", "key", "value"), sender); 
        }
    }
}

Upvotes: 0

Bob Dalgleish

Reputation: 8227

You created an annotation, @UserAnnotation, and an interceptor class to go with the annotation. You attach the annotation to a method, userMethod().

The first thing your interceptor does is invoke userMethod() to get the observable it returns. The interceptor then subscribes to that observable, which causes the first set of log messages to appear. Eventually, the interceptor returns the same observable to the original caller. When something else subscribes to the returned observable, the observer chain is activated a second time, hence the log messages appear twice.
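To make the double activation concrete, here is a minimal sketch that uses only the JDK. The Cold class is a hypothetical stand-in for a cold observable, not RxJava's actual implementation, and all names in it are made up for illustration. It only shows that assembling a chain runs once, while each subscription re-runs the work inside the chain:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class ColdChainDemo {

    /** Toy "cold" source (hypothetical, not RxJava): nothing runs until subscribe() is called. */
    static final class Cold<T> {
        private final Supplier<T> producer;

        Cold(Supplier<T> producer) {
            this.producer = producer;
        }

        /** Describes an extra step; does not run the producer. */
        <R> Cold<R> map(Function<T, R> fn) {
            return new Cold<>(() -> fn.apply(producer.get()));
        }

        /** Every subscription re-runs the whole chain from the start. */
        void subscribe(Consumer<T> observer) {
            observer.accept(producer.get());
        }
    }

    /** Returns the log lines produced when the same chain is subscribed to twice. */
    public static List<String> simulate() {
        List<String> log = new ArrayList<>();

        // Stand-in for userMethod(): building the chain happens once.
        log.add("assembled");
        Cold<String> chain = new Cold<String>(() -> "user")
                .map(u -> {
                    log.add("inside chain");
                    return u.toUpperCase();
                });

        chain.subscribe(u -> { });  // stand-in for the interceptor's subscribe()
        chain.subscribe(u -> { });  // stand-in for the original caller's subscribe()
        return log;
    }

    public static void main(String[] args) {
        // prints [assembled, inside chain, inside chain]
        System.out.println(simulate());
    }
}
```

The line logged during assembly shows up once and the line inside the chain shows up twice, which matches the pattern of log messages in the question.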

RxJava Has Side Effects

While RxJava is an implementation of the "functional reactive programming" concept, the observer chains that you construct (in a functional manner) only work when they are subscribed to, and those subscriptions have side effects. Logging output is one side effect, and probably the most benign; changes to variables or invocations of methods that have side effects have a wider impact.

When an observer chain is constructed (properly), it acts as a potential computation until there is a subscriber. If you need to have more than one subscriber, as you might for your problem domain, then you have to decide whether the observer chain needs to be activated for each subscription, the normal case, or only once for all overlapping subscriptions.

If you want all overlapping subscriptions to share the same observable, then you can use the share() operator. There are a number of related operators that affect the lifetime of observables and subscriptions. Here is an overview: How to use RxJava share() operator?
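As a rough illustration of the "only once for all subscriptions" option, here is a JDK-only sketch. The Memoized class is hypothetical and is not how share() works internally: share() multicasts to subscribers whose subscriptions overlap in time, whereas this toy simply remembers the first result, which is closer in spirit to a caching operator. It is only meant to show the upstream work running once for two subscribers:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class SharedResultDemo {

    /** Toy deferred source (hypothetical): computes at most once, then reuses the value. */
    static final class Memoized<T> {
        private final Supplier<T> producer;
        private boolean computed;
        private T value;

        Memoized(Supplier<T> producer) {
            this.producer = producer;
        }

        synchronized void subscribe(Consumer<T> observer) {
            if (!computed) {
                value = producer.get();  // upstream work runs on the first subscription only
                computed = true;
            }
            observer.accept(value);
        }
    }

    /** Returns how many times the upstream work ran for two subscribers. */
    public static int upstreamRunsForTwoSubscribers() {
        AtomicInteger runs = new AtomicInteger();
        Memoized<String> source = new Memoized<>(() -> {
            runs.incrementAndGet();
            return "user";
        });
        source.subscribe(v -> { });  // first subscriber triggers the work
        source.subscribe(v -> { });  // second subscriber reuses the result
        return runs.get();
    }

    public static void main(String[] args) {
        // prints 1
        System.out.println(upstreamRunsForTwoSubscribers());
    }
}
```

Whether sharing is appropriate depends on the chain: if each subscriber is supposed to trigger a fresh request, sharing the result would change the program's behavior.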

Aspect Oriented Programming: Interceptors And Guice

Your code is using Guice to provide a capability called "aspect oriented programming". This allows you to introduce code into your program to address cross-cutting concerns, or to enhance its functionality by setting up controlled gateways. Using Guice, or similar AOP approaches, requires discipline.

In your case, you used the interception process to cause unexplained (until now) side effects by subscribing to an observer chain that has non-trivial side effects. Imagine that the method you intercepted set up a one-time connection and that your interceptor used up that connection doing its work, leaving the original caller unable to use the connection.

The discipline you need is to understand the rules that the interceptor must follow. Think of rules such as "First, do no harm".

Doing Things The FRP Way

If you need to add an extra step when handling user information, then you should construct a new observable in your interceptor that performs that step, but only when the original caller subscribes to the observable:

    Object result = invocation.proceed();
    Observable<List<User>> observable = (Observable<List<User>>) result;
    Observable<List<User>> newObservable = observable
      .doOnNext( userList ->
         // fromIterable is the RxJava 2+ name; RxJava 1.x calls it Observable.from
         Observable.fromIterable( userList )
           .subscribe(user -> {
             SampleSender sender = new SampleSender();
             sender.setBoolean(user.isBoolean());
             logger.info("Pushing Data into Sender");
             userService.insert(String.join("_", "key", "value"), sender);
           }));
    return newObservable;

By returning a modified observer chain, you don't introduce side effects from the original observer chain, and ensure that the side effects you introduce in your own code will only be triggered when the original observer chain is subscribed to.
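The ordering this gives you can be sketched with plain JDK code. As before, Cold is a hypothetical toy, not RxJava, and its doOnNext only imitates the idea of the real operator. The point is that the interceptor decorates the chain without subscribing, so nothing runs until the caller subscribes, and then everything runs exactly once:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class DeferredHookDemo {

    /** Toy "cold" source (hypothetical, not RxJava). */
    static final class Cold<T> {
        private final Supplier<T> producer;

        Cold(Supplier<T> producer) {
            this.producer = producer;
        }

        /** Returns a new chain that passes each value to hook before handing it downstream. */
        Cold<T> doOnNext(Consumer<T> hook) {
            return new Cold<>(() -> {
                T value = producer.get();
                hook.accept(value);
                return value;
            });
        }

        void subscribe(Consumer<T> observer) {
            observer.accept(producer.get());
        }
    }

    /** Returns the log lines in the order they are produced. */
    public static List<String> simulate() {
        List<String> log = new ArrayList<>();
        Cold<String> original = new Cold<>(() -> {
            log.add("chain ran");
            return "user";
        });

        // Interceptor: decorate the chain and return it without subscribing.
        Cold<String> decorated = original.doOnNext(u -> log.add("pushed " + u));
        log.add("interceptor returned");

        // Original caller: the single subscription drives both the chain and the hook.
        decorated.subscribe(u -> log.add("caller got " + u));
        return log;
    }

    public static void main(String[] args) {
        // prints [interceptor returned, chain ran, pushed user, caller got user]
        System.out.println(simulate());
    }
}
```

Note that "chain ran" appears once and only after the interceptor has returned, which is the behavior the question was expecting.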

Upvotes: 1
