Reputation: 4154
I'm trying to create an integration test with PubSub emulator based on the example from this GitHub repo which looks like
@SpringBootTest
@Testcontainers
@ActiveProfiles("test")
public class PubSubIntegrationTests {
private static final String PROJECT_ID = "test-project";
@Container
private static final PubSubEmulatorContainer pubsubEmulator =
new PubSubEmulatorContainer(
DockerImageName.parse("gcr.io/google.com/cloudsdktool/cloud-sdk:367.0.0-emulators"));
@DynamicPropertySource
static void emulatorProperties(DynamicPropertyRegistry registry) {
registry.add("spring.cloud.gcp.pubsub.emulator-host", pubsubEmulator::getEmulatorEndpoint);
}
@BeforeAll
static void setup() throws Exception {
ManagedChannel channel =
ManagedChannelBuilder.forTarget("dns:///" + pubsubEmulator.getEmulatorEndpoint())
.usePlaintext()
.build();
TransportChannelProvider channelProvider =
FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
TopicAdminClient topicAdminClient =
TopicAdminClient.create(
TopicAdminSettings.newBuilder()
.setCredentialsProvider(NoCredentialsProvider.create())
.setTransportChannelProvider(channelProvider)
.build());
SubscriptionAdminClient subscriptionAdminClient =
SubscriptionAdminClient.create(
SubscriptionAdminSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create())
.build());
PubSubAdmin admin =
new PubSubAdmin(() -> PROJECT_ID, topicAdminClient, subscriptionAdminClient);
admin.createTopic("test-topic");
admin.createSubscription("test-subscription", "test-topic");
admin.close();
channel.shutdown();
}
// By default, autoconfiguration will initialize application default credentials.
// For testing purposes, don't use any credentials. Bootstrap w/ NoCredentialsProvider.
@TestConfiguration
static class PubSubEmulatorConfiguration {
@Bean
CredentialsProvider googleCredentials() {
return NoCredentialsProvider.create();
}
}
@Autowired PubSubSender sender;
@Autowired PubSubSubscriberTemplate subscriberTemplate;
@Autowired PubSubPublisherTemplate publisherTemplate;
@Test
void testSend() throws ExecutionException, InterruptedException {
ListenableFuture<String> future = sender.send("hello!");
List<AcknowledgeablePubsubMessage> msgs =
await().until(() -> subscriberTemplate.pull("test-subscription", 10, true), not(empty()));
assertEquals(1, msgs.size());
assertEquals(future.get(), msgs.get(0).getPubsubMessage().getMessageId());
assertEquals("hello!", msgs.get(0).getPubsubMessage().getData().toStringUtf8());
for (AcknowledgeablePubsubMessage msg : msgs) {
msg.ack();
}
}
@Test
void testWorker() throws ExecutionException, InterruptedException {
ListenableFuture<String> future = publisherTemplate.publish("test-topic", "hi!");
List<PubsubMessage> messages = Collections.synchronizedList(new LinkedList<>());
PubSubWorker worker =
new PubSubWorker(
"test-subscription",
subscriberTemplate,
(msg) -> {
messages.add(msg);
});
worker.start();
await().until(() -> messages, not(empty()));
assertEquals(1, messages.size());
assertEquals(future.get(), messages.get(0).getMessageId());
assertEquals("hi!", messages.get(0).getData().toStringUtf8());
worker.stop();
}
@AfterEach
void teardown() {
// Drain any messages that are still in the subscription so that they don't interfere with
// subsequent tests.
await().until(() -> subscriberTemplate.pullAndAck("test-subscription", 1000, true), hasSize(0));
}
}
all works fine for the above example but when I want to test my implementation as follows
@Autowired
private FunctionCatalog catalog;
@Test
void testSendB() throws ExecutionException, InterruptedException {
Consumer<PubSubMessage> function = catalog.lookup(MyFunction.class, FUNCTION_DEFINITION);
var pubSubMessage = new PubSubMessage();
pubSubMessage.setData(Base64.getEncoder().encodeToString(EMPTY_MESSAGE.getBytes()));
function.accept(pubSubMessage);
List<AcknowledgeablePubsubMessage> msgs =
await().until(() -> subscriberTemplate.pull("test-subscription", 10, true), not(empty()));
assertEquals(1, msgs.size());
assertEquals(future.get(), msgs.get(0).getPubsubMessage().getMessageId());
assertEquals("hello!", msgs.get(0).getPubsubMessage().getData().toStringUtf8());
for (AcknowledgeablePubsubMessage msg : msgs) {
msg.ack();
}
}
it will throw error:
java.lang.RuntimeException: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Resource not found (resource=test-topic).
where my service implementation uses Publisher
instead of PubSubPublisherTemplate
from the example:
private final Publisher publisher;
public void publishMessage(String message) {
var byteStr = ByteString.copyFrom(message, StandardCharsets.UTF_8);
var pubsubApiMessage = getPubsubApiMessage(byteStr);
try {
publish(pubsubApiMessage);
} catch (Exception e) {
log.error("Error during event publishing: " + e.getMessage(), e);
throw new RuntimeException(e);
}
}
private void publish(PubsubMessage pubsubApiMessage) throws Exception {
publisher.publish(pubsubApiMessage).get();
}
private PubsubMessage getPubsubApiMessage(ByteString byteStr) {
return PubsubMessage.newBuilder()
.setData(byteStr)
.build();
}
and works fine when deployed to GCP but not in this case of integration test using PubSub emulator.
Upvotes: 1
Views: 2270
Reputation: 4154
It came up that the PubSub emulator requires its own test publisher which can be created as a bean in configuration. Example:
@Configuration
public class PubSubConfig {
@Value("${gcp.pubsub.topic.name}")
private String topicName;
@Value("${gcp.project.id}")
private String projected;
@Value("${spring.cloud.gcp.pubsub.emulator-host}")
private String host;
private static final CredentialsProvider CREDENTIALS_PROVIDER = NoCredentialsProvider.create();
@Bean
public SubscriberStub testSubscriber(
FixedTransportChannelProvider fixedTransportChannelProvider) throws IOException {
return GrpcSubscriberStub.create(SubscriberStubSettings.newBuilder()
.setTransportChannelProvider(fixedTransportChannelProvider)
.setCredentialsProvider(CREDENTIALS_PROVIDER)
.build());
}
@Primary
@Bean
public Publisher testPublisher(FixedTransportChannelProvider fixedTransportChannelProvider) throws IOException {
return Publisher.newBuilder(ProjectTopicName.of(projectId, topicName))
.setChannelProvider(fixedTransportChannelProvider)
.setCredentialsProvider(NoCredentialsProvider.create())
.build();
}
@Bean
public TopicAdminClient getTopicAdminClient(
FixedTransportChannelProvider fixedTransportChannelProvider) throws IOException {
return TopicAdminClient.create(TopicAdminSettings.newBuilder()
.setTransportChannelProvider(fixedTransportChannelProvider)
.setCredentialsProvider(CREDENTIALS_PROVIDER)
.build());
}
@Primary
@Bean
public FixedTransportChannelProvider getChannelProvider() {
var channel = ManagedChannelBuilder.forTarget(host)
.usePlaintext()
.build();
return FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
}
@Bean
public SubscriptionAdminClient createSubscriptionAdmin(FixedTransportChannelProvider fixedTransportChannelProvider) throws IOException {
return SubscriptionAdminClient.create(SubscriptionAdminSettings.newBuilder()
.setCredentialsProvider(
NoCredentialsProvider.create())
.setTransportChannelProvider(
fixedTransportChannelProvider)
.build());
}
}
Upvotes: 1