Maks Balamut
Maks Balamut

Reputation: 1

Method marked @ConnectMapping stops triggering after adding a PayloadInterceptor

While setting up Spring Security for RSocket, I noticed that after adding my own PayloadInterceptor, my annotated responders stop working:

@Bean
    public PayloadSocketAcceptorInterceptor rsocketInterceptor(
            RSocketSecurity rsocket,
            JwtPayloadInterceptor jwtPayloadInterceptor
    ) {
        // Authorization permits every request and every other exchange; the custom
        // JwtPayloadInterceptor is registered as an additional payload interceptor.
        rsocket
                .authorizePayload(spec -> spec
                        .anyRequest().permitAll()
                        .anyExchange().permitAll())
                .addPayloadInterceptor(jwtPayloadInterceptor);

        // Build the acceptor interceptor from the security configuration above.
        return rsocket.build();
    }

Calling addPayloadInterceptor() is what broke them. Here is my configuration:

/**
 * RSocket security configuration: declares the payload interceptor chain, the
 * JWT-backed authentication manager and an {@link AuthenticationPayloadInterceptor} bean.
 */
@Configuration
@EnableRSocketSecurity
public class SecurityConfiguration {

    /**
     * Builds the socket acceptor interceptor from the given {@link RSocketSecurity}.
     *
     * @param rsocket               security builder to configure
     * @param jwtPayloadInterceptor custom interceptor added to the payload interceptors
     * @return the interceptor produced by {@code rsocket.build()}
     */
    @Bean
    public PayloadSocketAcceptorInterceptor rsocketInterceptor(
            RSocketSecurity rsocket,
            JwtPayloadInterceptor jwtPayloadInterceptor
    ) {
        // Every request and every other exchange is permitted by the authorization rules.
        rsocket
                .authorizePayload(spec -> spec
                        .anyRequest().permitAll()
                        .anyExchange().permitAll())
                .addPayloadInterceptor(jwtPayloadInterceptor);

        return rsocket.build();
    }

    /**
     * Exposes an authentication manager that is constructed from the given decoder.
     *
     * @param jwtDecoder decoder handed to {@link JwtReactiveAuthenticationManager}
     * @return the JWT authentication manager
     */
    @Bean
    public ReactiveAuthenticationManager reactiveAuthenticationManager(ReactiveJwtDecoder jwtDecoder) {
        final JwtReactiveAuthenticationManager jwtManager = new JwtReactiveAuthenticationManager(jwtDecoder);
        return jwtManager;
    }

    /**
     * Exposes an {@link AuthenticationPayloadInterceptor} wrapping the given manager.
     *
     * @param authenticationManager manager the interceptor is constructed with
     * @return the authentication payload interceptor
     */
    @Bean
    public AuthenticationPayloadInterceptor authenticationPayloadInterceptor(ReactiveAuthenticationManager authenticationManager) {
        final AuthenticationPayloadInterceptor interceptor =
                new AuthenticationPayloadInterceptor(authenticationManager);
        return interceptor;
    }
}
@Configuration
public class RSocketServerConfiguration {

    @Bean
    public RSocketMessageHandler rsocketMessageHandler() {
        RSocketMessageHandler handler = new RSocketMessageHandler();
        handler.setRouteMatcher(new PathPatternRouteMatcher());
        handler.setHandlerPredicate(clazz -> AnnotationUtils
                .isCandidateClass(clazz, RSocketResponder.class)
        );
        handler.getArgumentResolverConfigurer()
                .addCustomResolver(new AuthenticationPrincipalArgumentResolver());
        return handler;
    }

    @Bean
    public RSocketRequester getRSocketRequester(
            @Value("${spring.rsocket.server.port}") Integer port,
            RSocketMessageHandler handler,
            SecuritySocketAcceptorInterceptor securitySocketAcceptorInterceptor
    ) {
        RSocketRequester.Builder builder = RSocketRequester.builder();
        return builder
                .rsocketConnector(rSocketConnector -> rSocketConnector
                        .reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2)))
                        .acceptor(securitySocketAcceptorInterceptor.apply(handler.responder()))
                )
                .tcp("localhost", port);
    }
}
/**
 * Type-level marker annotation for RSocket responder classes. It is itself annotated
 * with {@link Component} and is retained at runtime.
 */
