Harsh

Reputation: 1

Apache Druid Broker Container Stops After Increasing maxSubqueryRows and maxSubqueryBytes

Problem Statement: I’m using a Dockerized Druid 30.0.1 setup to benchmark TPC-H, following the documentation here. However, I encounter an error when running queries:

Error: INVALID_INPUT (OPERATOR) Cannot issue the query, subqueries generated results beyond maximum[100000] rows. Try setting the 'maxSubqueryBytes' in the query context to 'auto' for enabling byte based limit, which chooses an optimal limit based on memory size and result's heap usage or manually configure the values of either 'maxSubqueryBytes' or 'maxSubqueryRows' in the query context. Manually alter the value carefully as it can cause the broker to go out of memory. org.apache.druid.query.ResourceLimitExceededException

To resolve this, I made the following changes to conf/druid/single-server/micro-quickstart/broker/runtime.properties in the broker container:

druid.server.http.maxSubqueryRows=100000000
druid.server.http.maxSubqueryBytes=auto
druid.server.http.maxIdleTime=PT10M
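As an aside, these limits can also be set per query via the query context, instead of (or in addition to) the server-wide runtime.properties change. Below is a minimal sketch using Druid's SQL HTTP API (`POST /druid/v2/sql`); the host/port assume the router from the docker-compose file below and the TPC-H `lineitem` table, so adjust both for your setup:

```python
import json

# Per-query alternative to the server-wide runtime.properties change:
# Druid's SQL API accepts a "context" object alongside the query text.
# URL assumes the router from the docker-compose setup in this question.
DRUID_SQL_URL = "http://localhost:8888/druid/v2/sql"

payload = {
    "query": "SELECT COUNT(*) FROM lineitem",
    "context": {
        # byte-based limit sized from heap, as the error message suggests
        "maxSubqueryBytes": "auto",
        # the row limit still applies unless raised as well
        "maxSubqueryRows": 100000000,
    },
}

# To actually run it (needs the `requests` package and a running cluster):
# import requests
# resp = requests.post(DRUID_SQL_URL, json=payload, timeout=300)
# resp.raise_for_status()
# print(resp.json())

print(json.dumps(payload["context"]))
```

Note that raising these limits per query carries the same broker out-of-memory risk the error message warns about.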

However, after running the query again, the broker container stopped, and I saw the following errors in the historical container logs:

2024-11-06T07:46:31,596 WARN [processing_4dbd88b1-f968-42cf-9cea-0d685981bbd3] org.apache.druid.server.QueryResource - Unable to write query response. (org.eclipse.jetty.io.EofException)
2024-11-06T07:46:31,596 WARN [processing_5fb1c4d8-62a5-49f9-bbea-a94d6140406c] org.apache.druid.server.QueryResource - Unable to write query response. (org.eclipse.jetty.io.EofException)
2024-11-06T07:46:31,597 WARN [qtp677930699-75[scan_[orders]_5fb1c4d8-62a5-49f9-bbea-a94d6140406c]] org.apache.druid.server.QueryLifecycle - Exception while processing queryId [5fb1c4d8-62a5-49f9-bbea-a94d6140406c] (org.apache.druid.error.DruidException: org.eclipse.jetty.io.EofException)
2024-11-06T07:46:31,597 WARN [qtp677930699-79[scan_[lineitem]_4dbd88b1-f968-42cf-9cea-0d685981bbd3]] org.apache.druid.server.QueryLifecycle - Exception while processing queryId [4dbd88b1-f968-42cf-9cea-0d685981bbd3] (org.apache.druid.error.DruidException: org.eclipse.jetty.io.EofException)
2024-11-06T07:46:31,597 WARN [qtp677930699-75[scan_[orders]_5fb1c4d8-62a5-49f9-bbea-a94d6140406c]] org.apache.druid.server.QueryResultPusher - Suppressing exception closing accumulator for query [5fb1c4d8-62a5-49f9-bbea-a94d6140406c]
org.eclipse.jetty.io.EofException: Closed
        at org.eclipse.jetty.server.HttpOutput.checkWritable(HttpOutput.java:771) ~[jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
        at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:795) ~[jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
        at com.google.common.io.CountingOutputStream.write(CountingOutputStream.java:54) ~[guava-32.0.1-jre.jar:?]
        at com.fasterxml.jackson.dataformat.smile.SmileGenerator._flushBuffer(SmileGenerator.java:2596) ~[jackson-dataformat-smile-2.12.7.jar:2.12.7]
        at com.fasterxml.jackson.dataformat.smile.SmileGenerator.close(SmileGenerator.java:1854) ~[jackson-dataformat-smile-2.12.7.jar:2.12.7]
        at com.fasterxml.jackson.databind.SequenceWriter.close(SequenceWriter.java:240) ~[jackson-databind-2.12.7.1.jar:2.12.7.1]
        at org.apache.druid.server.QueryResource$QueryResourceQueryResultPusher$1$1.close(QueryResource.java:560) ~[druid-server-30.0.1.jar:30.0.1]
        at org.apache.druid.server.QueryResultPusher$StreamingHttpResponseAccumulator.close(QueryResultPusher.java:473) ~[druid-server-30.0.1.jar:30.0.1]
        at org.apache.druid.server.QueryResultPusher.push(QueryResultPusher.java:185) ~[druid-server-30.0.1.jar:30.0.1]
        at org.apache.druid.server.QueryResource.doPost(QueryResource.java:218) ~[druid-server-30.0.1.jar:30.0.1]
        at jdk.internal.reflect.GeneratedMethodAccessor57.invoke(Unknown Source) ~[?:?]
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
        at java.base/java.lang.reflect.Method.invoke(Method.java:569) ~[?:?]
        at com.sun.jersey.spi.container.JavaMethodInvokerFactory$1.invoke(JavaMethodInvokerFactory.java:60) ~[jersey-server-1.19.4.jar:1.19.4]
        at com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$ResponseOutInvoker._dispatch(AbstractResourceMethodDispatchProvider.java:205) ~[jersey-server-1.19.4.jar:1.19.4]
        at com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher.java:75) ~[jersey-server-1.19.4.jar:1.19.4]

The broker container also stops when I run the same query again. The last lines in the broker logs are:

2024-11-11T07:20:34,364 INFO [main] org.apache.druid.curator.discovery.CuratorDruidNodeAnnouncer - Announced self [{"druidNode":{"service":"druid/broker","host":"172.21.0.5","bindOnHost":false,"plaintextPort":8082,"port":-1,"tlsPort":-1,"enablePlaintextPort":true,"enableTlsPort":false},"nodeType":"broker","services":{"lookupNodeService":{"type":"lookupNodeService","lookupTier":"__default"}},"startTime":"2024-11-11T07:20:27.454Z"}}.
2024-11-11T07:20:34,366 INFO [main] org.apache.druid.curator.discovery.CuratorServiceAnnouncer - Announcing service[DruidNode{serviceName='druid/broker', host='172.21.0.5', bindOnHost=false, port=-1, plaintextPort=8082, enablePlaintextPort=true, tlsPort=-1, enableTlsPort=false}]
2024-11-11T07:20:34,370 INFO [NodeRoleWatcher[BROKER]] org.apache.druid.discovery.BaseNodeRoleWatcher - Node [http://172.21.0.5:8082] of role [broker] detected.
2024-11-11T07:20:34,370 WARN [CuratorDruidNodeDiscoveryProvider-ListenerExecutor] org.apache.druid.discovery.DruidNodeDiscoveryProvider$ServiceDruidNodeDiscovery - Node[DiscoveryDruidNode{druidNode=DruidNode{serviceName='druid/broker', host='172.21.0.5', bindOnHost=false, port=-1, plaintextPort=8082, enablePlaintextPort=true, tlsPort=-1, enableTlsPort=false}, nodeRole='BROKER', services={lookupNodeService=LookupNodeService{lookupTier='__default'}}', startTime=2024-11-11T07:20:27.454Z}] discovered but doesn't have service[dataNodeService]. Ignored.
2024-11-11T07:20:34,373 INFO [main] org.apache.druid.java.util.common.lifecycle.Lifecycle - Successfully started lifecycle [module]

I am unable to determine the root cause of the issue. Any insights on how to resolve it would be greatly appreciated!

I am expecting the query to run successfully and return the data or count.

Also, could you share any links for benchmarking with setup details or configurations to check against? In the provided link http://static.druid.io/docs/druid.pdf I am not able to find the CPU and RAM for the MiddleManager nodes.

My setup, docker-compose.yml:

version: "2.2"

volumes:
  metadata_data: {}
  middle_var: {}
  historical_var: {}
  broker_var: {}
  coordinator_var: {}
  router_var: {}
  druid_shared: {}

services:
  postgres:
    container_name: postgres
    image: postgres:latest
    ports:
      - "5432:5432"
    volumes:
      - metadata_data:/var/lib/postgresql/data
    environment:
      - POSTGRES_PASSWORD=FoolishPassword
      - POSTGRES_USER=druid
      - POSTGRES_DB=druid

  zookeeper:
    container_name: zookeeper
    image: zookeeper:3.9.2
    ports:
      - "2181:2181"
    environment:
      - ZOO_MY_ID=1

  coordinator:
    image: apache/druid:30.0.1
    container_name: coordinator
    volumes:
      - /data5/druid/shared:/opt/shared
      - /data5/druid/coordinator:/opt/druid/var
    depends_on:
      - zookeeper
      - postgres
    ports:
      - "8081:8081"
    command:
      - coordinator
    env_file:
      - environment
    deploy:
      resources:
        limits:
          cpus: '8'        # Increased to match instance type capabilities.
          memory: '30g'    # Increased memory allocation.
        reservations:
          cpus: '4'
          memory: '15g'

  broker:
    image: apache/druid:30.0.1
    container_name: broker
    volumes:
      - /data5/druid/broker:/opt/druid/var
    depends_on:
      - zookeeper
      - postgres
      - coordinator
    ports:
      - "8082:8082"
    command:
      - broker
    env_file:
      - environment
    deploy:
      resources:
        limits:
          cpus: '8'        # Increased to match instance type capabilities.
          memory: '15g'     # Increased memory allocation.
        reservations:
          cpus: '4'
          memory: '7g'

  historical:
    image: apache/druid:30.0.1
    container_name: historical
    volumes:
      - /data5/druid/shared:/opt/shared
      - /data5/druid/historical:/opt/druid/var
    depends_on:
      - zookeeper
      - postgres
      - coordinator
    ports:
      - "8083:8083"
    command:
      - historical
    env_file:
      - environment
    deploy:
      resources:
        limits:
          cpus: '8'        # Increased to match instance type capabilities.
          memory: '30g'     # Increased memory allocation.
        reservations:
          cpus: '4'
          memory: '15g'

  middlemanager:
    image: apache/druid:30.0.1
    container_name: middlemanager
    volumes:
      - /data5/druid/shared:/opt/shared
      - /data5/druid/middlemanager:/opt/druid/var
    depends_on:
      - zookeeper
      - postgres
      - coordinator
    ports:
      - "8091:8091"
      - "8100-8105:8100-8105"
    command:
      - middleManager
    env_file:
      - environment

  router:
    image: apache/druid:30.0.1
    container_name: router
    volumes:
      - /data5/druid/router:/opt/druid/var
    depends_on:
      - zookeeper
      - postgres
      - coordinator
    ports:
      - "8888:8888"
    command:
      - router
    env_file:
      - environment
    deploy:
      resources:
        limits:
          cpus: '4'
          memory: '8g'
        reservations:
          cpus: '2'
          memory: '4g'

Environment file ->

# Java tuning
DRUID_XMX=48g
DRUID_XMS=48g
DRUID_MAXNEWSIZE=8g
DRUID_NEWSIZE=8g
DRUID_MAXDIRECTMEMORYSIZE=16g
DRUID_SINGLE_NODE_CONF=micro-quickstart

druid_emitter_logging_logLevel=debug

druid_extensions_loadList=["druid-histogram","druid-kafka-indexing-service","druid-datasketches", "druid-lookups-cached-global", "postgresql-metadata-storage", "druid-multi-stage-query"]

druid_zk_service_host=zookeeper

druid_metadata_storage_host=
druid_metadata_storage_type=postgresql
druid_metadata_storage_connector_connectURI=jdbc:postgresql://postgres:5432/druid
druid_metadata_storage_connector_user=druid
druid_metadata_storage_connector_password=FoolishPassword

druid_coordinator_balancer_strategy=cachingCost

druid_indexer_runner_javaOptsArray=["-server", "-Xmx48g", "-Xms48g", "-XX:MaxDirectMemorySize=16g", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
druid_indexer_fork_property_druid_processing_buffer_sizeBytes=256MiB

druid_storage_type=local
druid_storage_storageDirectory=/opt/shared/segments
druid_indexer_logs_type=file
druid_indexer_logs_directory=/opt/shared/indexing-logs


druid_processing_numThreads=32
druid_processing_numMergeBuffers=8

# Increase the worker capacity (adjust based on available resources)
druid_worker_capacity=6

DRUID_LOG4J=<?xml version="1.0" encoding="UTF-8" ?><Configuration status="WARN"><Appenders><Console name="Console" target="SYSTEM_OUT"><PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/></Console></Appenders><Loggers><Root level="info"><AppenderRef ref="Console"/></Root><Logger name="org.apache.druid.jetty.RequestLog" additivity="false" level="DEBUG"><AppenderRef ref="Console"/></Logger></Loggers></Configuration>

Could you please help me with this?

Upvotes: 0

Views: 71

Answers (1)

Peter Marshall

Reputation: 196

It looks like you're using the API intended for interactive queries?

druid.server.http.maxSubqueryRows is a guardrail on the interactive API to defend against floods from child query processes (including historicals). Floods happen when those child processes return more than a defined number of rows each, for example, when doing a GROUP BY on a high-cardinality column.

You may want to see this video on how this engine works.

I'd recommend you switch to the other API for this query, which uses asynchronous tasks to query storage directly, rather than the API you're using, which pre-fetches data to Historicals and fans out from / fans in to the Broker process - which is where your issue lies.

You can see an example in the Python notebook here.
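To make the suggestion concrete: the task-based route described above is exposed as `POST /druid/v2/sql/task` (the MSQ task engine), and task progress can be polled via the Overlord's task-status endpoint. A hedged sketch, with host/port assuming the router from the question's docker-compose file:

```python
import json
import time

# Router from the question's docker-compose setup; adjust as needed.
ROUTER = "http://localhost:8888"

def build_task_query(sql: str) -> dict:
    """Build a request body for POST /druid/v2/sql/task, which runs the
    query as an asynchronous MSQ task instead of fanning in to the broker."""
    return {
        "query": sql,
        # maxNumTasks caps the workers MSQ launches for this query
        "context": {"maxNumTasks": 3},
    }

body = build_task_query("SELECT COUNT(*) FROM lineitem")

# Against a running cluster (needs the `requests` package):
# import requests
# task = requests.post(f"{ROUTER}/druid/v2/sql/task", json=body).json()
# task_id = task["taskId"]
# while True:
#     status = requests.get(
#         f"{ROUTER}/druid/indexer/v1/task/{task_id}/status").json()
#     if status["status"]["status"] in ("SUCCESS", "FAILED"):
#         break
#     time.sleep(5)

print(json.dumps(body))
```

This trades interactive latency for the ability to shuffle large intermediate results through tasks rather than holding them on the broker heap.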

(Also noting that Druid 31 includes an experimental new engine called Dart. It's not GA.)

It sounds like you are hoping a single config exists that says: to get a QPS of X, use this type of config?

The QPS on the interactive API is a function of the amount of parallelism possible, the scan time of your segments, partitions covered and, of course, the number of queries hitting the cluster at the same time. (Plus a few other things!)

This is touched on in the Learn Druid course on data modelling.

Re: recommended configurations - in general, people want to maximise use of all resources on a given device. You'll see example hardware profiles in the documentation that do this for some AWS-type machines, so you can work out which profile maximises resources on your machine. Nowadays, however, the recommendation is to use the start-druid command: it calculates the necessary configs for you, so you don't have to worry about selecting an appropriate config. You can also select the specific processes the script spins up, making it possible to test a clustered deployment quickly.

Upvotes: 0
