AnV
AnV

Reputation: 2844

apache pulsar : error while consuming message

I am trying to consume from pulsar topic. My client code can be seen below. But, I am facing NoClassDefFoundError. Please let me know how to resolve this.

pulsar client version: 2.10, pulsar server version: 2.10, installation type: standalone(not docker), OS: Ubuntu 20.04.4 LTS

My pulsar client code:

private static void consumeFromPulsarAsync() throws Exception {

    logger.info("consumeFromPulsarAsync()");

    PulsarClient client = PulsarClient.
        builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

    logger.info("consumeFromPulsarAsync() client");

    Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic("persistent://public/default/ack-2")
        .consumerName("pulsar-consumer-id-" + Math.random())
        .subscriptionName("pulsar-subscription-id-" + Math.random())
        .subscriptionType(SubscriptionType.Shared).subscribe();

    logger.info("consumeFromPulsarAsync() consumer");

    consumer.receiveAsync().thenCompose((msg) -> {
      logger.info("consumeFromPulsarAsync() consumed msg :: {}", msg.getValue());
      try {
        consumer.acknowledge(msg);
      } catch (PulsarClientException e) {
        throw new RuntimeException(e);
      }
      return null;
    });
  }

My pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.company</groupId>
  <artifactId>my.work.manager</artifactId>
  <version>release-1.0</version>
  <description>my Worker Manager</description>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.14.3</flink.version>
    <pulsar.version>2.10.0</pulsar.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-pulsar</artifactId>
      <version>1.15.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.8.9</version>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.22</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.json</groupId>
      <artifactId>json</artifactId>
      <version>20210307</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.32</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.30</version>
    </dependency>
    <dependency>
      <groupId>org.yaml</groupId>
      <artifactId>snakeyaml</artifactId>
      <version>1.30</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.5.13</version>
    </dependency>
    <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-client</artifactId>
      <version>${pulsar.version}</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
          <compilerArgs>
            <arg>-Xlint:all,-options,-path</arg>
          </compilerArgs>
        </configuration>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>2.5</version>
        <configuration>
          <archive>
            <manifest>
              <mainClass>com.company.my.manager.flink.myWorkManager</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.3.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
    <resources>
      <resource>
        <directory>src/main/resources</directory>
        <targetPath>${project.build.outputDirectory}</targetPath>
        <includes>
          <include>**/*.yml</include>
        </includes>
      </resource>
    </resources>
  </build>

</project>

Error it is showing:

2022-05-10 17:52:52,549 WARN org.apache.pulsar.client.impl.MultiTopicsConsumerImpl [] - Encountered error in partition auto update timer task for multi-topic consumer. Another task will be scheduled. java.lang.NoClassDefFoundError: org/apache/pulsar/shade/com/google/common/primitives/Ints at org.apache.pulsar.shade.com.google.common.collect.Lists.computeArrayListCapacity(Lists.java:152) ~[e8fe35f4-c20c-4c3e-9b81-a3ff1a0ebaec_my.work.manager-release-1.0-jar-with-dependencies.jar:?] at org.apache.pulsar.shade.com.google.common.collect.Lists.newArrayListWithExpectedSize(Lists.java:192) ~[e8fe35f4-c20c-4c3e-9b81-a3ff1a0ebaec_my.work.manager-release-1.0-jar-with-dependencies.jar:?] at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl$TopicsPartitionChangedListener.onTopicsExtended(MultiTopicsConsumerImpl.java:1238) ~[e8fe35f4-c20c-4c3e-9b81-a3ff1a0ebaec_my.work.manager-release-1.0-jar-with-dependencies.jar:?] at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl$1.run(MultiTopicsConsumerImpl.java:1350) [e8fe35f4-c20c-4c3e-9b81-a3ff1a0ebaec_my.work.manager-release-1.0-jar-with-dependencies.jar:?] at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715) [e8fe35f4-c20c-4c3e-9b81-a3ff1a0ebaec_my.work.manager-release-1.0-jar-with-dependencies.jar:?] at org.apache.pulsar.shade.io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34) [e8fe35f4-c20c-4c3e-9b81-a3ff1a0ebaec_my.work.manager-release-1.0-jar-with-dependencies.jar:?] at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703) [e8fe35f4-c20c-4c3e-9b81-a3ff1a0ebaec_my.work.manager-release-1.0-jar-with-dependencies.jar:?] at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790) [e8fe35f4-c20c-4c3e-9b81-a3ff1a0ebaec_my.work.manager-release-1.0-jar-with-dependencies.jar:?] at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503) [e8fe35f4-c20c-4c3e-9b81-a3ff1a0ebaec_my.work.manager-release-1.0-jar-with-dependencies.jar:?] at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [e8fe35f4-c20c-4c3e-9b81-a3ff1a0ebaec_my.work.manager-release-1.0-jar-with-dependencies.jar:?] at java.lang.Thread.run(Thread.java:829) [?:?]

Upvotes: 1

Views: 1438

Answers (1)

Tim Spann
Tim Spann

Reputation: 503

What type of cluster? Docker, Standalone, K8, ... Is the Pulsar client and server both 2.10? What JDK for server? for client? Any errors on the server? What server OS? What client OS?

It says WARN, did it run or crash with the missing class? Sometimes on a MAC you will get a WARN on missing SSL and it will work with warnings.

It may be related to this https://github.com/apache/pulsar/issues/9585

Steps to debug:

Create a new topic. Try again. Try to produce and consume with command line client.

Create a single partitioned topic and test with that.

I ran your example with this test

bin/pulsar-client produce --key "test1" "persistent://public/default/ack-2" -m "Test this thing 4" -n 25

--

public void consumeFromPulsarAsync() throws Exception {

        PulsarClient client = PulsarClient.
                builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("persistent://public/default/ack-2")
                .consumerName("pulsar-consumer-id-" + Math.random())
                .subscriptionName("pulsar-subscription-id-" + Math.random())
                .subscriptionType(SubscriptionType.Shared).subscribe();
        
        consumer.receiveAsync().thenCompose((msg) -> {
            log.info("consumeFromPulsarAsync() consumed msg :: {}", msg.getValue());
            try {
                consumer.acknowledge(msg);
            } catch (PulsarClientException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
            return null;
        });
    }

--

2022-05-11 10:00:47.321  INFO 28575 --- [r-client-io-6-1] o.a.pulsar.client.impl.ConnectionPool    : [[id: 0x2005ff4b, L:/192.168.1.63:49386 - R:pulsar1.fios-router.home/192.168.1.230:6650]] Connected to server
2022-05-11 10:00:47.321  INFO 28575 --- [r-client-io-6-1] org.apache.pulsar.client.impl.ClientCnx  : [id: 0x2005ff4b, L:/192.168.1.63:49386 - R:pulsar1.fios-router.home/192.168.1.230:6650] Connected through proxy to target broker at 127.0.0.1:6650
2022-05-11 10:00:47.323  INFO 28575 --- [r-client-io-7-1] o.a.pulsar.client.impl.ConsumerImpl      : [persistent://public/default/ack-2-partition-2][pulsar-subscription-id-0.6345962322446594] Subscribing to topic on cnx [id: 0xf5af925d, L:/192.168.1.63:49385 - R:pulsar1.fios-router.home/192.168.1.230:6650], consumerId 2
2022-05-11 10:00:47.324  INFO 28575 --- [r-client-io-6-1] o.a.pulsar.client.impl.ConsumerImpl      : [persistent://public/default/ack-2-partition-2][pulsar-subscription-id-0.6808890118894377] Subscribing to topic on cnx [id: 0x2005ff4b, L:/192.168.1.63:49386 - R:pulsar1.fios-router.home/192.168.1.230:6650], consumerId 2
2022-05-11 10:00:47.324  INFO 28575 --- [r-client-io-7-1] o.a.pulsar.client.impl.ConsumerImpl      : [persistent://public/default/ack-2-partition-1][pulsar-subscription-id-0.6345962322446594] Subscribing to topic on cnx [id: 0xf5af925d, L:/192.168.1.63:49385 - R:pulsar1.fios-router.home/192.168.1.230:6650], consumerId 1
2022-05-11 10:00:47.324  INFO 28575 --- [r-client-io-6-1] o.a.pulsar.client.impl.ConsumerImpl      : [persistent://public/default/ack-2-partition-1][pulsar-subscription-id-0.6808890118894377] Subscribing to topic on cnx [id: 0x2005ff4b, L:/192.168.1.63:49386 - R:pulsar1.fios-router.home/192.168.1.230:6650], consumerId 1
2022-05-11 10:00:47.324  INFO 28575 --- [r-client-io-7-1] o.a.pulsar.client.impl.ConsumerImpl      : [persistent://public/default/ack-2-partition-0][pulsar-subscription-id-0.6345962322446594] Subscribing to topic on cnx [id: 0xf5af925d, L:/192.168.1.63:49385 - R:pulsar1.fios-router.home/192.168.1.230:6650], consumerId 0
2022-05-11 10:00:47.324  INFO 28575 --- [r-client-io-6-1] o.a.pulsar.client.impl.ConsumerImpl      : [persistent://public/default/ack-2-partition-0][pulsar-subscription-id-0.6808890118894377] Subscribing to topic on cnx [id: 0x2005ff4b, L:/192.168.1.63:49386 - R:pulsar1.fios-router.home/192.168.1.230:6650], consumerId 0
2022-05-11 10:00:47.357  INFO 28575 --- [r-client-io-6-1] o.a.pulsar.client.impl.ConsumerImpl      : [persistent://public/default/ack-2-partition-0][pulsar-subscription-id-0.6808890118894377] Subscribed to topic on pulsar1.fios-router.home/192.168.1.230:6650 -- consumer: 0
2022-05-11 10:00:47.370  INFO 28575 --- [r-client-io-6-1] o.a.pulsar.client.impl.ConsumerImpl      : [persistent://public/default/ack-2-partition-1][pulsar-subscription-id-0.6808890118894377] Subscribed to topic on pulsar1.fios-router.home/192.168.1.230:6650 -- consumer: 1
2022-05-11 10:00:47.370  INFO 28575 --- [r-client-io-7-1] o.a.pulsar.client.impl.ConsumerImpl      : [persistent://public/default/ack-2-partition-1][pulsar-subscription-id-0.6345962322446594] Subscribed to topic on pulsar1.fios-router.home/192.168.1.230:6650 -- consumer: 1
2022-05-11 10:00:47.371  INFO 28575 --- [r-client-io-6-1] o.a.pulsar.client.impl.ConsumerImpl      : [persistent://public/default/ack-2-partition-2][pulsar-subscription-id-0.6808890118894377] Subscribed to topic on pulsar1.fios-router.home/192.168.1.230:6650 -- consumer: 2
2022-05-11 10:00:47.372  INFO 28575 --- [r-client-io-6-1] o.a.p.c.impl.MultiTopicsConsumerImpl     : [persistent://public/default/ack-2] [pulsar-subscription-id-0.6808890118894377] Success subscribe new topic persistent://public/default/ack-2 in topics consumer, partitions: 3, allTopicPartitionsNumber: 3
2022-05-11 10:00:47.394  INFO 28575 --- [r-client-io-7-1] o.a.pulsar.client.impl.ConsumerImpl      : [persistent://public/default/ack-2-partition-0][pulsar-subscription-id-0.6345962322446594] Subscribed to topic on pulsar1.fios-router.home/192.168.1.230:6650 -- consumer: 0
2022-05-11 10:00:47.394  INFO 28575 --- [r-client-io-7-1] o.a.pulsar.client.impl.ConsumerImpl      : [persistent://public/default/ack-2-partition-2][pulsar-subscription-id-0.6345962322446594] Subscribed to topic on pulsar1.fios-router.home/192.168.1.230:6650 -- consumer: 2
2022-05-11 10:00:47.395  INFO 28575 --- [r-client-io-7-1] o.a.p.c.impl.MultiTopicsConsumerImpl     : [persistent://public/default/ack-2] [pulsar-subscription-id-0.6345962322446594] Success subscribe new topic persistent://public/default/ack-2 in topics consumer, partitions: 3, allTopicPartitionsNumber: 3
2022-05-11 10:00:52.529  INFO 28575 --- [t-internal-11-1] a.f.a.consumer.AirQualityConsumerApp     : consumeFromPulsarAsync() consumed msg :: Test this thing 4
2022-05-11 10:00:52.529  INFO 28575 --- [nt-internal-9-1] a.f.a.consumer.AirQualityConsumerApp     : consumeFromPulsarAsync() consumed msg :: Test this thing 4
^C2022-05-11 10:00:54.823  INFO 28575 --- [r-client-io-1-1] o.a.pulsar.client.impl.ConsumerImpl      : [persistent://public/default/airquality] [airqualitysj1] Closed consumer
2022-05-11 10:00:54.824  INFO 28575 --- [ionShutdownHook] o.a.pulsar.client.impl.PulsarClientImpl  : Client closing. URL: pulsar://pulsar1:6650
2022-05-11 10:00:54.826  INFO 28575 --- [r-client-io-1-1] org.apache.pulsar.client.impl.ClientCnx  : [id: 0x4d247f3d, L:/192.168.1.63:49381 ! R:pulsar1.fios-router.home/192.168.1.230:6650] Disconnected
2022-05-11 10:00:54.829  INFO 28575 --- [r-client-io-1-1] org.apache.pulsar.client.impl.ClientCnx  : [id: 0x5fa81a8b, L:/192.168.1.63:49382 ! R:pulsar1.fios-router.home/192.168.1.230:6650] Disconnected

Upvotes: 1

Related Questions