@Component
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RSocketResponder {
}
/**
 * Payload interceptor that converts an exchange into an {@link Authentication} using a
 * {@link BearerPayloadExchangeConverter}, authenticates it with the injected manager and
 * continues the chain with the result stored in the reactive security context.
 */
@Component
@Order(5)
public class JwtPayloadInterceptor implements PayloadInterceptor {

    private final ReactiveAuthenticationManager authenticationManager;
    private final PayloadExchangeAuthenticationConverter authenticationConverter =
            new BearerPayloadExchangeConverter();

    /**
     * @param authenticationManager manager used to authenticate the converted token
     */
    public JwtPayloadInterceptor(ReactiveAuthenticationManager authenticationManager) {
        this.authenticationManager = authenticationManager;
    }

    /**
     * Authenticates the exchange when the converter yields a token; otherwise the chain
     * simply continues without an authentication.
     *
     * @param exchange the payload exchange being intercepted
     * @param chain    the remaining interceptor chain
     * @return completion signal of the (possibly authenticated) chain
     */
    @Override
    public Mono<Void> intercept(PayloadExchange exchange, PayloadInterceptorChain chain) {
        Mono<Authentication> extractedToken = this.authenticationConverter.convert(exchange);
        // Used only when no token was extracted: run the rest of the chain, then complete
        // empty so that the authentication steps below are skipped.
        Mono<Authentication> proceedWithoutToken = chain.next(exchange).then(Mono.empty());

        return extractedToken
                .switchIfEmpty(proceedWithoutToken)
                .flatMap(token -> this.authenticationManager.authenticate(token))
                .flatMap(authenticated -> onAuthenticationSuccess(chain.next(exchange), authenticated));
    }

    /**
     * Attaches the authentication to the Reactor context of the downstream chain.
     */
    private Mono<Void> onAuthenticationSuccess(Mono<Void> downstream, Authentication authenticated) {
        return downstream.contextWrite(ReactiveSecurityContextHolder.withAuthentication(authenticated));
    }
}

And here is my test:

/**
 * Verifies that an RSocket client sending a bearer token in its setup metadata can
 * connect to the server. The JWT decoder is mocked, so no real token is validated.
 */
@SpringBootTest(
        properties = {
                "spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.security.oauth2.resource.reactive.ReactiveOAuth2ResourceServerAutoConfiguration"
        }
)
class ConnectionControllerTest {

    private static RSocketRequester requester;
    @Autowired
    private MessageNotificationSender sender;

    @MockBean
    private ReactiveJwtDecoder jwtDecoder;

    /**
     * Builds the shared requester once; the bearer token is sent as setup metadata.
     *
     * @param builder requester builder provided by the application context
     * @param port    value of {@code spring.rsocket.server.port}
     */
    @BeforeAll
    public static void setupOnce(@Autowired RSocketRequester.Builder builder, @Value("${spring.rsocket.server.port}") Integer port) {
        requester = builder
                .setupMetadata("token", BearerTokenMetadata.BEARER_AUTHENTICATION_MIME_TYPE)
                .tcp("localhost", port);
    }

    /**
     * Stubs the decoder with a minimal valid JWT and expects the connection to be established.
     */
    @Test
    void shouldConnect() {
        // A Jwt must carry at least one header: without it build() throws
        // IllegalArgumentException ("headers cannot be empty"), the stubbed decode() call
        // fails, and authentication never succeeds.
        when(jwtDecoder.decode(anyString()))
                .thenAnswer(invocation -> Mono.just(
                        Jwt.withTokenValue("token")
                                .header("alg", "none")
                                .subject("1")
                                .build()));
        StepVerifier
                .create(requester.rsocketClient().source())
                .consumeNextWith(client -> {
                    assertThat(client.availability()).isEqualTo(1.0);
                })
                .verifyComplete();
    }
}

I expect my interceptor to authenticate users so that I can access the user via an argument annotated with @AuthenticationPrincipal, and I also expect my annotated responders to keep working. Currently, the intercept() method of JwtPayloadInterceptor is invoked, but onAuthenticationSuccess() never fires. At the same time, the responder method marked @ConnectMapping is not triggered.

Upvotes: 0

Views: 29

Answers (0)

Related Questions