Reputation: 2310
I am trying to implement something similar to this tutorial. However, it only worked because the data set is very small. How would I do this for a larger table? I keep getting an out-of-memory error. My logs are:
ka.connect.runtime.rest.RestServer:60)
[2018-04-04 17:16:17,937] INFO [Worker clientId=connect-1, groupId=connect-cluster] Marking the coordinator ip-172-31-14-140.ec2.internal:9092 (id: 2147483647 rack: null) dead (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
[2018-04-04 17:16:17,938] ERROR Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:218)
java.lang.OutOfMemoryError: Java heap space
[2018-04-04 17:16:17,939] ERROR Uncaught exception in thread 'kafka-coordinator-heartbeat-thread | connect-sink-redshift': (org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread:51)
java.lang.OutOfMemoryError: Java heap space
[2018-04-04 17:16:17,940] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:65)
[2018-04-04 17:16:17,940] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:154)
[2018-04-04 17:16:17,940] ERROR WorkerSinkTask{id=sink-redshift-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
java.lang.OutOfMemoryError: Java heap space
[2018-04-04 17:16:17,940] ERROR WorkerSinkTask{id=sink-redshift-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
[2018-04-04 17:16:17,940] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:96)
[2018-04-04 17:16:17,941] INFO WorkerSourceTask{id=production-db-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:306)
[2018-04-04 17:16:17,940] ERROR Unexpected exception in Thread[KafkaBasedLog Work Thread - connect-statuses,5,main] (org.apache.kafka.connect.util.KafkaBasedLog:334)
java.lang.OutOfMemoryError: Java heap space
[2018-04-04 17:16:17,946] INFO WorkerSourceTask{id=production-db-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:323)
[2018-04-04 17:16:17,954] ERROR WorkerSourceTask{id=production-db-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
java.lang.OutOfMemoryError: Java heap space
[2018-04-04 17:16:17,960] ERROR WorkerSourceTask{id=production-db-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
[2018-04-04 17:16:17,960] INFO [Producer clientId=producer-4] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:341)
[2018-04-04 17:16:17,960] INFO Stopped ServerConnector@64f4bfe4{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:306)
[2018-04-04 17:16:17,967] INFO Stopped o.e.j.s.ServletContextHandler@2f06a90b{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:865)
I have also tried increasing the memory with the suggestion here, but I am unable to load the entire table into memory. Is there a way to limit the amount of data produced?
Upvotes: 2
Views: 1402
Reputation: 191820
For the JDBC connector, the most relevant property is probably this one, which seems to be what you are asking for:
batch.max.rows
Maximum number of rows to include in a single batch when polling for new data. This setting can be used to limit the amount of data buffered internally in the connector.
There is no need to "buffer the entire table into memory". With smaller batches and more frequent polls and commits, you can ensure that almost all rows are scanned, and you avoid the risk of a large batch failing, the connector stopping for a period of time, and then restarting and missing a few rows on the next poll.
Otherwise, make sure you aren't using bulk mode (mode=bulk), as it will try to scan the entire table again and again.
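As a rough sketch of how the batch size and mode settings could fit together in a source connector config: the connector name is taken from the logs in the question, while the connection URL, table name, column name, and the specific numbers are placeholders I am assuming, not values from your setup.

```properties
# Sketch only: URL, table, column, and numeric values are assumed placeholders.
name=production-db
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:postgresql://db.example.com:5432/production
table.whitelist=orders
# Track progress by a strictly increasing column instead of re-reading the table (mode=bulk).
mode=incrementing
incrementing.column.name=id
# Cap the number of rows per batch to limit what the connector buffers internally.
batch.max.rows=500
# Poll more often so that smaller batches can still keep up with new rows.
poll.interval.ms=1000
topic.prefix=production-
```

Incrementing mode assumes the table has a column that only ever grows, such as an auto-increment primary key. Which batch size actually helps depends on your row width and heap size, so treat the numbers as a starting point to tune.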
Also, the query option can do a column projection on the table.
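A minimal sketch of such a projection, assuming a hypothetical orders table where only three columns are needed. The query property is used instead of table.whitelist, and the table, column, and topic names are made up for illustration.

```properties
# Sketch only: table, column, and topic names are assumed placeholders.
# Set query instead of table.whitelist; do not use both.
mode=incrementing
incrementing.column.name=id
# Select only the columns you need, so each buffered row is smaller.
query=SELECT id, customer_id, total FROM orders
# With query set, topic.prefix is used as the full topic name.
topic.prefix=production-orders
```

In incrementing mode the connector appends its own WHERE clause to the query, so the query itself should not end in one. If you need a filter, wrapping the statement in a subquery is one way around that.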
You can find more configuration options in the documentation. Any OOM errors, however, need to be examined carefully on a case-by-case basis: enable JMX monitoring and export the metrics into an aggregate system you can watch more closely, such as Prometheus. Otherwise you only see the OOM error and cannot tell whether changing any particular parameter is really helping.
Another option would be to use CDC-based connectors, as another blog post shows.
Upvotes: 2