Reputation: 6528
I have a Spring Boot unit test that exercises the switch-back capability of my application when the primary Kafka cluster comes back online.
The application successfully switches to secondary when the primary goes offline. Now we're adding the ability to switch back to primary on a timer instead of failure.
My Test Method Looks like so:
//Rochelle = Primary BootStrapServers
//Hudson = Secondary BootStrapServers
// End-to-end check of fail-over and switch-back between two bootstrap-server lists:
//   1. verify the producer starts on the primary list (Rochelle),
//   2. shut the primary embedded broker down and wait for the switch to the secondary (Hudson),
//   3. send and receive a message while on the secondary,
//   4. restart the primary, call primarySwitch() directly and wait for the switch back,
//   5. send and receive a message again.
// Relies on members declared outside this method (kafkaTemplate, kafkaSwitchCluster,
// kafkaProducerErrorHandler, embeddedKafka, registry, records, lock, timeOut, log).
@Test
public void send_switchback() throws Exception
{
//Get ABSwitchCluster to check failover details
// The supplier is pulled out of the template's producer factory so the assertions below
// observe the same object the producer itself consults for its bootstrap servers.
KafkaSwitchCluster ktSwitch = (KafkaSwitchCluster)
((BootStrapExposerProducerFactory)
kafkaTemplate.getProducerFactory()).getBootStrapSupplier();
// Pre-conditions: the supplier exists and currently points at the primary list.
assertThat(ktSwitch, notNullValue());
assertThat(ktSwitch.get(), is(Rochelle));
assertThat(ktSwitch.isPrimary(), is(true));
assertThat(getBootStrapServersList(), is(Rochelle));
log.info("Shutdown Broker to test Failover.");
//Shutdown Primary Servers to simulate disconnection
shutdownBroker_primary();
//Allow for fail over to happen
if ( ktSwitch.isPrimary() )
{
try
{
synchronized (lock)
{ //pause to give Idle Event a chance to fire
// Bounded poll: at most timeOut + 1 waits of up to 15 seconds each, ending early
// once the supplier reports it is no longer on the primary.
// NOTE(review): nothing visible here notifies 'lock', so wait() presumably acts as a
// timed sleep - confirm.
for (int i = 0; i <= timeOut && ktSwitch.isPrimary(); ++i)
//while ( ktSwitch.isPrimary() )
{ //poll for cluster switch
lock.wait(Duration.ofSeconds(15).toMillis());
}
}
}
catch (InterruptedException IGNORE)
// Despite its name the exception is not ignored: an interrupt fails the test.
{ fail("Unable to wait for cluster switch. " + IGNORE.getMessage()); }
}
//Confirm Failover has happened
// Checked through both the supplier taken from the template (ktSwitch) and the
// kafkaSwitchCluster member.
assertThat(ktSwitch.get(), is(Hudson));
assertThat(ktSwitch.isPrimary(), is(false));
assertThat(getBootStrapServersList(), is(Hudson));
assertThat(kafkaSwitchCluster.get(), is(Hudson));
assertThat(kafkaSwitchCluster.isPrimary(), is(false));
//Send a message on backup server
String message = "Test Failover";
send(message);
// Wait up to 10 seconds for the message to show up in 'records'.
// NOTE(review): 'records' is presumably filled by a test consumer/listener - confirm.
String msg = records.poll(10, TimeUnit.SECONDS);
assertThat(msg, notNullValue());
assertThat(msg, is(message));
// Bring the primary embedded broker back.
startup_primary();
//embeddedKafkaRule.getEmbeddedKafka();
// The embedded broker must still advertise the address recorded as the primary list.
assertThat(embeddedKafka.getBrokersAsString(), is(Rochelle));
String brokers = embeddedKafka.getBrokersAsString();
if ( !kafkaProducerErrorHandler.areBrokersUp(brokers) )
{
synchronized (lock)
{
// Bounded poll: at most 16 waits of up to 1 second each, ending early when the
// brokers are reported up or the registry is no longer running.
for ( int i=0;
i <= 15 && !kafkaProducerErrorHandler.areBrokersUp(brokers)
&& registry.isRunning();
++i )
{ lock.wait(Duration.ofSeconds(1).toMillis()); }
}
}
//TODO: test Scheduled Fire
// The switch-back is triggered by a direct call here rather than by waiting for a schedule.
kafkaProducerErrorHandler.primarySwitch();
if ( !kafkaSwitchCluster.isPrimary() )
{
try
{
synchronized (lock)
{ //pause to give Idle Event a chance to fire
// Same bounded poll as for the fail-over, with the condition inverted: wait until
// kafkaSwitchCluster reports primary again.
for (int i = 0; i <= timeOut && !kafkaSwitchCluster.isPrimary(); ++i)
//while ( !ktSwitch.isPrimary() )
{ //poll for cluster switch
lock.wait(Duration.ofSeconds(15).toMillis());
}
}
}
catch (InterruptedException IGNORE)
{ fail("Unable to wait for cluster switch. " + IGNORE.getMessage()); }
}
// 'brokers' is the string captured right after the restart, so the next two assertions
// examine the embedded broker's address, not the supplier's current state.
assertThat(brokers, anyOf(is(Rochelle), is(Hudson))); //port didn't change
assertThat(brokers, is(Rochelle)); //is primary
// Switch-back must be visible through both supplier references, and the primary
// brokers must be reported as up.
assertThat(kafkaSwitchCluster.isPrimary(), is(true));
//assertThat(ktSwitch.isPrimary(), is(true));
assertThat(ktSwitch.get(), is(brokers));
assertThat(kafkaProducerErrorHandler.areBrokersUp(brokers), is(true));
assertThat(kafkaProducerErrorHandler.areBrokersUp(Rochelle), is(true));
assertThat(ktSwitch.isPrimary(), is(true));
//assertThat(ktSwitch.get(), not(anyOf(is(Hudson), is(Rochelle))));
assertThat(ktSwitch.get(), is(embeddedKafka.getBrokersAsString()));
//Send a message now that the supplier reports the primary server again
message = "Test newPrimary";
send(message);
msg = records.poll(10, TimeUnit.SECONDS);
assertThat(msg, notNullValue());
assertThat(msg, is(message));
log.info("Test is finished");
}
I'm using this method to shut down my primary embedded Kafka:
/**
 * Stops every Kafka server of the embedded broker in two passes: first ask all of them
 * to shut down, then block until each one has finished shutting down.
 */
