mengyuci
mengyuci

Reputation: 31

can I modify the consumer auto-offset-reset to latest of kafka stream?

Working with kafka 0.10.1.0, I used these config

val props = new Properties
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker)
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass)
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Integer.getClass)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest")

but these code props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest") does not work, what is the reason?

I read the code of org.apache.kafka.streams.StreamsConfig, there has some code:

private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;
static
{
    Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>();
    tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
    tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

    CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
}

public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) throws ConfigException {
    final Map<String, Object> consumerProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());

    // disable auto commit and throw exception if there is user overridden values,
    // this is necessary for streams commit semantics
    if (consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
        throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
                + ", as the streams client will always turn off auto committing.");
    }

    consumerProps.putAll(CONSUMER_DEFAULT_OVERRIDES);

    // bootstrap.servers should be from StreamsConfig
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
    // add client id with stream client id prefix, and group id
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");

    // add configs required for stream partition assignor
    consumerProps.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
    consumerProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
    consumerProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
    consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName());
    consumerProps.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
    if (!getString(ZOOKEEPER_CONNECT_CONFIG).equals("")) {
        consumerProps.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, getString(ZOOKEEPER_CONNECT_CONFIG));
    }

    consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
    return consumerProps;
}

It will be use the CONSUMER_DEFAULT_OVERRIDES override the config of I set?

Upvotes: 1

Views: 2620

Answers (1)

mengyuci
mengyuci

Reputation: 31

This is a bug of 0.10.1.0,which is fixed in 0.10.1.1 and beyond. https://issues.apache.org/jira/browse/KAFKA-4361

Upvotes: 1

Related Questions