Reputation: 106
How to update the Cosmos DB master key in the change feed processor while it is running
I am using the change feed processor in a Spring Boot application, and it works fine. However, it fails when the primary key is regenerated. Because the processor runs asynchronously in the background, I am unable to catch the exception and either switch to the secondary key or restart the processor. Details are below.
Dependencies in pom.xml
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.5.2</version>
    <relativePath /> <!-- lookup parent from repository -->
</parent>
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-cosmos</artifactId>
    <version>4.17.0</version>
    <exclusions>
        <exclusion>
            <groupId>io.projectreactor.netty</groupId>
            <artifactId>reactor-netty</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.4.8</version>
</dependency>
<dependency>
    <groupId>io.projectreactor.netty</groupId>
    <artifactId>reactor-netty</artifactId>
    <version>1.0.9</version>
</dependency>
Code Snippet
String cosmoshost = "<cosmos db host url>";

public void startProcess() {
    try {
        ChangeFeedProcessorOptions changeFeedProcessorOptions = new ChangeFeedProcessorOptions();
        changeFeedProcessorOptions.setFeedPollDelay(Duration.ofMillis(60000));
        changeFeedProcessorOptions.setLeasePrefix("SpringBoot");
        new ChangeFeedProcessorBuilder().hostName(cosmoshost)
                .feedContainer(getCosmosAsyncContainer("feedcontainer"))
                .leaseContainer(getCosmosAsyncContainer("leasecontainer"))
                .options(changeFeedProcessorOptions).handleChanges(docs -> {
                    try {
                        for (JsonNode doc : docs) {
                            logger.debug("Received a record " + doc.toString());
                        }
                    } catch (Exception e) {
                        logger.error("Exception: {}", e.getMessage());
                    }
                }).buildChangeFeedProcessor().start().subscribe();
    } catch (Exception e) {
        logger.error(e.getMessage());
    }
}

// Declares the checked exception thrown by getCosmosAsyncClient().
private CosmosAsyncContainer getCosmosAsyncContainer(String containerId) throws Exception {
    CosmosAsyncClient cosmosAsyncClient = getCosmosAsyncClient();
    return cosmosAsyncClient.getDatabase("DB").getContainer(containerId);
}

private CosmosAsyncClient getCosmosAsyncClient() throws Exception {
    return getCosmosAsyncClient(true);
}

private CosmosAsyncClient getCosmosAsyncClient(boolean isPrimaryKey) throws Exception {
    CosmosAsyncClient cosmosAsyncClient = null;
    try {
        // "primarykey" and "secondarykey" are placeholders for the real account keys.
        String key = (isPrimaryKey) ? "primarykey" : "secondarykey";
        // The user agent suffix is set on the builder; the original ConnectionPolicy
        // object was never passed to the client, so it had no effect.
        cosmosAsyncClient = new CosmosClientBuilder().endpoint(cosmoshost).key(key)
                .userAgentSuffix("changeFeed")
                .consistencyLevel(ConsistencyLevel.EVENTUAL)
                .readRequestsFallbackEnabled(true).multipleWriteRegionsEnabled(true)
                .contentResponseOnWriteEnabled(true).buildAsyncClient();
    } catch (Exception exception) {
        logger.error("Failed: {}", exception.getMessage());
        if (isPrimaryKey) {
            // Fall back to the secondary key only while building the client.
            return getCosmosAsyncClient(!isPrimaryKey);
        } else {
            throw exception;
        }
    }
    return cosmosAsyncClient;
}
Error:
[2021-08-11T22:38:59.059Z] [com.azure.cosmos.implementation.query.DocumentProducer] [cosmos-rntbd-nio-5-6] [199] [ERROR] [] Unexpected failure
com.azure.cosmos.implementation.UnauthorizedException: {"innerErrorMessage":"[\"The MAC signature found in the HTTP request is not the same as the computed signature. Server used following string to sign - 'post\\ndocs\\ndbs/DB/colls/leases\\nwed, 11 aug 2021 17:08:59 gmt\\n\\n'. Learn more: https://aka.ms/cosmosdb-tsg-mac-signature\"]","cosmosDiagnostics":{"userAgent":"azsdk-java-cosmos/4.17.0 Windows10/10.0 JRE/1.8.0_251","requestLatencyInMs":311,"requestStartTimeUTC":"2021-08-11T17:08:59.068Z","requestEndTimeUTC":"2021-08-11T17:08:59.379Z","responseStatisticsList":[{"storeResult":{"storePhysicalAddress":"rntbd://cdb-ms-prod-centralus1-fd46.documents.azure.com:14436/apps/9a765551-fd9d-4ff2-9920-8e076aea3c8e/services/bbd91f20-33de-4bd4-b65c-fbdc3a51fa5a/partitions/813f95e0-a7fb-4137-b4ba-fa779467f66e/replicas/132731745009873205p/","lsn":38,"globalCommittedLsn":38,"partitionKeyRangeId":"","isValid":true,"statusCode":401,"subStatusCode":0,"isGone":false,"isNotFound":false,"isInvalidPartition":false,"isThroughputControlRequestRateTooLarge":false,"requestCharge":0.0,"itemLSN":-1,"sessionToken":"-1#38","backendLatencyInMs":0.17,"exception":"[\"The MAC signature found in the HTTP request is not the same as the computed signature. Server used following string to sign - 'post\\ndocs\\ndbs/DB/colls/leases\\nwed, 11 aug 2021 17:08:59 gmt\\n\\n'. 
Learn more: https://aka.ms/cosmosdb-tsg-mac-signature\"]","transportRequestTimeline":[{"eventName":"created","startTimeUTC":"2021-08-11T17:08:59.070Z","durationInMicroSec":0},{"eventName":"queued","startTimeUTC":"2021-08-11T17:08:59.070Z","durationInMicroSec":0},{"eventName":"channelAcquisitionStarted","startTimeUTC":"2021-08-11T17:08:59.070Z","durationInMicroSec":1000},{"eventName":"pipelined","startTimeUTC":"2021-08-11T17:08:59.071Z","durationInMicroSec":1000},{"eventName":"transitTime","startTimeUTC":"2021-08-11T17:08:59.072Z","durationInMicroSec":305000},{"eventName":"received","startTimeUTC":"2021-08-11T17:08:59.377Z","durationInMicroSec":1000},{"eventName":"completed","startTimeUTC":"2021-08-11T17:08:59.378Z","durationInMicroSec":1000}],"rntbdRequestLengthInBytes":629,"rntbdResponseLengthInBytes":463,"requestPayloadLengthInBytes":223,"responsePayloadLengthInBytes":null,"channelTaskQueueSize":1,"pendingRequestsCount":1,"serviceEndpointStatistics":{"availableChannels":1,"acquiredChannels":0,"executorTaskQueueSize":0,"inflightRequests":1,"lastSuccessfulRequestTime":"2021-08-11T17:08:13.091Z","lastRequestTime":"2021-08-11T17:08:58.482Z","createdTime":"2021-08-11T17:02:16.015Z","isClosed":false}},"requestResponseTimeUTC":"2021-08-11T17:08:59.379Z","requestResourceType":"Document","requestOperationType":"Query"}],"supplementalResponseStatisticsList":[],"addressResolutionStatistics":{},"regionsContacted":["https://keyvaultmanagedcosmos-centralus.documents.azure.com:443/"],"retryContext":{"statusAndSubStatusCodes":null,"retryCount":0,"retryLatency":0},"metadataDiagnosticsContext":{"metadataDiagnosticList":null},"serializationDiagnosticsContext":{"serializationDiagnosticsList":null},"gatewayStatistics":null,"systemInformation":{"usedMemory":"170650 KB","availableMemory":"3507046 KB","systemCpuLoad":"(2021-08-11T17:08:33.546Z 20.4%), (2021-08-11T17:08:38.552Z 14.8%), (2021-08-11T17:08:43.546Z 15.2%), (2021-08-11T17:08:48.547Z 17.0%), (2021-08-11T17:08:53.557Z 19.8%), 
(2021-08-11T17:08:58.553Z 14.4%)","availableProcessors":8},"clientCfgs":{"id":-3,"connectionMode":"DIRECT","numberOfClients":5,"connCfg":{"rntbd":"(cto:PT5S, rto:PT5S, icto:PT0S, ieto:PT1H, mcpe:130, mrpc:30, cer:false)","gw":"(cps:1000, rto:PT5S, icto:null, p:false)","other":"(ed: true, cs: false)"},"consistencyCfg":"(consistency: Eventual, mm: true, prgns: [])"}}}
at com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestManager.messageReceived(RntbdRequestManager.java:848) [azure-cosmos-4.17.0.jar:?]
at com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestManager.channelRead(RntbdRequestManager.java:181) [azure-cosmos-4.17.0.jar:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [netty-codec-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [netty-codec-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) [netty-handler-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1368) [netty-handler-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1234) [netty-handler-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1280) [netty-handler-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507) [netty-codec-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446) [netty-codec-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) [netty-codec-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [netty-common-4.1.65.Final.jar:4.1.65.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.65.Final.jar:4.1.65.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.65.Final.jar:4.1.65.Final]
at java.lang.Thread.run(Unknown Source) [?:1.8.0_251]
Upvotes: 0
Views: 699
Reputation: 15603
None of the SDK clients support automatic key rotation. If the keys are rotated, you need to update the client configuration yourself.
The documented key rotation flow (https://learn.microsoft.com/azure/cosmos-db/secure-access-to-data#key-rotation) describes how to rotate without interruption: switch the application to the secondary key, regenerate the primary key, switch the application to the new primary key, and then regenerate the secondary key.
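To illustrate why the order of those four steps matters, here is a minimal sketch that models the flow with plain Java only. It makes no Cosmos DB SDK calls: `Account`, `App`, `rotateSafely` and `regeneratePrimaryInPlace` are hypothetical names invented for this example, and `reconfigure` stands in for whatever your application does to rebuild its client and restart the change feed processor with a different key.

```java
import java.util.UUID;

// Stdlib-only model of the rotation order; it makes no Cosmos DB SDK calls.
final class KeyRotationSketch {

    // Hypothetical stand-in for an account that accepts either of two keys.
    static final class Account {
        private String primaryKey = newKey();
        private String secondaryKey = newKey();

        String primaryKey() { return primaryKey; }
        String secondaryKey() { return secondaryKey; }

        // Regenerating a key invalidates its previous value immediately.
        void regeneratePrimary() { primaryKey = newKey(); }
        void regenerateSecondary() { secondaryKey = newKey(); }

        boolean accepts(String key) {
            return key.equals(primaryKey) || key.equals(secondaryKey);
        }

        private static String newKey() { return UUID.randomUUID().toString(); }
    }

    // Hypothetical stand-in for the application and the key its client uses.
    static final class App {
        private String configuredKey;

        App(String key) { configuredKey = key; }

        // In a real application: rebuild the client and restart the processor.
        void reconfigure(String key) { configuredKey = key; }

        boolean isAuthorized(Account account) { return account.accepts(configuredKey); }
    }

    // Runs the four steps and reports whether the app stayed authorized throughout.
    static boolean rotateSafely(Account account, App app) {
        boolean ok = app.isAuthorized(account);
        app.reconfigure(account.secondaryKey());   // 1. app switches to the secondary key
        ok &= app.isAuthorized(account);
        account.regeneratePrimary();               // 2. primary is regenerated; app unaffected
        ok &= app.isAuthorized(account);
        app.reconfigure(account.primaryKey());     // 3. app switches to the new primary key
        ok &= app.isAuthorized(account);
        account.regenerateSecondary();             // 4. secondary is regenerated; app unaffected
        ok &= app.isAuthorized(account);
        return ok;
    }

    // The situation in the question: the key in use is regenerated under the app.
    static boolean regeneratePrimaryInPlace(Account account, App app) {
        account.regeneratePrimary();
        return app.isAuthorized(account);
    }

    public static void main(String[] args) {
        Account first = new Account();
        System.out.println(rotateSafely(first, new App(first.primaryKey())));

        Account second = new Account();
        System.out.println(regeneratePrimaryInPlace(second, new App(second.primaryKey())));
    }
}
```

In this model the four-step order keeps the configured key valid after every step, while regenerating the key that is currently in use leaves the application unauthorized, which matches the 401 in the question.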
Upvotes: 1