Reputation: 764
I am struggling to read messages off the destination channel that the cloud stream handler writes to.
I have the GCP Pub/Sub emulator running in Testcontainers. I can successfully send messages to the input queue/topic (sorry, I come from a JMS background and am not sure which term is correct for GCP Pub/Sub).
Spring Cloud Stream config:
spring.cloud.gcp.pubsub.emulator-host=localhost:8085
spring.cloud.gcp.pubsub.project-id=project-treeline
spring.cloud.gcp.project-id=project-treeline
spring.cloud.stream.bindings.input.destination=split
spring.cloud.stream.bindings.output.destination=amount
spring.cloud.stream.gcp.pubsub.bindings.amount.consumer.auto-create-resources=true
Test case:
@ExtendWith(SpringExtension.class)
@SpringBootTest
@Testcontainers
@ActiveProfiles({"test", "gcp"})
@Slf4j
public class SpringProcessorTest {

    private static final int SUCCESS_EXPECTED_SPLIT_MSG_COUNT = 3;

    @Autowired
    private PubSubTemplate template;

    @Autowired
    private PlaylistRepository playlistRepo;

    @Autowired
    private PlaylistEntryRepository playlistEntryRepo;

    @Value("${spring.cloud.stream.bindings.input.destination}")
    private String inputTopic;

    @Value("${spring.cloud.stream.bindings.output.destination}")
    private String outputTopic;

    @Test
    public void successfullyProcessPlaylist() throws Exception {
        final Long playlistId = playlistRepo.findByFileUri(TestDataConst.PLAYLIST_URI).getId();
        playlistEntryRepo.findByPlaylist(playlistId).forEach(c -> template.publish(inputTopic, c.getId().toString()));
        Thread.sleep(4000);
        final List<AcknowledgeablePubsubMessage> msgs = template.pull("output", 3, Boolean.TRUE);
        assertNotNull(msgs);
        assertEquals(SUCCESS_EXPECTED_SPLIT_MSG_COUNT, msgs.size());
    }
}
Instead of using template.pull I've tried using template.subscribe, but I still have the same issue. Console logs:
com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Subscription does not exist
Caused by: io.grpc.StatusRuntimeException: NOT_FOUND: Subscription does not exist
The logs do indicate that the events from the test code are being sent and that the messages are being processed by the Processor:
14:15:24.274 [gcp-pubsub-subscriber3] INFO treeline.service.splits.SplitService - Featured performer id: [7], name: [Robb Flynn], receiving split: [25,000000]
14:15:24.274 [gcp-pubsub-subscriber1] INFO treeline.service.splits.SplitService - Non-featured performer id: [7], name: [Robb Flynn], receiving split: [7,000000]
14:15:24.274 [gcp-pubsub-subscriber2] INFO treeline.service.splits.SplitService - Non-featured performer id: [7], name: [Robb Flynn], receiving split: [5,833333]
14:15:24.279 [gcp-pubsub-subscriber3] INFO treeline.service.splits.SplitService - Featured performer id: [12], name: [Logan Mader], receiving split: [25,000000]
14:15:24.279 [gcp-pubsub-subscriber1] INFO treeline.service.splits.SplitService - Non-featured performer id: [12], name: [Logan Mader], receiving split: [7,000000]
14:15:24.279 [gcp-pubsub-subscriber2] INFO treeline.service.splits.SplitService - Non-featured performer id: [12], name: [Logan Mader], receiving split: [5,833333]
14:15:24.283 [gcp-pubsub-subscriber2] INFO treeline.service.splits.SplitService - Non-featured performer id: [2], name: [Tom Araya], receiving split: [5,833333]
14:15:24.283 [gcp-pubsub-subscriber3] INFO treeline.service.splits.SplitService - Featured performer id: [8], name: [Adam Deuce], receiving split: [25,000000]
14:15:24.283 [gcp-pubsub-subscriber1] INFO treeline.service.splits.SplitService - Non-featured performer id: [2], name: [Tom Araya], receiving split: [7,000000]
14:15:24.286 [gcp-pubsub-subscriber3] INFO treeline.service.splits.SplitService - Featured performer id: [10], name: [Chris Kontos], receiving split: [25,000000]
14:15:24.286 [gcp-pubsub-subscriber1] INFO treeline.service.splits.SplitService - Featured performer id: [13], name: [Nobody 3], receiving split: [26,333333]
14:15:24.286 [gcp-pubsub-subscriber2] INFO treeline.service.splits.SplitService - Non-featured performer id: [4], name: [Kerry King], receiving split: [5,833333]
14:15:24.290 [gcp-pubsub-subscriber2] INFO treeline.service.splits.SplitService - Non-featured performer id: [5], name: [Gary Holt], receiving split: [5,833333]
14:15:24.290 [gcp-pubsub-subscriber1] INFO treeline.service.splits.SplitService - Featured performer id: [14], name: [Nobody 1], receiving split: [26,333333]
14:15:24.293 [gcp-pubsub-subscriber1] INFO treeline.service.splits.SplitService - Featured performer id: [15], name: [Nobody 2], receiving split: [26,333333]
14:15:24.293 [gcp-pubsub-subscriber2] INFO treeline.service.splits.SplitService - Non-featured performer id: [3], name: [Jeff Hanneman], receiving split: [5,833333]
14:15:24.297 [gcp-pubsub-subscriber2] INFO treeline.service.splits.SplitService - Featured performer id: [13], name: [Nobody 3], receiving split: [21,666667]
14:15:24.302 [gcp-pubsub-subscriber2] INFO treeline.service.splits.SplitService - Featured performer id: [14], name: [Nobody 1], receiving split: [21,666667]
14:15:24.308 [gcp-pubsub-subscriber2] INFO treeline.service.splits.SplitService - Featured performer id: [15], name: [Nobody 2], receiving split: [21,666667]
14:15:24.319 [gcp-pubsub-subscriber3] DEBUG o.s.c.s.m.DirectWithAttributesChannel - preSend on channel 'output', message: GenericMessage [payload=[Split(splitAmount=null, splitPercentage=25, playlistEntry=3, person=7), Split(splitAmount=null, splitPercentage=25, playlistEntry=3, person=12), Split(splitAmount=null, splitPercentage=25, playlistEntry=3, person=8), Split(splitAmount=null, splitPercentage=25, playlistEntry=3, person=10)], headers={id=dfcb46c4-7cbc-05b1-0030-219dd378141f, timestamp=1566389724287}]
14:15:24.320 [gcp-pubsub-subscriber1] DEBUG o.s.c.s.m.DirectWithAttributesChannel - preSend on channel 'output', message: GenericMessage [payload=[Split(splitAmount=null, splitPercentage=7, playlistEntry=1, person=7), Split(splitAmount=null, splitPercentage=7, playlistEntry=1, person=12), Split(splitAmount=null, splitPercentage=7, playlistEntry=1, person=2), Split(splitAmount=null, splitPercentage=26.333333333333332149095440399833023548126220703125, playlistEntry=1, person=13), Split(splitAmount=null, splitPercentage=26.333333333333332149095440399833023548126220703125, playlistEntry=1, person=14), Split(splitAmount=null, splitPercentage=26.333333333333332149095440399833023548126220703125, playlistEntry=1, person=15)], headers={id=387b1f11-14b9-34c2-0a60-054eda20dfa3, timestamp=1566389724293}]
14:15:24.320 [gcp-pubsub-subscriber2] DEBUG o.s.c.s.m.DirectWithAttributesChannel - preSend on channel 'output', message: GenericMessage [payload=[Split(splitAmount=null, splitPercentage=5.83333333333333303727386009995825588703155517578125, playlistEntry=2, person=7), Split(splitAmount=null, splitPercentage=5.83333333333333303727386009995825588703155517578125, playlistEntry=2, person=12), Split(splitAmount=null, splitPercentage=5.83333333333333303727386009995825588703155517578125, playlistEntry=2, person=2), Split(splitAmount=null, splitPercentage=5.83333333333333303727386009995825588703155517578125, playlistEntry=2, person=4), Split(splitAmount=null, splitPercentage=5.83333333333333303727386009995825588703155517578125, playlistEntry=2, person=5), Split(splitAmount=null, splitPercentage=5.83333333333333303727386009995825588703155517578125, playlistEntry=2, person=3), Split(splitAmount=null, splitPercentage=21.666666666666667850904559600166976451873779296875, playlistEntry=2, person=13), Split(splitAmount=null, splitPercentage=21.666666666666667850904559600166976451873779296875, playlistEntry=2, person=14), Split(splitAmount=null, splitPercentage=21.666666666666667850904559600166976451873779296875, playlistEntry=2, person=15)], headers={id=539cd992-168f-5731-ea9d-c9212f0656ee, timestamp=1566389724309}]
14:15:24.390 [gcp-pubsub-subscriber1] DEBUG o.s.c.s.b.AbstractMessageChannelBinder$SendingHandler - org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler@1f1ef8b2 received message: GenericMessage [payload=byte[575], headers={contentType=application/json, id=c34eb207-1a4c-6bc9-f91e-517c983c73b8, timestamp=1566389724390}]
14:15:24.390 [gcp-pubsub-subscriber2] DEBUG o.s.c.s.b.AbstractMessageChannelBinder$SendingHandler - org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler@1f1ef8b2 received message: GenericMessage [payload=byte[1091], headers={contentType=application/json, id=6d8d417d-3f45-7b83-d243-2cee0162cfb1, timestamp=1566389724390}]
14:15:24.391 [gcp-pubsub-subscriber2] DEBUG o.s.c.g.p.i.o.PubSubMessageHandler - org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler@1c84a8f5 received message: GenericMessage [payload=byte[1091], headers={contentType=application/json, id=6d696eab-b2d1-4d6c-bb45-9385c4567cf3, timestamp=1566389724391}]
14:15:24.391 [gcp-pubsub-subscriber1] DEBUG o.s.c.g.p.i.o.PubSubMessageHandler - org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler@1c84a8f5 received message: GenericMessage [payload=byte[575], headers={contentType=application/json, id=3e02b7af-cca7-9172-398f-b1bbc4245a00, timestamp=1566389724391}]
14:15:24.394 [gcp-pubsub-subscriber3] DEBUG o.s.c.s.b.AbstractMessageChannelBinder$SendingHandler - org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler@1f1ef8b2 received message: GenericMessage [payload=byte[287], headers={contentType=application/json, id=077a18d2-dec6-8272-1fcf-d271d11117f5, timestamp=1566389724394}]
14:15:24.394 [gcp-pubsub-subscriber3] DEBUG o.s.c.g.p.i.o.PubSubMessageHandler - org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler@1c84a8f5 received message: GenericMessage [payload=byte[287], headers={contentType=application/json, id=6cf9bda7-2270-8892-0e2e-e402ea85f88a, timestamp=1566389724394}]
14:15:24.398 [gcp-pubsub-subscriber3] DEBUG o.s.c.s.m.DirectWithAttributesChannel - postSend (sent=true) on channel 'output', message: GenericMessage [payload=byte[287], headers={contentType=application/json, id=077a18d2-dec6-8272-1fcf-d271d11117f5, timestamp=1566389724394}]
14:15:24.399 [gcp-pubsub-subscriber3] DEBUG o.s.c.s.m.DirectWithAttributesChannel - postSend (sent=true) on channel 'input', message: GenericMessage [payload=byte[1], headers={id=52e2d9ff-2553-0f20-cd6c-1a9d6135f3cf, contentType=application/json, timestamp=1566389724166}]
14:15:24.400 [gcp-pubsub-subscriber1] DEBUG o.s.c.s.m.DirectWithAttributesChannel - postSend (sent=true) on channel 'output', message: GenericMessage [payload=byte[575], headers={contentType=application/json, id=c34eb207-1a4c-6bc9-f91e-517c983c73b8, timestamp=1566389724390}]
14:15:24.400 [gcp-pubsub-subscriber2] DEBUG o.s.c.s.m.DirectWithAttributesChannel - postSend (sent=true) on channel 'output', message: GenericMessage [payload=byte[1091], headers={contentType=application/json, id=6d8d417d-3f45-7b83-d243-2cee0162cfb1, timestamp=1566389724390}]
14:15:24.400 [gcp-pubsub-subscriber2] DEBUG o.s.c.s.m.DirectWithAttributesChannel - postSend (sent=true) on channel 'input', message: GenericMessage [payload=byte[1], headers={id=b6808f8b-0169-93e7-64b1-25d6875c14da, contentType=application/json, timestamp=1566389724166}]
14:15:24.401 [gcp-pubsub-subscriber1] DEBUG o.s.c.s.m.DirectWithAttributesChannel - postSend (sent=true) on channel 'input', message: GenericMessage [payload=byte[1], headers={id=f6869d9f-e0ea-1581-e42b-57986624861e, contentType=application/json, timestamp=1566389724166}]
14:15:24.439 [grpc-default-executor-1] DEBUG o.s.c.g.p.c.p.PubSubPublisherTemplate - Publishing to amount was successful. Message ID: 5
14:15:24.446 [grpc-default-executor-1] DEBUG o.s.c.g.p.c.p.PubSubPublisherTemplate - Publishing to amount was successful. Message ID: 4
14:15:24.446 [grpc-default-executor-1] DEBUG o.s.c.g.p.c.p.PubSubPublisherTemplate - Publishing to amount was successful. Message ID: 6
I'd be most grateful for any assistance.
Upvotes: 2
Views: 2050
Reputation: 466
Pub/Sub template is a relatively low-level API that does not know about Spring Cloud Stream's existence. In this call to Pub/Sub template, the first parameter needs to be a Pub/Sub subscription name, not a Spring Cloud Stream output channel "output":
final List<AcknowledgeablePubsubMessage> msgs = template.pull("output", 3, Boolean.TRUE);
But you don't know the subscription name! In fact, no subscription exists at all: the GCP Spring Cloud Stream binder automatically creates an anonymous subscription for the input channel, but the output channel only publishes to a topic, so the binder creates no subscription for it.
You also subscribe to Subscriptions, not Topics (one topic can have multiple subscriptions that all get the same messages). So, at the beginning of your test, you'll need to create a subscription to your output "amount" topic. The PubSubAdmin bean can be autowired the same way you do with PubSubTemplate.
Subscription sub = pubSubAdmin.createSubscription("testSubscription", "amount");
// ... perform the logic under test ...
final List<AcknowledgeablePubsubMessage> msgs = template.pull("testSubscription", 3, Boolean.TRUE);
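To make the topic/subscription relationship concrete, here is a toy in-memory model in plain Java. It is only a sketch and not the Pub/Sub client API: the class TopicFanOutSketch and its methods are hypothetical names made up for illustration. It assumes the usual behaviour that a subscription only sees messages published after it was created, which is why the subscription should exist before the logic under test runs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Toy in-memory model of topics and subscriptions (hypothetical, NOT the Pub/Sub API). */
public class TopicFanOutSketch {

    // subscription name -> backlog of messages not yet pulled
    private final Map<String, Queue<String>> backlogs = new LinkedHashMap<>();
    // topic name -> names of the subscriptions attached to it
    private final Map<String, List<String>> subscriptionsByTopic = new LinkedHashMap<>();

    /** Attaches a new, empty subscription to a topic. */
    public void createSubscription(String subscription, String topic) {
        backlogs.put(subscription, new ArrayDeque<>());
        subscriptionsByTopic.computeIfAbsent(topic, t -> new ArrayList<>()).add(subscription);
    }

    /** Copies the message into every subscription currently attached to the topic. */
    public void publish(String topic, String message) {
        for (String sub : subscriptionsByTopic.getOrDefault(topic, List.of())) {
            backlogs.get(sub).add(message);
        }
    }

    /** Pulls up to maxMessages from a subscription; fails if the name is not a subscription. */
    public List<String> pull(String subscription, int maxMessages) {
        Queue<String> backlog = backlogs.get(subscription);
        if (backlog == null) {
            throw new IllegalStateException("NOT_FOUND: Subscription does not exist: " + subscription);
        }
        List<String> out = new ArrayList<>();
        while (out.size() < maxMessages && !backlog.isEmpty()) {
            out.add(backlog.poll());
        }
        return out;
    }

    public static void main(String[] args) {
        TopicFanOutSketch broker = new TopicFanOutSketch();
        broker.publish("amount", "too-early");   // no subscription yet, so nobody receives it
        broker.createSubscription("testSubscription", "amount");
        broker.createSubscription("otherSubscription", "amount");
        broker.publish("amount", "split-1");

        System.out.println(broker.pull("testSubscription", 3));   // prints [split-1]
        System.out.println(broker.pull("otherSubscription", 3));  // prints [split-1]
        try {
            broker.pull("output", 3);            // a channel name is not a subscription name
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Both subscriptions receive their own copy of "split-1", the message published before any subscription existed is never delivered, and pulling from "output" fails in the same way as the NOT_FOUND error in the question.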
GCP Pub/Sub template and admin docs: https://github.com/spring-cloud/spring-cloud-gcp/blob/master/docs/src/main/asciidoc/pubsub.adoc
GCP Spring Cloud Stream docs: https://github.com/spring-cloud/spring-cloud-gcp/blob/master/docs/src/main/asciidoc/spring-stream.adoc
Upvotes: 2