Reputation: 1
/**
 * Populates {@code props} with the Kafka client settings, creates a reactive
 * receiver subscribed to {@code topicIdConfig}, and passes the resulting record
 * stream to {@code fluxWorkflow}.
 *
 * <p>Partition assignments are logged from the assign listener. The previous
 * {@code receiver.doOnConsumer(...)} call only built a {@code Mono} that was
 * never subscribed, so its logging lambda never ran.
 */
protected void startConsumer() {
    // Application-level settings.
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationIdConfig);
    props.put(SCHEMAREG_URL, schemaregUrl);
    props.put(SSL_KEYSTORE_ALIAS, sslKeystoreAlias);

    // Transport security: protocol plus trust store / key store material.
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocolConfig);
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, new File(TRUSTSTORE_FILE_NAME).getAbsolutePath());
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslTrustStorePasswordConfig);
    props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, new File(KEYSTORE_FILE_NAME).getAbsolutePath());
    props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslKeyStorePasswordConfig);
    props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, sslKeyPasswordConfig);

    // Consumer settings. The StreamsConfig and ConsumerConfig bootstrap constants
    // name the same "bootstrap.servers" key, so it is set once here.
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerConfig);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAxonDeSerializer.class);

    // TODO: work out how to supply mock data from JUnit for the receiver created below.
    // Keys are Strings (StringDeserializer is configured above).
    // NOTE(review): the record value type is not visible here; Object is a
    // placeholder - replace it with the type fluxWorkflow expects.
    ReceiverOptions<String, Object> options = ReceiverOptions.<String, Object>create(props)
            .subscription(Collections.singleton(topicIdConfig))
            .addAssignListener(partitions -> {
                log.debug("onPartitionsAssigned {}", partitions);
                // Log each assigned topic/partition; runs on every (re)assignment.
                partitions.forEach(p -> log.info("Subscribed to {}:{}",
                        p.topicPartition().topic(), p.topicPartition().partition()));
            })
            .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));

    // Set up to receive messages and hand the stream to the workflow.
    KafkaReceiver<String, Object> receiver = KafkaReceiver.create(options);
    fluxWorkflow(receiver.receive());
}
Upvotes: 0
Views: 255