Приветствую всех.
Собственно сабж.
Одно приложение публикует сообщение, второе его принимает, и отправляет ответ.
То, которое отправляет ответ, не стартует. Пишет:
java.lang.IllegalStateException: a KafkaTemplate is required to support replies
Вот настройки consumera:
@Configuration
public class KafkaConfig {
@Value("${kafka.group.id}")
private String groupId;
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return props;
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ConsumerFactory<String, String> requestConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>());
}
@Bean
public ProducerFactory<String, Reply> replyProducerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> requestReplyListenerContainerFactory() {
var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(requestConsumerFactory());
factory.setReplyTemplate(replyTemplate());
return factory;
}
@Bean
public KafkaTemplate<String, Reply> replyTemplate() {
return new KafkaTemplate<>(replyProducerFactory());
}
}
Бин kafkaTemplate в контексте есть.
Кто подскажет как настроить правильно потребитель?