Reputation: 100
I am using the Python Pub/Sub Lite client (async version) to subscribe to a Pub/Sub Lite subscription. The topic has 10 partitions, and I create 5 clients at the same time so that each client receives messages from two partitions (this works). However, some of the subscribers occasionally log the following error (it does not happen every time):
{
"time": 1662717414,
"level": "ERROR",
"message": "Future exception was never retrieved\nfuture: <Future finished exception=Aborted('A second subscriber connected with the same SubscriberId and topic partition.')>",
"type": "applog",
"name": "asyncio",
"exc_info": "Traceback (most recent call last):\n File \"/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers_async.py\", line 102, in _wrapped_aiter\n async for response in self._call: # pragma: no branch\n File \"/usr/local/lib/python3.10/site-packages/grpc/aio/_call.py\", line 326, in _fetch_stream_responses\n await self._raise_for_status()\n File \"/usr/local/lib/python3.10/site-packages/grpc/aio/_call.py\", line 236, in _raise_for_status\n raise _create_rpc_error(await self.initial_metadata(), await\ngrpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:\n\tstatus = StatusCode.ABORTED\n\tdetails = \"A second subscriber connected with the same SubscriberId and topic partition.\"\n\tdebug_error_string = \"UNKNOWN:Error received from peer ipv4:172.253.63.95:443 {created_time:\"2022-09-09T09:56:48.360743382+00:00\", grpc_status:10, grpc_message:\"A second subscriber connected with the same SubscriberId and topic partition.\"}\"\n>\n\nThe above exception was the direct cause of the following exception:\n\nTraceback (most recent call last):\n File \"/usr/local/lib/python3.10/site-packages/google/cloud/pubsublite/internal/wire/retrying_connection.py\", line 100, in _run_loop\n await self._loop_connection(\n File \"/usr/local/lib/python3.10/site-packages/google/cloud/pubsublite/internal/wire/retrying_connection.py\", line 154, in _loop_connection\n await self._read_queue.put(await read_task)\n File \"/usr/local/lib/python3.10/site-packages/google/cloud/pubsublite/internal/wire/permanent_failable.py\", line 76, in await_unless_failed\n raise self._failure_task.exception()\n File \"/usr/local/lib/python3.10/site-packages/google/cloud/pubsublite/internal/wire/gapic_connection.py\", line 71, in read\n raise self.error()\n File \"/usr/local/lib/python3.10/site-packages/google/cloud/pubsublite/internal/wire/gapic_connection.py\", line 66, in read\n return await self.await_unless_failed(response_it.__anext__())\n File 
\"/usr/local/lib/python3.10/site-packages/google/cloud/pubsublite/internal/wire/permanent_failable.py\", line 75, in await_unless_failed\n return await task\n File \"/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers_async.py\", line 105, in _wrapped_aiter\n raise exceptions.from_grpc_error(rpc_error) from rpc_error\ngoogle.api_core.exceptions.Aborted: 409 A second subscriber connected with the same SubscriberId and topic partition. [reason: \"DUPLICATE_SUBSCRIBER_CONNECTIONS\"\ndomain: \"pubsublite.googleapis.com\"\n]"
}
exc_info
value pretty-printed:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers_async.py", line 102, in _wrapped_aiter
async for response in self._call: # pragma: no branch
File "/usr/local/lib/python3.10/site-packages/grpc/aio/_call.py", line 326, in _fetch_stream_responses
await self._raise_for_status()
File "/usr/local/lib/python3.10/site-packages/grpc/aio/_call.py", line 236, in _raise_for_status
raise _create_rpc_error(await self.initial_metadata(), await
grpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:
status = StatusCode.ABORTED
details = "A second subscriber connected with the same SubscriberId and topic partition."
debug_error_string = "UNKNOWN:Error received from peer ipv4:172.253.63.95:443 {created_time:"2022-09-09T09:56:48.360743382+00:00", grpc_status:10, grpc_message:"A second subscriber connected with the same SubscriberId and topic partition."}"
>
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/google/cloud/pubsublite/internal/wire/retrying_connection.py", line 100, in _run_loop
await self._loop_connection(
File "/usr/local/lib/python3.10/site-packages/google/cloud/pubsublite/internal/wire/retrying_connection.py", line 154, in _loop_connection
await self._read_queue.put(await read_task)
File "/usr/local/lib/python3.10/site-packages/google/cloud/pubsublite/internal/wire/permanent_failable.py", line 76, in await_unless_failed
raise self._failure_task.exception()
File "/usr/local/lib/python3.10/site-packages/google/cloud/pubsublite/internal/wire/gapic_connection.py", line 71, in read
raise self.error()
File "/usr/local/lib/python3.10/site-packages/google/cloud/pubsublite/internal/wire/gapic_connection.py", line 66, in read
return await self.await_unless_failed(response_it.__anext__())
File "/usr/local/lib/python3.10/site-packages/google/cloud/pubsublite/internal/wire/permanent_failable.py", line 75, in await_unless_failed
return await task
File "/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers_async.py", line 105, in _wrapped_aiter
raise exceptions.from_grpc_error(rpc_error) from rpc_error
google.api_core.exceptions.Aborted: 409 A second subscriber connected with the same SubscriberId and topic partition. [reason: "DUPLICATE_SUBSCRIBER_CONNECTIONS"
domain: "pubsublite.googleapis.com"
]
Edit: adding the code I used:
import asyncio
from google.cloud.pubsublite.cloudpubsub import AsyncSubscriberClient
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
FlowControlSettings,
SubscriptionPath,
)
from google.oauth2 import service_account
class AsyncTimedIterable:
    """Wrap an async iterable so every wait for the next item is time-limited.

    Args:
        iterable: Any object implementing ``__aiter__``.
        poll_timeout: Maximum number of seconds to wait for each item. The
            value is passed through ``int()`` before use, so fractional
            seconds are truncated.

    Raises (while iterating):
        asyncio.TimeoutError: If the wrapped iterator does not produce its
            next item within ``poll_timeout`` seconds.

    Note:
        Iteration also stops as soon as the wrapped iterator produces a falsy
        item; that item is not delivered to the consumer.
    """

    class _TimedIterator:
        """Async iterator that bounds each ``__anext__`` of the wrapped one."""

        def __init__(self, iterator, poll_timeout):
            self._iterator = iterator
            self._poll_timeout = poll_timeout

        def __aiter__(self):
            # The async-iterator protocol requires iterators to be
            # async-iterable themselves, returning ``self``.
            return self

        async def __anext__(self):
            # A timeout surfaces as asyncio.TimeoutError straight from
            # wait_for; no catch-and-re-raise is needed.
            result = await asyncio.wait_for(
                self._iterator.__anext__(), int(self._poll_timeout)
            )
            if not result:
                # A falsy item is treated as the end of the stream.
                raise StopAsyncIteration
            return result

    def __init__(self, iterable, poll_timeout=90):
        self._iterable = iterable
        self._poll_timeout = poll_timeout

    def __aiter__(self):
        # Each call starts a fresh pass by asking the wrapped iterable for a
        # new iterator.
        return self._TimedIterator(
            self._iterable.__aiter__(), self._poll_timeout
        )
# TODO: replace the placeholder strings below ("region", "zone",
# "project_number", "subscription_id") with real project information
# before running.
location = CloudZone(CloudRegion("region"), "zone")
subscription_path = SubscriptionPath("project_number", location, "subscription_id")
# TODO: fill in the service account details. This mapping is handed to
# service_account.Credentials.from_service_account_info() when the subscriber
# client is created, so it must not stay empty.
gcp_creds = {}
async def async_receive_from_subscription(per_partition_count=100):
    """Yield messages received from ``subscription_path``.

    Args:
        per_partition_count: Value passed as ``messages_outstanding`` in the
            per-partition flow control settings. Must be > 0.

    Yields:
        Each message produced by the subscriber client's message iterator.

    The wait for each message is wrapped in ``AsyncTimedIterable`` with a
    90 second limit.
    """
    # Flow control pauses the message stream for a partition once either
    # limit (message count or total bytes) is reached, whichever comes first.
    flow_control = FlowControlSettings(
        messages_outstanding=per_partition_count,
        # 10 MiB. Must be greater than the allowed size of the largest
        # message (1 MiB).
        bytes_outstanding=10 * 1024 * 1024,
    )
    credentials = service_account.Credentials.from_service_account_info(gcp_creds)

    async with AsyncSubscriberClient(credentials=credentials) as client:
        stream = await client.subscribe(
            subscription_path,
            per_partition_flow_control_settings=flow_control,
        )
        async for message in AsyncTimedIterable(stream, 90):
            yield message
async def main():
    """Print the ``data`` of every message received from the subscription."""
    messages = async_receive_from_subscription(per_partition_count=100_000)
    async for received in messages:
        print(received.data)


# Start the event loop only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
The code above is the asyncio version of https://cloud.google.com/pubsub/lite/docs/samples/pubsublite-quickstart-subscriber and does the same thing.
Upvotes: 2
Views: 441