Reputation: 1262
After starting the Google PubSub emulator and sending some messages to it, I start the System Under Test and when I try to start the SubscriberClient, I get an exception:
Grpc.Core.RpcException
HResult=0x80131500
Message=Status(StatusCode="NotFound", Detail="Subscription does not exist (resource=franchisingSalesReceipts-motorpromo-sub-dl)", DebugException="Grpc.Core.Internal.CoreErrorDetailException: {"created":"@1635433978.567000000","description":"Error received from peer ipv4:127.0.0.1:8085","file":"..\..\..\src\core\lib\surface\call.cc","file_line":1067,"grpc_message":"Subscription does not exist (resource=franchisingSalesReceipts-motorpromo-sub-dl)","grpc_status":5}")
Source=Google.Cloud.PubSub.V1
StackTrace:
at Google.Cloud.PubSub.V1.SubscriberClientImpl.SingleChannel.HandleRpcFailure(Exception e)
at Google.Cloud.PubSub.V1.SubscriberClientImpl.SingleChannel.HandlePullMessageData(Task`1 moveNextTask)
at Google.Cloud.PubSub.V1.SubscriberClientImpl.SingleChannel.<>c__DisplayClass46_0.<HandlePullMoveNext>b__1()
at Google.Cloud.PubSub.V1.SubscriberClientImpl.SingleChannel.<StartAsync>d__38.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at Google.Cloud.PubSub.V1.Tasks.ForwardingAwaiter.GetResult()
at Google.Cloud.PubSub.V1.Tasks.Extensions.<>c__DisplayClass4_0.<<ConfigureAwaitHideErrors>g__Inner|0>d.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at PromotionEngine.OneToOne.Infrastructure.Services.GooglePubSubConsumerService.<RegisterEventHandler>d__6`2.MoveNext() in C:\Users\jmourao\workspace\MotorPromocao\PromotionEngine.OneToOne\src\PromotionEngine.OneToOne.Infrastructure\Services\GooglePubSubConsumerService.cs:line 57
This exception was originally thrown at this call stack:
[External Code]
PromotionEngine.OneToOne.Infrastructure.Services.GooglePubSubConsumerService.RegisterEventHandler<TWorker, TMessage>(string) in GooglePubSubConsumerService.cs
REPRO STEPS
After starting the emulator using this command:
gcloud beta emulators pubsub start --project=integracao-apigee-dev
I connect a publisher using something like this:
public GcpEmulatorPublisherService(
ILogger<GcpEmulatorPublisherService> logger,
IConfiguration config)
{
_logger = logger;
string endpoint = config.GetValue<string>("GoogleCloudPubSub:host");
string projectId = config.GetValue<string>("GoogleCloudPubSub:projectId");
string[] topics = config.GetSection("GoogleCloudPubSub:topics").Get<string[]>();
string currentTopicId = config.GetValue<string>("GoogleCloudPubSub:currentTopicId");
PublisherServiceApiClient publisherService = new PublisherServiceApiClientBuilder
{
Endpoint = endpoint,
ChannelCredentials = ChannelCredentials.Insecure
}.Build();
foreach (string topicId in topics)
{
TopicName topicName = new TopicName(projectId, topicId);
try
{
publisherService.CreateTopic(topicName);
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.AlreadyExists)
{
_logger.LogInformation($"Topic already exists: ProjectId {projectId} TopicId {topicId}");
}
}
var settings = new PublisherClient.ClientCreationSettings(
credentials: ChannelCredentials.Insecure,
serviceEndpoint: endpoint
);
_publisherClient = PublisherClient.Create(new TopicName(projectId, currentTopicId), settings);
}
After that, I just start my system under test with the Insecure credentials and localhost ServiceEndpoint:
SubscriberClient.ClientCreationSettings clientCreationSettings = new(clientCount: 1, credentials: ChannelCredentials.Insecure, serviceEndpoint: "localhost:8085");
var subscriberClient = await SubscriberClient.CreateAsync(subscription, clientCreationSettings);
Everything ok until here.
When I try to start the subscription like this:
await subscriberClient.StartAsync(async (PubsubMessage message, CancellationToken cancellationToken) =>
{
try
{
TMessage messageBody = ConvertMessageBody<TMessage>(message);
TWorker worker = DependencyResolver.GetRequiredService<TWorker>();
await worker.ProcessAsync(messageBody);
return await Task.FromResult(SubscriberClient.Reply.Ack);
}
catch (Exception exception)
{
string exceptionMessage = string.Format(
"Exception reading message from subscription from supplier {0}. See inner exception for details. Message={1}",
topicName, exception.Message);
Exception trackedException = new(exceptionMessage, exception);
_telemetryWrapper.TrackException(trackedException);
_logger.LogError(trackedException, "Error processing message");
return await Task.FromResult(SubscriberClient.Reply.Nack);
}
}).ConfigureAwait(false);
I get the afore mentioned exception.
Upvotes: 0
Views: 1400
Reputation: 1262
I found out what was causing the issue.
I didn't create the Subscription I was trying to use with my System Under Test (consumer).
I was not aware that you have to create a Topic and a Subscription so that the consumer application can use this subscription to receive the messages.
So in my Publisher test application, I am creating the subscriptions needed just like in the documentation:
// Subscribe to the topic.
SubscriberServiceApiClient subscriberService = await SubscriberServiceApiClient.CreateAsync();
SubscriptionName subscriptionName = new SubscriptionName(projectId, subscriptionId);
subscriberService.CreateSubscription(subscriptionName, topicName, pushConfig: null, ackDeadlineSeconds: 60);
I also found out that if you try to create a subscription using the subscriptionId equal the topicId, it won't work.
Upvotes: 1