JBLKT

Streaming Data into Google BigQuery Tables: SocketTimeoutException, 502 Bad Gateway, 500 Internal Server Error warnings

We are using the Camel BigQuery API (version 2.20) to stream records from a message queue on an ActiveMQ server (version 5.14.3) into a Google BigQuery table.

We have implemented and deployed the streaming mechanism as an XML route definition in the Spring Framework, as follows:

<?xml version="1.0" encoding="UTF-8"?>
<beans
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:beans="http://www.springframework.org/schema/beans"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans 
        ./spring-beans.xsd
        http://camel.apache.org/schema/spring
        ./camel-spring.xsd">

    <!--
    # ==========================================================================
    # ActiveMQ JMS Bean Definition
    # ==========================================================================
    -->
    <bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="nio://192.168.10.10:61616?jms.useAsyncSend=true" />
                <property name="userName"  value="MyAmqUserName" />
                <property name="password"  value="MyAmqPassword" />
            </bean>
        </property>
    </bean>

    <!--
    # ==========================================================================
    # GoogleBigQueryComponent
    # https://github.com/apache/camel/tree/master/components/camel-google-bigquery
    # ==========================================================================
    -->
    <bean id="gcp" class="org.apache.camel.component.google.bigquery.GoogleBigQueryComponent">
        <property name="connectionFactory">
            <bean class="org.apache.camel.component.google.bigquery.GoogleBigQueryConnectionFactory">
                <property name="credentialsFileLocation" value="MyDir/MyGcpKeyFile.json" />
            </bean>
        </property>
    </bean>

    <!--
    # ==========================================================================
    # Main Context Bean Definition
    # ==========================================================================
    -->
    <camelContext id="camelContext" xmlns="http://camel.apache.org/schema/spring" >

        <!--
        ========================================================================
        https://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/processor/RedeliveryPolicy.html
        ========================================================================
        -->
        <onException useOriginalMessage="true">
            <exception>com.google.api.client.googleapis.json.GoogleJsonResponseException</exception>
            <exception>java.net.SocketTimeoutException</exception>
            <exception>java.net.ConnectException</exception>
            <redeliveryPolicy
                backOffMultiplier="2"
                logHandled="false"
                logRetryAttempted="true"
                maximumRedeliveries="10"
                maximumRedeliveryDelay="60000"
                redeliveryDelay="1000"
                retriesExhaustedLogLevel="ERROR"
                retryAttemptedLogLevel="WARN"
                />
        </onException>

        <!--
        # ==================================================================
        # Message Route :
        # 1. consume messages from my AMQ queue
        # 2. write message to Google BigQuery table
        # see https://github.com/apache/camel/blob/master/components/camel-google-bigquery/src/main/docs/google-bigquery-component.adoc
        # ==================================================================
        -->
        <route>
            <from uri="jms:my.amq.queue.of.output.data.for.gcp?acknowledgementModeName=DUPS_OK_ACKNOWLEDGE&amp;concurrentConsumers=20" />
            <to uri="gcp:my_gcp_project:my_bq_data_set:my_bq_table" />
        </route>

    </camelContext>

</beans>

The above seems to work, and we appear to be landing a high rate of messages/records (one route handles over 12,000 messages per minute), but our logs show a good number of SocketTimeoutException, 502 Bad Gateway and 500 Internal Server Error warnings:

2019-10-21 15:33:13 | WARN  | DefaultErrorHandler | Failed delivery for (MessageId: XXX on ExchangeId: XXX). On delivery attempt: 0 caught: java.net.SocketTimeoutException: connect timed out

2019-10-24 12:46:53 | WARN  | DefaultErrorHandler | Failed delivery for (MessageId: XXX on ExchangeId: XXX). On delivery attempt: 0 caught: com.google.api.client.googleapis.json.GoogleJsonResponseException: 502 Bad Gateway

2019-10-25 12:33:33 | WARN  | DefaultErrorHandler | Failed delivery for (MessageId: XXX on ExchangeId: XXX). On delivery attempt: 0 caught: com.google.api.client.googleapis.json.GoogleJsonResponseException: 500 Internal Server Error

Questions

  1. Is my use of the onException element generally and syntactically correct (setting aside fine-tuning of the redeliveryPolicy attributes)? Or have I missed anything else?

  2. My first warning message of interest says "On delivery attempt: 0 caught: java.net.SocketTimeoutException". My log file has no "On delivery attempt: 1", "On delivery attempt: 2", etc. Does this mean that the subsequent delivery attempts for that message succeeded?

  3. As far as streaming data into GCP is concerned, should I treat "SocketTimeoutException", "500 Internal Server Error" and "502 Bad Gateway" differently from each other, or is it OK to use the same onException + redeliveryPolicy for all three?

  4. Are there any other ways I can improve the performance of this Camel / Google API method of streaming data into GCP? Can the Camel / Google API support message batching to reduce the number of GCP insert operations? I'm already using dual streams with deduplication (CamelGoogleBigQueryInsertId).


Answers (1)

Yun Zhang

Disclaimer: I don't have experience with the Camel BigQuery API. My answer is based on observation and my understanding of the BigQuery API in general.
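Regarding question 1, one detail of the redeliveryPolicy is worth checking. As far as I can tell from Camel 2.x's RedeliveryPolicy, backOffMultiplier is only applied when useExponentialBackOff is true, so the policy in the question probably retries at a fixed 1-second interval. A minimal sketch of the same block with exponential back-off enabled, assuming the Spring XML DSL exposes the flag as the useExponentialBackOff attribute of redeliveryPolicy:

```xml
<onException useOriginalMessage="true">
    <exception>com.google.api.client.googleapis.json.GoogleJsonResponseException</exception>
    <exception>java.net.SocketTimeoutException</exception>
    <exception>java.net.ConnectException</exception>
    <!-- Assumption: backOffMultiplier is ignored unless useExponentialBackOff="true".
         Each delay is the previous one times 2, capped at maximumRedeliveryDelay. -->
    <redeliveryPolicy
        useExponentialBackOff="true"
        backOffMultiplier="2"
        redeliveryDelay="1000"
        maximumRedeliveryDelay="60000"
        maximumRedeliveries="10"
        logHandled="false"
        logRetryAttempted="true"
        retryAttemptedLogLevel="WARN"
        retriesExhaustedLogLevel="ERROR"
        />
</onException>
```

With these values the delays would be 1 s, 2 s, 4 s, 8 s, 16 s and 32 s, then 60 s (the cap) for the remaining four attempts, so the ten redeliveries add up to roughly five minutes of waiting before the ERROR-level "retries exhausted" log is written.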

  1. Regarding question 2: since retriesExhaustedLogLevel is set to "ERROR", the absence of any ERROR log entry probably means the retry succeeded.
  2. Regarding question 3: timeouts, 500s and 502s can be retried the same way. At least, I'm not aware of how they could be treated differently.
  3. Regarding question 4: batching will definitely help, according to the public documentation:

Maximum rows per request: 10,000 rows per request

A maximum of 500 rows is recommended. Batching can increase performance and throughput to a point, but at the cost of per-request latency. Too few rows per request and the overhead of each request can make ingestion inefficient. Too many rows per request and the throughput may drop.

A maximum of 500 rows per request is recommended, but experimentation with representative data (schema and data sizes) will help you determine the ideal batch size.
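A minimal sketch of batching on the Camel side, assuming two things: that the camel-google-bigquery producer sends a grouped exchange (one with the Exchange.GROUPED_EXCHANGE property set) in a single insert call, as its component documentation describes, and that Camel 2.20's aggregate EIP still supports groupExchanges="true". The batch size of 500 follows the recommendation quoted above; the 1000 ms timeout is an arbitrary placeholder to tune.

```xml
<route>
    <from uri="jms:my.amq.queue.of.output.data.for.gcp?acknowledgementModeName=DUPS_OK_ACKNOWLEDGE&amp;concurrentConsumers=20" />
    <!-- Collect messages into one grouped exchange; a batch is released when
         it reaches 500 messages or after 1000 ms without a new message -->
    <aggregate groupExchanges="true" completionSize="500" completionTimeout="1000">
        <!-- A constant correlation key puts every message into the same batch -->
        <correlationExpression>
            <constant>true</constant>
        </correlationExpression>
        <!-- Assumption: the producer turns the grouped exchange into one insert request -->
        <to uri="gcp:my_gcp_project:my_bq_data_set:my_bq_table" />
    </aggregate>
</route>
```

One trade-off: the default aggregation repository is in memory, so each JMS message completes (and is acknowledged) once it has been added to the batch, before the batch reaches BigQuery, and a crash can lose an in-flight batch. If a batched insert fails, the whole batch is retried, which is where per-row CamelGoogleBigQueryInsertId deduplication should help, assuming the producer reads the insert ID from each grouped exchange's header.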
