Reputation: 31
Working with kafka 0.10.1.0, I used these config
val props = new Properties
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker)
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass)
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Integer.getClass)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest")
but these code props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest") does not work, what is the reason?
I read the code of org.apache.kafka.streams.StreamsConfig, there has some code:
private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;
static
{
Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>();
tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
}
public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) throws ConfigException {
final Map<String, Object> consumerProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
// disable auto commit and throw exception if there is user overridden values,
// this is necessary for streams commit semantics
if (consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
+ ", as the streams client will always turn off auto committing.");
}
consumerProps.putAll(CONSUMER_DEFAULT_OVERRIDES);
// bootstrap.servers should be from StreamsConfig
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
// add client id with stream client id prefix, and group id
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
// add configs required for stream partition assignor
consumerProps.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
consumerProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
consumerProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName());
consumerProps.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
if (!getString(ZOOKEEPER_CONNECT_CONFIG).equals("")) {
consumerProps.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, getString(ZOOKEEPER_CONNECT_CONFIG));
}
consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
return consumerProps;
}
It will be use the CONSUMER_DEFAULT_OVERRIDES override the config of I set?
Upvotes: 1
Views: 2620
Reputation: 31
This is a bug of 0.10.1.0,which is fixed in 0.10.1.1 and beyond. https://issues.apache.org/jira/browse/KAFKA-4361
Upvotes: 1