public void shutdownBroker_primary()
{
    // Pass 1: request shutdown on every server.
    embeddedKafka.getKafkaServers().forEach(KafkaServer::shutdown);
    // Pass 2: wait for each server to complete its shutdown.
    embeddedKafka.getKafkaServers().forEach(KafkaServer::awaitShutdown);
}
I'm using this to restart Kafka:
/**
 * Calls {@code startup()} on every Kafka server of the embedded broker and then
 * starts the registry.
 */
public void startup_primary()
{
    //registry.stop();
    //kafkaSwitchCluster.Rochelle = embeddedKafka.getBrokersAsString();
    embeddedKafka.getKafkaServers().forEach(KafkaServer::startup);
    registry.start();
}
primarySwitch()
is a scheduled event that switches the cluster back to the primary. The test calls it directly. It is a wrapper around the same code that switches the in-use cluster when Kafka goes down.
How do I get the primary embedded Kafka cluster to start successfully after I have shut it down, so that I can prove the application moves back to the primary cluster once it is available again?
UPDATE:
I have created Code Example on Github with what I have so far: https://github.com/raystorm/Kafka-Example .
UPDATE 2: The linked repository above has been updated based on the accepted answer below, and now all tests pass.
Upvotes: 1
Views: 4322
Reputation: 174504
It wasn't really designed for this use case, but the following works, as long as you don't need to retain data between the broker instances...
/**
 * Shows that an embedded Kafka broker can be torn down and brought back inside one test:
 * a record is sent, the broker is destroyed and re-initialised, and a second record is sent.
 */
