Vasu Kandimalla
Vasu Kandimalla

Reputation: 100

gcp pubsub-lite subscription python "A second subscriber connected with the same SubscriberId and topic partition" error

I am using the Python Pub/Sub Lite client (async version) to subscribe to a Pub/Sub Lite topic. The topic has 10 partitions, and I create 5 clients at the same time so that each client receives messages from two partitions (this works). However, some of the subscribers occasionally fail with the error below (it does not happen every time):

{
    "time": 1662717414,
    "level": "ERROR",
    "message": "Future exception was never retrieved\nfuture: <Future finished exception=Aborted('A second subscriber connected with the same SubscriberId and topic partition.')>",
    "type": "applog",
    "name": "asyncio",
    "exc_info": "Traceback (most recent call last):\n  File \"/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers_async.py\", line 102, in _wrapped_aiter\n    async for response in self._call:  # pragma: no branch\n  File \"/usr/local/lib/python3.10/site-packages/grpc/aio/_call.py\", line 326, in _fetch_stream_responses\n    await self._raise_for_status()\n  File \"/usr/local/lib/python3.10/site-packages/grpc/aio/_call.py\", line 236, in _raise_for_status\n    raise _create_rpc_error(await self.initial_metadata(), await\ngrpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:\n\tstatus = StatusCode.ABORTED\n\tdetails = \"A second subscriber connected with the same SubscriberId and topic partition.\"\n\tdebug_error_string = \"UNKNOWN:Error received from peer ipv4:172.253.63.95:443 {created_time:\"2022-09-09T09:56:48.360743382+00:00\", grpc_status:10, grpc_message:\"A second subscriber connected with the same SubscriberId and topic partition.\"}\"\n>\n\nThe above exception was the direct cause of the following exception:\n\nTraceback (most recent call last):\n  File \"/usr/local/lib/python3.10/site-packages/google/cloud/pubsublite/internal/wire/retrying_connection.py\", line 100, in _run_loop\n    await self._loop_connection(\n  File \"/usr/local/lib/python3.10/site-packages/google/cloud/pubsublite/internal/wire/retrying_connection.py\", line 154, in _loop_connection\n    await self._read_queue.put(await read_task)\n  File \"/usr/local/lib/python3.10/site-packages/google/cloud/pubsublite/internal/wire/permanent_failable.py\", line 76, in await_unless_failed\n    raise self._failure_task.exception()\n  File \"/usr/local/lib/python3.10/site-packages/google/cloud/pubsublite/internal/wire/gapic_connection.py\", line 71, in read\n    raise self.error()\n  File \"/usr/local/lib/python3.10/site-packages/google/cloud/pubsublite/internal/wire/gapic_connection.py\", line 66, in read\n    return await 
self.await_unless_failed(response_it.__anext__())\n  File \"/usr/local/lib/python3.10/site-packages/google/cloud/pubsublite/internal/wire/permanent_failable.py\", line 75, in await_unless_failed\n    return await task\n  File \"/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers_async.py\", line 105, in _wrapped_aiter\n    raise exceptions.from_grpc_error(rpc_error) from rpc_error\ngoogle.api_core.exceptions.Aborted: 409 A second subscriber connected with the same SubscriberId and topic partition. [reason: \"DUPLICATE_SUBSCRIBER_CONNECTIONS\"\ndomain: \"pubsublite.googleapis.com\"\n]"
}

