Reputation: 9178
I have created a sample pipeline that polls data from MySQL and writes it to HDFS (and to a Hive table as well).
Due to my requirements, I need to create a Source+Sink connector pair for each DB table. Below are the configuration settings for my Source and Sink connectors.
I can see that a topic is created with one partition and a replication factor of 1.
Topic creation should be automatic, meaning I can't create topics manually prior to creating the Source+Sink pair.
My questions:
1) Is there a way to configure the number of partitions and replication factor when creating the Source Connector?
2) If it's possible to create multiple partitions, what kind of partitioning strategy does the Source Connector use?
3) What is the correct number of workers that should be created for the Source and Sink Connectors?
Source Connector:
{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "timestamp+incrementing",
"timestamp.column.name": "modified",
"incrementing.column.name": "id",
"topic.prefix": "jdbc_var_cols-",
"tasks.max": "1",
"poll.interval.ms": "1000",
"query": "SELECT id,name,email,department,modified FROM test",
"connection.url": "jdbc:mariadb://127.0.0.1:3306/connect_test?user=root&password=confluent"
}
Sink Connector:
{
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"topics.dir": "/user/datalake/topics-hive-var_cols3",
"hadoop.conf.dir": "/tmp/quickstart/hadoop/conf",
"flush.size": "5",
"schema.compatibility": "BACKWARD",
"connect.hdfs.principal": "[email protected]",
"connect.hdfs.keytab": "/tmp/quickstart/datalake.keytab",
"tasks.max": "3",
"topics": "jdbc_var_cols-",
"hdfs.url": "hdfs://mycluster:8020",
"hive.database": "kafka_connect_db_var_cols3",
"hdfs.authentication.kerberos": "true",
"rotate.interval.ms": "1000",
"hive.metastore.uris": "thrift://hive_server:9083",
"hadoop.home": "/tmp/quickstart/hadoop",
"logs.dir": "/logs",
"format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
"hive.integration": "true",
"hdfs.namenode.principal": "nn/[email protected]",
"hive.conf.dir": "/tmp/quickstart/hadoop/conf"
}
Upvotes: 2
Views: 2376
Reputation: 191983
1) Is there a way to configure the number of partitions and replication factor when creating the Source Connector?
Not from Connect, no.
It sounds like you have auto topic creation enabled on the broker, so it's using the broker defaults. This should ideally be disabled in a production environment, in which case you must create the topics ahead of time.
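For example, the topic could be pre-created with the partition and replication settings you want. This is only a sketch: the counts are placeholders, and the exact flag (--bootstrap-server vs. the older --zookeeper) depends on your Kafka version. Note that in "query" mode the JDBC source writes to a topic named exactly topic.prefix, i.e. jdbc_var_cols-:
kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --topic jdbc_var_cols- \
  --partitions 3 \
  --replication-factor 3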
2) what kind of partitioning strategy does the Source Connector use?
It depends on which connector and how its code is written (i.e., if/how it generates a record's key). For example, with the JDBC connector, the key might be the primary key of your database table, and it would be hashed by the DefaultPartitioner. I do not believe Connect allows you to specify a custom partitioner at a per-connector level. If the keys are null, then messages are distributed over all partitions.
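If you do want keyed records (and therefore DefaultPartitioner hashing on a specific column), one option is a Single Message Transform on the source connector. The snippet below is only a sketch of extra properties added to the source config shown above, keying each record on the id column; the transform aliases (createKey, extractId) are arbitrary names I chose:
"transforms": "createKey,extractId",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "id",
"transforms.extractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractId.field": "id"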
3) What is the correct number of workers that should be created for the Source and Sink Connectors?
Again, it depends on the source. For the JDBC connector, you would have one task per table.
For sinks, though, the number of tasks can only be up to the number of partitions of the topics being consumed (as with any consumer group).
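As a quick sanity check (a sketch; the host and flags depend on your Kafka version), you can describe the topic to see how many partitions exist before choosing tasks.max for the sink:
kafka-topics --describe --bootstrap-server localhost:9092 --topic jdbc_var_cols-
With a single partition, the sink's "tasks.max": "3" will still leave two tasks idle.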
Also, you would typically run the Connect cluster separately from your database (and Hadoop cluster).
Upvotes: 3