@SpringBootTest
@EmbeddedKafka(topics = "so64145670", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class So64145670ApplicationTests {

    @Autowired
    private EmbeddedKafkaBroker broker;

    /**
     * Sends one record before and one record after restarting the embedded broker.
     *
     * @param template template used for both sends
     * @throws Exception if a send does not complete within 10 seconds or the restart fails
     */
    @Test
    void restartBroker(@Autowired KafkaTemplate<String, String> template) throws Exception {
        sendAndPrint(template, "foo");

        // Restart: destroy the running broker, then run its initialisation again.
        this.broker.destroy();
        this.broker.afterPropertiesSet();

        sendAndPrint(template, "bar");
    }

    /** Sends {@code payload} to the test topic, waits up to 10 seconds, and prints the record metadata. */
    private static void sendAndPrint(KafkaTemplate<String, String> kafkaTemplate, String payload)
            throws Exception {
        SendResult<String, String> outcome =
                kafkaTemplate.send("so64145670", payload).get(10, TimeUnit.SECONDS);
        System.out.println("+++" + outcome.getRecordMetadata());
    }

}
EDIT
Here's one with two brokers...
/**
 * Two-broker variant: the annotation-provided embedded broker plus a second broker bean
 * are both destroyed and re-initialised, with a record sent to each before and after.
 */
@SpringBootTest(classes = { So64145670Application.class, So64145670ApplicationTests.Config.class })
@EmbeddedKafka(topics = "so64145670", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class So64145670ApplicationTests {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Autowired
    private EmbeddedKafkaBroker secondBroker;

    /**
     * Sends to both brokers, restarts both, then sends to both again.
     *
     * @param template template bound to the first broker
     * @param pf producer factory reused for a template that targets the second broker
     * @throws Exception if a send does not complete within 10 seconds or a restart fails
     */
    @Test
    void restartBroker(@Autowired KafkaTemplate<String, String> template,
            @Autowired ProducerFactory<String, String> pf) throws Exception {

        sendAndPrint(template, "so64145670", "foo");

        // Same producer factory, but with the bootstrap servers overridden to the second broker.
        String secondAddress = this.secondBroker.getBrokersAsString();
        KafkaTemplate<String, String> secondTemplate = new KafkaTemplate<>(pf,
                Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, secondAddress));
        sendAndPrint(secondTemplate, "so64145670-1", "foo");

        // Tear both brokers down ...
        this.embeddedKafka.destroy();
        this.secondBroker.destroy();
        // ... and bring them back in the same order.
        this.embeddedKafka.afterPropertiesSet();
        this.secondBroker.afterPropertiesSet();

        sendAndPrint(template, "so64145670", "bar");
        sendAndPrint(secondTemplate, "so64145670-1", "bar");
    }

    /** Sends {@code payload} to {@code topic}, waits up to 10 seconds, and prints the record metadata. */
    private static void sendAndPrint(KafkaTemplate<String, String> kafkaTemplate, String topic,
            String payload) throws Exception {
        SendResult<String, String> outcome =
                kafkaTemplate.send(topic, payload).get(10, TimeUnit.SECONDS);
        System.out.println("+++" + outcome.getRecordMetadata());
    }

    /** Extra test configuration contributing the second embedded broker. */
    @Configuration
    public static class Config {

        /** One-broker embedded Kafka with topic "so64145670-1", publishing its broker list under its own property. */
        @Bean
        EmbeddedKafkaBroker secondBroker() {
            EmbeddedKafkaBroker second = new EmbeddedKafkaBroker(1, true, "so64145670-1");
            return second.brokerListProperty("spring.kafka.second.server");
        }

    }

}
+++so64145670-1@0
+++so64145670-1-0@0
+++so64145670-1@0
+++so64145670-1-0@0
Upvotes: 1