Reputation: 73
We are using the Camel BigQuery API (version 2.20) to stream records from a message queue on an ActiveMQ server (version 5.14.3) into a Google BigQuery table.
We have implemented and deployed the streaming mechanism as an XML route definition in Spring, as follows:
<?xml version="1.0" encoding="UTF-8"?>
<beans
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:beans="http://www.springframework.org/schema/beans"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        ./spring-beans.xsd
        http://camel.apache.org/schema/spring
        ./camel-spring.xsd">

    <!--
    # ==========================================================================
    # ActiveMQ JMS Bean Definition
    # ==========================================================================
    -->
    <bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="nio://192.168.10.10:61616?jms.useAsyncSend=true" />
                <property name="userName" value="MyAmqUserName" />
                <property name="password" value="MyAmqPassword" />
            </bean>
        </property>
    </bean>

    <!--
    # ==========================================================================
    # GoogleBigQueryComponent
    # https://github.com/apache/camel/tree/master/components/camel-google-bigquery
    # ==========================================================================
    -->
    <bean id="gcp" class="org.apache.camel.component.google.bigquery.GoogleBigQueryComponent">
        <property name="connectionFactory">
            <bean class="org.apache.camel.component.google.bigquery.GoogleBigQueryConnectionFactory">
                <property name="credentialsFileLocation" value="MyDir/MyGcpKeyFile.json" />
            </bean>
        </property>
    </bean>

    <!--
    # ==========================================================================
    # Main Context Bean Definition
    # ==========================================================================
    -->
    <camelContext id="camelContext" xmlns="http://camel.apache.org/schema/spring">

        <!--
        ========================================================================
        https://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/processor/RedeliveryPolicy.html
        ========================================================================
        -->
        <onException useOriginalMessage="true">
            <exception>com.google.api.client.googleapis.json.GoogleJsonResponseException</exception>
            <exception>java.net.SocketTimeoutException</exception>
            <exception>java.net.ConnectException</exception>
            <redeliveryPolicy
                backOffMultiplier="2"
                logHandled="false"
                logRetryAttempted="true"
                maximumRedeliveries="10"
                maximumRedeliveryDelay="60000"
                redeliveryDelay="1000"
                retriesExhaustedLogLevel="ERROR"
                retryAttemptedLogLevel="WARN"
            />
        </onException>

        <!--
        # ==================================================================
        # Message Route :
        # 1. consume messages from my AMQ queue
        # 2. write message to Google BigQuery table
        # see https://github.com/apache/camel/blob/master/components/camel-google-bigquery/src/main/docs/google-bigquery-component.adoc
        # ==================================================================
        -->
        <route>
            <from uri="jms:my.amq.queue.of.output.data.for.gcp?acknowledgementModeName=DUPS_OK_ACKNOWLEDGE&amp;concurrentConsumers=20" />
            <to uri="gcp:my_gcp_project:my_bq_data_set:my_bq_table" />
        </route>

    </camelContext>
</beans>
The above seems to work, and we seem to be landing messages/records at a high rate (one route handles over 12,000 messages per minute), but our logs are showing a good number of SocketTimeoutException, 502 Bad Gateway, and 500 Internal Server Error warnings:
2019-10-21 15:33:13 | WARN | DefaultErrorHandler | Failed delivery for (MessageId: XXX on ExchangeId: XXX). On delivery attempt: 0 caught: java.net.SocketTimeoutException: connect timed out
2019-10-24 12:46:53 | WARN | DefaultErrorHandler | Failed delivery for (MessageId: XXX on ExchangeId: XXX). On delivery attempt: 0 caught: com.google.api.client.googleapis.json.GoogleJsonResponseException: 502 Bad Gateway
2019-10-25 12:33:33 | WARN | DefaultErrorHandler | Failed delivery for (MessageId: XXX on ExchangeId: XXX). On delivery attempt: 0 caught: com.google.api.client.googleapis.json.GoogleJsonResponseException: 500 Internal Server Error
Questions
Is my use of the onException object generally/syntactically correct (barring the fine-tuning of the redeliveryPolicy attributes)? Or have I missed anything else?
My first warning message of interest says, "On delivery attempt: 0 caught: java.net.SocketTimeoutException". My log file does not have "On delivery attempt: 1", "On delivery attempt: 2", etc. Does this mean that subsequent delivery attempts of the given message were successful?
As far as trying to stream data into GCP is concerned, should I treat "SocketTimeoutException", "500 Internal Server Error", and "502 Bad Gateway" differently from each other, or is using the same onException + redeliveryPolicy OK?
Are there any other ways I can improve the performance of this Camel / Google API method of streaming data into GCP? Can the Camel / Google API support message batching in order to reduce the number of GCP insert operations? I'm already using dual streams with deduplication (CamelGoogleBigQueryInsertId); see the sketch below.
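For context, a minimal sketch of how such a per-row insert ID can be wired into a route (simplified, and using the JMS message ID as the value purely for illustration, not our exact setup):

<route>
    <from uri="jms:my.amq.queue.of.output.data.for.gcp?acknowledgementModeName=DUPS_OK_ACKNOWLEDGE&amp;concurrentConsumers=20" />
    <!-- Illustrative only: use the JMS message ID as the BigQuery insert ID so that
         redelivered duplicates can be deduplicated on the BigQuery side -->
    <setHeader headerName="CamelGoogleBigQueryInsertId">
        <simple>${header.JMSMessageID}</simple>
    </setHeader>
    <to uri="gcp:my_gcp_project:my_bq_data_set:my_bq_table" />
</route>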
Upvotes: 0
Views: 725
Reputation: 5518
Disclaimer: I don't have experience using the Camel BigQuery API. My answer is based on observation and understanding of the BigQuery API in general.
Since you have retriesExhaustedLogLevel="ERROR" in your redeliveryPolicy, if no ERROR log is present, it probably means the retry succeeded.

Regarding batching, BigQuery's guidance for streaming inserts is:

Maximum rows per request: 10,000 rows per request
A maximum of 500 rows is recommended. Batching can increase performance and throughput to a point, but at the cost of per-request latency. Too few rows per request and the overhead of each request can make ingestion inefficient. Too many rows per request and the throughput may drop.
A maximum of 500 rows per request is recommended, but experimentation with representative data (schema and data sizes) will help you determine the ideal batch size.
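If you want to experiment with batching on the Camel side, a rough, untested sketch is below. It assumes the camel-google-bigquery producer accepts a List of Maps as the message body and inserts it in a single request (worth confirming for your component version), and it uses GroupedBodyAggregationStrategy, which may not be available in Camel 2.20; on older releases a small custom AggregationStrategy that collects the Map bodies into a java.util.List does the same job.

<!-- Spring-level bean: combines the individual Map bodies into one List body.
     (Class availability is an assumption; verify it exists in your Camel version.) -->
<bean id="groupedBodyStrategy"
      class="org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy" />

<!-- Inside the camelContext: batch up to 500 rows, or whatever has arrived
     after 2 seconds, before each BigQuery insert call. -->
<route>
    <from uri="jms:my.amq.queue.of.output.data.for.gcp?acknowledgementModeName=DUPS_OK_ACKNOWLEDGE&amp;concurrentConsumers=20" />
    <aggregate strategyRef="groupedBodyStrategy" completionSize="500" completionTimeout="2000">
        <!-- Constant correlation key: aggregate all incoming messages together -->
        <correlationExpression>
            <constant>true</constant>
        </correlationExpression>
        <to uri="gcp:my_gcp_project:my_bq_data_set:my_bq_table" />
    </aggregate>
</route>

Larger completionSize values mean fewer insert calls but higher per-request latency, so starting near the recommended 500 rows and tuning with representative data seems sensible.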
Upvotes: 1