exc_info value pretty-printed:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers_async.py", line 102, in _wrapped_aiter
    async for response in self._call:  # pragma: no branch
  File "/usr/local/lib/python3.10/site-packages/grpc/aio/_call.py", line 326, in _fetch_stream_responses
    await self._raise_for_status()
  File "/usr/local/lib/python3.10/site-packages/grpc/aio/_call.py", line 236, in _raise_for_status
    raise _create_rpc_error(await self.initial_metadata(), await
grpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:
        status = StatusCode.ABORTED
        details = "A second subscriber connected with the same SubscriberId and topic partition."
        debug_error_string = "UNKNOWN:Error received from peer ipv4:172.253.63.95:443 {created_time:"2022-09-09T09:56:48.360743382+00:00", grpc_status:10, grpc_message:"A second subscriber connected with the same SubscriberId and topic partition."}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/google/cloud/pubsublite/internal/wire/retrying_connection.py", line 100, in _run_loop
    await self._loop_connection(
  File "/usr/local/lib/python3.10/site-packages/google/cloud/pubsublite/internal/wire/retrying_connection.py", line 154, in _loop_connection
    await self._read_queue.put(await read_task)
  File "/usr/local/lib/python3.10/site-packages/google/cloud/pubsublite/internal/wire/permanent_failable.py", line 76, in await_unless_failed
    raise self._failure_task.exception()
  File "/usr/local/lib/python3.10/site-packages/google/cloud/pubsublite/internal/wire/gapic_connection.py", line 71, in read
    raise self.error()
  File "/usr/local/lib/python3.10/site-packages/google/cloud/pubsublite/internal/wire/gapic_connection.py", line 66, in read
    return await self.await_unless_failed(response_it.__anext__())
  File "/usr/local/lib/python3.10/site-packages/google/cloud/pubsublite/internal/wire/permanent_failable.py", line 75, in await_unless_failed
    return await task
  File "/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers_async.py", line 105, in _wrapped_aiter
    raise exceptions.from_grpc_error(rpc_error) from rpc_error
google.api_core.exceptions.Aborted: 409 A second subscriber connected with the same SubscriberId and topic partition. [reason: "DUPLICATE_SUBSCRIBER_CONNECTIONS"
domain: "pubsublite.googleapis.com"
]

Editing to add code used:

import asyncio

from google.cloud.pubsublite.cloudpubsub import AsyncSubscriberClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    FlowControlSettings,
    SubscriptionPath,
)
from google.oauth2 import service_account


class AsyncTimedIterable:
    """Wrap an async iterable so every fetch of the next item has a timeout.

    Each call to ``__aiter__`` asks the wrapped iterable for a fresh
    iterator. Awaiting the next item raises ``asyncio.TimeoutError`` when
    it does not arrive in time, and a falsy item ends the iteration.

    Args:
        iterable: Object exposing ``__aiter__``.
        poll_timeout: Seconds to wait for each item. The value is passed
            through ``int()`` on every fetch, so fractional seconds are
            truncated.
    """

    def __init__(self, iterable, poll_timeout=90):
        class AsyncTimedIterator:
            """Iterator over ``iterable`` with a per-item timeout."""

            def __init__(self):
                self._iterator = iterable.__aiter__()

            def __aiter__(self):
                # An async iterator must return itself here so that it can
                # be handed to ``async for`` or ``aiter()`` directly.
                return self

            async def __anext__(self):
                # A timeout propagates to the caller unchanged as
                # asyncio.TimeoutError; no wrapper handler is needed.
                result = await asyncio.wait_for(
                    self._iterator.__anext__(), int(poll_timeout)
                )
                # Falsy items are treated as the end of the stream.
                if not result:
                    raise StopAsyncIteration
                return result

        self._factory = AsyncTimedIterator

    def __aiter__(self):
        """Return a new timed iterator over the wrapped iterable."""
        return self._factory()


# TODO add project info below
# Placeholder region and zone names; replace them with real values.
location = CloudZone(CloudRegion("region"), "zone")

# Placeholder project number and subscription id. This path is what
# async_receive_from_subscription passes to the subscriber client.
subscription_path = SubscriptionPath("project_number", location, "subscription_id")

# TODO add service account details
# Handed to service_account.Credentials.from_service_account_info; empty
# here as a placeholder.
gcp_creds = {}


async def async_receive_from_subscription(per_partition_count=100, poll_timeout=90):
    """Yield messages received from ``subscription_path``.

    A subscriber client is opened with credentials built from ``gcp_creds``
    and stays open for as long as this generator is being consumed, because
    the ``yield`` sits inside the client's ``async with`` block.

    Args:
        per_partition_count: Value used for ``messages_outstanding`` in the
            per-partition flow control settings. Must be > 0.
        poll_timeout: Seconds to wait for each message, forwarded to
            ``AsyncTimedIterable``. Defaults to 90, the value that was
            previously hard-coded.

    Yields:
        Each message produced by the subscriber's message iterator.
    """
    # Configure when to pause the message stream for more incoming messages based on the
    # maximum size or number of messages that a single-partition subscriber has received,
    # whichever condition is met first.
    per_partition_flow_control_settings = FlowControlSettings(
        # Caller-supplied limit on outstanding messages. Must be >0.
        messages_outstanding=per_partition_count,
        # 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
        bytes_outstanding=10 * 1024 * 1024,
    )

    async with AsyncSubscriberClient(
        credentials=service_account.Credentials.from_service_account_info(gcp_creds)
    ) as async_subscriber_client:
        message_iterator = await async_subscriber_client.subscribe(
            subscription_path,
            per_partition_flow_control_settings=per_partition_flow_control_settings,
        )

        # Bound every fetch so that a stalled stream surfaces as a timeout
        # instead of blocking forever.
        timed_iter = AsyncTimedIterable(message_iterator, poll_timeout)
        async for message in timed_iter:
            yield message


async def main():
    """Print the payload of every message received from the subscription."""
    stream = async_receive_from_subscription(per_partition_count=100_000)
    async for received in stream:
        print(received.data)


# Start the consumer only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())


The code above is the asyncio equivalent of the quickstart subscriber sample at https://cloud.google.com/pubsub/lite/docs/samples/pubsublite-quickstart-subscriber and does the same thing.

Upvotes: 2

Views: 441

Answers (0)

Related Questions