How to parallelize Kafka message consumption
I have code like this:
@Component
public class PostConsumer {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        ...
        factory.setConcurrency(10);
        return factory;
    }

    @Autowired
    private PostService postService;

    @KafkaListener(topics = {"topic1", "topic2"}, containerFactory = "postKafkaListenerContainerFactory")
    public void sendPost(ConsumerRecord<String, Post> consumerRecord) {
        postService.sendPost(consumerRecord.value());
    }
}
How can I process the partition(s) of topic1 and the partition(s) of topic2 in parallel?
Right now they are processed one at a time: first topic1->partition0, then topic2->partition0, then topic1->partition0 again, and so on.
P.S. When I split this method into two @KafkaListener methods, it starts working in parallel, but I am passing a huge list of topics and don't want to have 100+ @KafkaListener methods for that.
Thank you in advance.
java spring-boot apache-kafka spring-kafka
edited Nov 16 '18 at 21:56
Yengibar Manasyan
asked Nov 16 '18 at 21:49
1 Answer
You need to add concurrency to the listener container. However, if each topic has only one partition, that won't help; you need enough partitions in each topic to support your desired concurrency, or use a separate container for each topic.
We could consider an alternative strategy with an option to distribute the topics themselves across consumers.
Open a GitHub issue and we'll consider it.
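To illustrate why extra concurrency does not help when every topic has a single partition, here is a minimal, self-contained sketch of per-topic range assignment, which is how Kafka's default assignor behaves. It is a simplified simulation, not Kafka's actual code: the class name `RangeAssignmentSketch` and the `assign` method are hypothetical, and it assumes every consumer subscribes to the same topics and every topic has the same partition count.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not part of Kafka or Spring.
final class RangeAssignmentSketch {

    // Splits each topic's partitions into contiguous ranges over the consumers.
    // Every topic is handled independently, so the first consumers are filled first
    // for each topic.
    static List<List<String>> assign(List<String> topics, int partitionsPerTopic, int consumers) {
        List<List<String>> result = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            result.add(new ArrayList<>());
        }
        for (String topic : topics) {
            int perConsumer = partitionsPerTopic / consumers;
            int extra = partitionsPerTopic % consumers;
            int next = 0;
            for (int c = 0; c < consumers; c++) {
                // The first `extra` consumers receive one additional partition.
                int count = perConsumer + (c < extra ? 1 : 0);
                for (int i = 0; i < count; i++) {
                    result.get(c).add(topic + "-" + next++);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two single-partition topics, concurrency 10 (as in the question).
        List<List<String>> assignment = assign(Arrays.asList("topic1", "topic2"), 1, 10);
        for (int c = 0; c < assignment.size(); c++) {
            System.out.println("consumer " + c + ": " + assignment.get(c));
        }
    }
}
```

Under these assumptions the first consumer receives partition 0 of both topics and the other nine stay idle, which matches the one-by-one processing described in the question.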
EDIT
You can also change the default partition assignor, for example:
spring.kafka.consumer.properties.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
and
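import java.util.List;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;

@SpringBootApplication
public class Kgh941Application {

    public static void main(String[] args) {
        SpringApplication.run(Kgh941Application.class, args);
    }

    // One listener over eight topics; concurrency 30 requests 30 consumers.
    @KafkaListener(id = "kgh941", concurrency = "30",
            topics = {"kgh941a", "kgh941b", "kgh941c", "kgh941d", "kgh941e", "kgh941f", "kgh941g", "kgh941h"})
    public void listen(String in) {
    }

    // Prints the number of partitions and the partitions assigned to each child container.
    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry) {
        return args -> {
            System.out.println("Hit enter to get assignments");
            System.in.read();
            MessageListenerContainer container = registry.getListenerContainer("kgh941");
            @SuppressWarnings("unchecked")
            List<KafkaMessageListenerContainer<?, ?>> containers = (List<KafkaMessageListenerContainer<?, ?>>) new DirectFieldAccessor(
                    container).getPropertyValue("containers");
            containers.forEach(c -> System.out.println(c.getAssignedPartitions().size() + ":"
                    + c.getAssignedPartitions()));
        };
    }

    // Eight topics, each with 10 partitions and a replication factor of 1.
    @Bean
    public NewTopic topica() {
        return new NewTopic("kgh941a", 10, (short) 1);
    }

    @Bean
    public NewTopic topicb() {
        return new NewTopic("kgh941b", 10, (short) 1);
    }

    @Bean
    public NewTopic topicc() {
        return new NewTopic("kgh941c", 10, (short) 1);
    }

    @Bean
    public NewTopic topicd() {
        return new NewTopic("kgh941d", 10, (short) 1);
    }

    @Bean
    public NewTopic topice() {
        return new NewTopic("kgh941e", 10, (short) 1);
    }

    @Bean
    public NewTopic topicf() {
        return new NewTopic("kgh941f", 10, (short) 1);
    }

    @Bean
    public NewTopic topicg() {
        return new NewTopic("kgh941g", 10, (short) 1);
    }

    @Bean
    public NewTopic topich() {
        return new NewTopic("kgh941h", 10, (short) 1);
    }
}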
and the output (partition count, then the assigned partitions, one line per consumer):
3:[kgh941b-0, kgh941h-0, kgh941e-0]
2:[kgh941c-1, kgh941f-1]
2:[kgh941c-4, kgh941f-4]
2:[kgh941c-5, kgh941f-5]
2:[kgh941f-6, kgh941c-6]
2:[kgh941c-7, kgh941f-7]
2:[kgh941c-8, kgh941f-8]
2:[kgh941c-9, kgh941f-9]
3:[kgh941d-0, kgh941a-0, kgh941g-0]
3:[kgh941a-1, kgh941d-1, kgh941g-1]
3:[kgh941a-2, kgh941d-2, kgh941g-2]
3:[kgh941a-3, kgh941g-3, kgh941d-3]
3:[kgh941d-4, kgh941a-4, kgh941g-4]
3:[kgh941a-5, kgh941d-5, kgh941g-5]
3:[kgh941a-6, kgh941d-6, kgh941g-6]
3:[kgh941a-7, kgh941g-7, kgh941d-7]
3:[kgh941d-8, kgh941a-8, kgh941g-8]
3:[kgh941d-9, kgh941g-9, kgh941a-9]
3:[kgh941e-1, kgh941b-1, kgh941h-1]
3:[kgh941b-2, kgh941e-2, kgh941h-2]
3:[kgh941b-3, kgh941e-3, kgh941h-3]
3:[kgh941b-4, kgh941h-4, kgh941e-4]
3:[kgh941e-5, kgh941b-5, kgh941h-5]
3:[kgh941b-6, kgh941e-6, kgh941h-6]
3:[kgh941b-7, kgh941e-7, kgh941h-7]
3:[kgh941b-8, kgh941h-8, kgh941e-8]
3:[kgh941e-9, kgh941b-9, kgh941h-9]
2:[kgh941c-0, kgh941f-0]
2:[kgh941f-2, kgh941c-2]
2:[kgh941c-3, kgh941f-3]
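The shape of this listing can be reproduced with a small, self-contained simulation of round-robin assignment. This is a sketch under assumptions, not Kafka's `RoundRobinAssignor` implementation: the class name `RoundRobinAssignmentSketch` and the `assign` method are hypothetical, and it assumes all consumers subscribe to the same topics, topics are visited in sorted order, and every topic has the same partition count.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not part of Kafka or Spring.
final class RoundRobinAssignmentSketch {

    // Lays out all partitions of all topics in one sequence and deals them
    // to the consumers in turn, wrapping around after the last consumer.
    static List<List<String>> assign(List<String> sortedTopics, int partitionsPerTopic, int consumers) {
        List<List<String>> result = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            result.add(new ArrayList<>());
        }
        int index = 0;
        for (String topic : sortedTopics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                result.get(index % consumers).add(topic + "-" + p);
                index++;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Eight topics with 10 partitions each, 30 consumers (as in the example above).
        List<String> topics = Arrays.asList("kgh941a", "kgh941b", "kgh941c", "kgh941d",
                "kgh941e", "kgh941f", "kgh941g", "kgh941h");
        for (List<String> partitions : assign(topics, 10, 30)) {
            System.out.println(partitions.size() + ":" + partitions);
        }
    }
}
```

With 80 partitions dealt to 30 consumers, twenty consumers end up with three partitions and ten with two, as in the listing; the order of the lines and of the entries within each line differs because the real containers report their assignments as unordered collections.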
edited Jan 17 at 15:38
answered Nov 16 '18 at 22:29
Gary Russell
My question: if two topics (test1, test2) are on the same cluster and we have one container with two listener methods, one per topic (test1 --> test1topiclistner(), test2 --> test2topiclistner()), then since both listeners are on the same container, the same consumer threads will process both topics, right?
– Deadpool
Nov 16 '18 at 22:41
The number of consumer threads is controlled by the container concurrency, not the number of topics. If the concurrency is 1, all topics, all partitions will be processed on a single consumer. If the concurrency is two, and the partitions on each topic > 1, each consumer will get some partitions from each topic. As I said, we can add an option to distribute the topics across the consumers instead of Kafka distributing the partitions across consumers.
– Gary Russell
Nov 16 '18 at 23:33
By the way, if you manually assign partitions, rather than using Kafka group management, the partitions will indeed be distributed. You can manually assign partitions using the alternate constructor to ContainerProperties that takes a vararg of TopicPartitionInitialOffset; just set the concurrency as needed. When using @KafkaListener, this is achieved with @KafkaListener(id = "xxx", topicPartitions = {@TopicPartition(topic = "foo", partitions = "0"), @TopicPartition(topic = "bar", partitions = "0")}).
– Gary Russell
Nov 17 '18 at 0:44
You can also switch to the RoundRobinAssignor; see the edit.
– Gary Russell
Jan 17 at 15:39