Aru

How can I create a Kafka topic on a different cluster from a Spark cluster?

I have two separate clusters, one running Kafka and one running Spark. I want to create a Kafka topic from the Spark cluster. As far as I can tell, creating a topic requires invoking kafka-topics.sh, which won't be available on the Spark cluster. The command has to be invoked through the shell.

e.g.: /kafka-topics.sh --zookeeper :2181 --create --topic test_topic

This script should be called from the Spark cluster but executed on the Kafka cluster. Can anyone help me?
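The shell route described in the question (start kafka-topics.sh from the Spark side, execute it on a Kafka node) could be done over SSH. Below is a minimal sketch of that idea, assuming password-less (key-based) SSH from the Spark node to a Kafka host. The class `RemoteTopicCreator`, the SSH target `user@kafka-host`, the script path `/opt/kafka/bin/kafka-topics.sh` and the ZooKeeper address `zk-host:2181` are hypothetical placeholders. Only the JDK's `ProcessBuilder` is used, together with the `--zookeeper`, `--create`, `--topic`, `--partitions` and `--replication-factor` options of kafka-topics.sh in the 0.8.x line.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: runs kafka-topics.sh on a Kafka node over SSH.
// Assumes key-based SSH from this machine to the Kafka host and that the
// script exists at the given path there; both are assumptions, not givens.
public final class RemoteTopicCreator {

    // Builds the ssh command line without executing anything.
    // ssh joins the trailing arguments into one line that the remote shell
    // re-parses, so no argument may contain spaces or shell metacharacters
    // (legal Kafka topic names never do).
    public static List<String> buildCommand(String sshTarget, String scriptPath,
                                            String zkConnect, String topic,
                                            int partitions, int replicationFactor) {
        List<String> cmd = new ArrayList<>();
        cmd.add("ssh");
        cmd.add(sshTarget);   // e.g. "user@kafka-host" (hypothetical)
        cmd.add(scriptPath);  // e.g. "/opt/kafka/bin/kafka-topics.sh" (hypothetical)
        cmd.add("--zookeeper");
        cmd.add(zkConnect);
        cmd.add("--create");
        cmd.add("--topic");
        cmd.add(topic);
        cmd.add("--partitions");
        cmd.add(Integer.toString(partitions));
        cmd.add("--replication-factor");
        cmd.add(Integer.toString(replicationFactor));
        return cmd;
    }

    // Starts the ssh process, streams its output to this console and
    // returns its exit code (0 means the remote command succeeded).
    public static int run(List<String> cmd) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(cmd).inheritIO().start();
        return process.waitFor();
    }

    public static void main(String[] args) throws Exception {
        List<String> cmd = buildCommand("user@kafka-host", "/opt/kafka/bin/kafka-topics.sh",
                "zk-host:2181", "test_topic", 1, 1);
        System.out.println(String.join(" ", cmd));
        // int exitCode = run(cmd);  // only on a node that can SSH to the Kafka host
    }
}
```

Separating `buildCommand` from `run` keeps the command easy to inspect or log before anything touches the remote host.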

Answers (1)

Anupam Jain

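Instead of the shell script, you can create the topic programmatically with Kafka's AdminUtils API (callable from Java), using the Maven dependencies below (Kafka and the ZkClient ZooKeeper library). The code only needs network access to the ZooKeeper ensemble of the Kafka cluster, so you can call it from the code that submits your Spark application.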

<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.3</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.1</version>
</dependency>

import java.util.Properties;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import kafka.utils.ZKStringSerializer$;
import kafka.admin.AdminUtils;

public final class KafkaUtils {
    public static void main(String[] args) throws Exception {
        // x.x.x.x and y.y.y.y are the ZooKeeper hosts used by the Kafka cluster.
        // The replication factor must be at least 1 and at most the number of brokers;
        // Kafka rejects a replication factor of 0.
        KafkaUtils.createTopic("x.x.x.x:2181,y.y.y.y:2181", "topicName", 1, 1, new Properties());
    }

    // Creates the topic by writing its metadata to the Kafka cluster's ZooKeeper,
    // so no Kafka scripts have to be installed on the machine running this code.
    public static void createTopic(String zkHosts, String topicName, int numberOfPartition,
                                   int replicationFactor, Properties properties) {
        ZkClient zkClient = null;
        try {
            zkClient = getZkClient(zkHosts);
            // Kafka 0.8.2 API; throws TopicExistsException if the topic already exists
            AdminUtils.createTopic(zkClient, topicName, numberOfPartition, replicationFactor, properties);
        } catch (Exception exception) {
            exception.printStackTrace();
        } finally {
            if (zkClient != null) {
                try {
                    zkClient.close();
                } catch (ZkInterruptedException ex) {
                    ex.printStackTrace();
                }
            }
        }
    }

    private static ZkClient getZkClient(String zkHosts) {
        final int sessionTimeoutMs = 10000;     // ZooKeeper session timeout
        final int connectionTimeoutMs = 10000;  // ZooKeeper connection timeout
        // ZKStringSerializer writes the topic metadata in the format the brokers expect;
        // ZkClient's default serializer would produce data Kafka cannot read.
        return new ZkClient(zkHosts, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$);
    }
}

Here x.x.x.x and y.y.y.y are the ZooKeeper hosts of the Kafka cluster. Hope this helps.
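The zkHosts argument is a standard ZooKeeper connect string: comma-separated host:port pairs, optionally followed by a chroot path if the Kafka brokers were configured with one in zookeeper.connect. A minimal sketch of building it from a host list follows; the class name `ZkConnectString` is hypothetical, it assumes hostnames or IPv4 addresses (an IPv6 address contains ':' and would be misread as already having a port), and it fills in ZooKeeper's default client port 2181 where none is given.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper: builds a connect string such as "x.x.x.x:2181,y.y.y.y:2181".
public final class ZkConnectString {
    static final int DEFAULT_ZK_PORT = 2181;  // ZooKeeper's default client port

    // hosts: hostnames or IPv4 addresses, each optionally with ":port"
    // chroot: optional path such as "/kafka"; must match the brokers' zookeeper.connect
    public static String build(List<String> hosts, String chroot) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("at least one ZooKeeper host is required");
        }
        StringJoiner joiner = new StringJoiner(",");
        for (String host : hosts) {
            // Append the default port only when the entry does not specify one
            joiner.add(host.contains(":") ? host : host + ":" + DEFAULT_ZK_PORT);
        }
        String hostList = joiner.toString();
        // The chroot appears once, after the last host:port pair
        return (chroot == null || chroot.isEmpty()) ? hostList : hostList + chroot;
    }

    public static void main(String[] args) {
        System.out.println(build(Arrays.asList("x.x.x.x", "y.y.y.y:2181"), null));
    }
}
```

The returned string can be passed directly as the zkHosts argument of createTopic.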
