aakashdp

Reputation: 143

Error connecting to kafka server via IDE in WSL2

I'm not able to connect to a Kafka server (the first server I have tried on WSL2) running on Ubuntu from IntelliJ or VS Code running on Windows. I even tried using the VM's IP, but no luck. As I understand it, we should be able to connect using 'localhost' per this doc: https://learn.microsoft.com/en-us/windows/wsl/compare-versions. Am I missing something?

Here is my code

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    Properties producerProperties = new Properties();
    producerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties);

    ProducerRecord<String, String> record = new ProducerRecord<>("topic_1", "hello from java");
    producer.send(record);

    producer.flush();
    producer.close();

And here's the error: Error log

Upvotes: 5

Views: 3186

Answers (5)

Kiran Mohan

Reputation: 3016

Here is what worked for me. Note that I also had to enable the corporate VPN on my PC to connect to the company network.

Basically, things work if we use the IP address of the WSL machine.

  1. Find the WSL IP address: launch WSL and run hostname -I.
  2. Add the IP address from step 1 to <path/to>/kafka/config/server.properties:
     advertised.listeners=PLAINTEXT://x.x.x.x:9092
     Then restart Kafka.
  3. Use the same IP address from step 1 as the bootstrap server in the client-side application running on Windows.
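As a sketch of steps 1 and 2 (the IP shown is a placeholder, not from the answer; substitute whatever `hostname -I` prints in your WSL instance):

```shell
# Sketch only: WSL_IP is a placeholder; replace it with the first address
# printed by `hostname -I` inside your WSL instance.
WSL_IP="172.20.96.1"
# Prints the line to put in <path/to>/kafka/config/server.properties:
echo "advertised.listeners=PLAINTEXT://${WSL_IP}:9092"
```

After editing server.properties, restart the broker and point the Windows client at that same address (e.g. `172.20.96.1:9092` in place of `localhost:9092`).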

Upvotes: 2

David Stanley

Reputation: 61

Temporarily disabling IPv6 works for me. I followed the link in Dhruv Roy Talukdar's answer to this article.

In summary, create a shell script:

$ cat disable_ip6.sh
sudo sysctl -w net.ipv6.conf.all.disable_ipv6=1
sudo sysctl -w net.ipv6.conf.default.disable_ipv6=1
sudo sysctl -w net.ipv6.conf.lo.disable_ipv6=1

and run it BEFORE starting the ZooKeeper and Kafka servers.
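A hedged usage sketch (the script name matches the one above; the read-back is a standard sysctl query, but both steps require sudo inside a Linux/WSL shell):

```shell
# Run the script, then read one flag back to confirm it flipped to 1
# before starting ZooKeeper and Kafka.
chmod +x disable_ip6.sh
./disable_ip6.sh
sysctl net.ipv6.conf.all.disable_ipv6
```

The read-back should report `= 1`. Note that settings made with `sysctl -w` do not survive a reboot, hence running the script each time before starting the servers.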

Upvotes: 6

Dhruv Roy Talukdar

Reputation: 84

WSL2 currently has a networking issue that prevents outside programs from connecting to Kafka running on WSL2 (for example, your Java programs, Conduktor, etc.).

What worked for me was disabling IPv6 on WSL2. You can follow this article on how to disable it. There are two ways:

  • Temporarily disable it: you need to do this after each reboot. I did it by following the above article.
  • Permanently disable it: for this I followed this Stack Overflow answer, which was a bit easier for me.
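As a sketch of the permanent route (this is the standard sysctl persistence mechanism, not copied from the linked answer, so verify the key names against your distro's documentation):

```shell
# Persist the three IPv6-disable keys so they survive reboots, then reload.
# Requires root inside WSL.
echo 'net.ipv6.conf.all.disable_ipv6 = 1'     | sudo tee -a /etc/sysctl.conf
echo 'net.ipv6.conf.default.disable_ipv6 = 1' | sudo tee -a /etc/sysctl.conf
echo 'net.ipv6.conf.lo.disable_ipv6 = 1'      | sudo tee -a /etc/sysctl.conf
sudo sysctl -p
```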

Upvotes: 2

g43m1

Reputation: 543

I had the same problem, and it was quite difficult to solve. The breakthrough came when I found this closed, but apparently still broken, issue in WSL2. Basically, what was happening was that I could not access Ubuntu/WSL2's localhost from IntelliJ in Windows 10. So when I compiled and ran my program in IntelliJ, it gave me the error you posted.

Some details of my setup:

OS: Windows 10, version 2004 (OS Build 19041.630)

My build.sbt:

scalaVersion := "2.12.10"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.1"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.0.1"
libraryDependencies += "org.apache.bahir" %% "spark-streaming-twitter" % "2.4.0"
libraryDependencies += "org.apache.httpcomponents" % "httpclient" % "4.5.12"
libraryDependencies += "commons-io" % "commons-io" % "2.8.0"
libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.12" % "3.0.1"

This is the Scala code I am trying to run; it reads from one topic (quickstart-events) and posts to another (aux-output):

package kafka

import org.apache.spark.sql.SparkSession

object kafkaRunner {
   def main(args: Array[String]): Unit = {
     val spark = SparkSession.builder()
       .appName("Kafka First App")
       .master("local[*]")
       .getOrCreate()

     import spark.implicits._

     val df = spark
       .readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("subscribe", "quickstart-events")
       .load()

     df
       .writeStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("topic", "aux-output")
       .option("checkpointLocation", "/tmp/kafka-checkpoint")
       .start()
       .awaitTermination()
   }
}

I had already run the program many times, and to get a fresh start I deleted ZooKeeper's and Kafka's /tmp files. I don't know how valuable these files are, so please proceed with caution. I deleted these three directories:

  • /tmp/kafka-logs
  • /tmp/zookeeper
  • /tmp/kafka-checkpoint (This was a directory I set in my program, yours may be different, but Spark threw an error when I did not set this).
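The three deletions above amount to one command; as noted, it is destructive, so only run it if you can afford to lose that data:

```shell
# Destructive cleanup: wipes Kafka logs, ZooKeeper state, and the Spark
# checkpoint directory. rm -rf succeeds even if a directory is already gone.
rm -rf /tmp/kafka-logs /tmp/zookeeper /tmp/kafka-checkpoint
```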

Next, I ran these commands from my Kafka directory in Ubuntu, each in a separate terminal window.

[terminal window 1 - zookeeper server]
bin/zookeeper-server-start.sh config/zookeeper.properties

[terminal window 2 - kafka server, **wait until zookeeper finishes loading before running**]
bin/kafka-server-start.sh config/server.properties

[terminal window 3 - create our 2 topics, then run the producer for **quickstart**]
(the following three commands were run in the same window)

bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

bin/kafka-topics.sh --create --topic aux-output --bootstrap-server localhost:9092

bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092

[terminal window 4 - create a consumer for **quickstart** channel]
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

[terminal window 5 - create a consumer for **aux-output** channel]
bin/kafka-console-consumer.sh --topic aux-output --from-beginning --bootstrap-server localhost:9092

[terminal window 6 - use to run sbt]

I took this time to type some lines into the producer (window 3) and looked for the output in the quickstart consumer (window 4). Nothing should show in aux-output (window 5); that output is generated when the program is run in sbt.
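If you are unsure whether the topics were actually created, you can list what the broker knows about (my assumption here, not from the original answer: run this from the same Kafka directory, with the broker from window 2 still running):

```shell
# Ops check: list every topic on the local broker.
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
```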

Then I ran my program. Rather than moving my project from Windows, I navigated to the Windows directory from Ubuntu (/mnt/c/User/me/lots/of/dir/kafkaProject). I started sbt in the directory containing build.sbt. Once sbt loaded, I ran 'compile' and then 'run'.

It starts processing like a Spark job, but then text starts to fly by. This is when you should see your input from the quickstart topic appear in aux-output.

Text entered in window 3's producer should show up in windows 4 and 5 while the program is running.

One thing I didn't mention is that after trying and failing to run the program a few times, I did a wsl.exe --shutdown and restarted all my windows for a clean start. If you get errors saying a topic is missing, try changing the topic names and starting again. I found that topics I had used before and that didn't work were sometimes corrupted. I believe there is some other temporary file that I haven't discovered yet that is caching the topics, but I moved on once I got it working.

Good luck!

Upvotes: 0

onoma

Reputation: 1465

You need to replace localhost in your code with 0.0.0.0 (all IPs).

Like this:

producerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "0.0.0.0:9092");

This is explained in the "additional-networking-considerations" section of your link.

Upvotes: 0
