How to parallelize Kafka message consumption




















I have code like this:

    @Component
    public class PostConsumer {

        // The bean name (the method name) must match the containerFactory
        // attribute used on the @KafkaListener below.
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            ...
            factory.setConcurrency(10);
            return factory;
        }

        @Autowired
        private PostService postService;

        @KafkaListener(topics = {"topic1", "topic2"}, containerFactory = "postKafkaListenerContainerFactory")
        public void sendPost(ConsumerRecord<String, Post> consumerRecord) {
            postService.sendPost(consumerRecord.value());
        }
    }


How can I process the partitions of topic1 and the partitions of topic2 in parallel?



Right now they are processed one at a time: first topic1->partition0, then topic2->partition0, then topic1->partition0 again, and so on.



P.S. When I split this method into two @KafkaListener methods, it starts working in parallel. The problem is that I'm providing a huge list of topics, and I don't want to have 100+ @KafkaListener methods for that.



Thank you in advance.










      java spring-boot apache-kafka spring-kafka














asked Nov 16 '18 at 21:49 by Yengibar Manasyan
edited Nov 16 '18 at 21:56 by Yengibar Manasyan
























          1 Answer














          You need to add concurrency to the listener container. However, if each topic has only one partition, that won't help; you need enough partitions in each topic to support your desired concurrency, or use a separate container for each topic.
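To see why concurrency alone does not help with single-partition topics, here is a minimal sketch in plain Java (no Kafka dependency). It assumes range-style assignment is in effect, where each topic is divided among the consumers independently. The class and method names (`RangeAssignmentSketch`, `assign`) are hypothetical, and the model is a simplification, not the Kafka implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of range-style partition assignment.
final class RangeAssignmentSketch {

    // Each topic is handled on its own: its partitions are split into
    // contiguous ranges, and lower-numbered consumers get any remainder.
    static Map<Integer, List<String>> assign(List<String> topics, int partitionsPerTopic, int consumers) {
        Map<Integer, List<String>> result = new LinkedHashMap<>();
        for (int c = 0; c < consumers; c++) {
            result.put(c, new ArrayList<>());
        }
        for (String topic : topics) {
            int perConsumer = partitionsPerTopic / consumers;
            int extra = partitionsPerTopic % consumers;
            int next = 0;
            for (int c = 0; c < consumers; c++) {
                int count = perConsumer + (c < extra ? 1 : 0);
                for (int i = 0; i < count; i++) {
                    result.get(c).add(topic + "-" + next++);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two single-partition topics, ten consumers (concurrency = 10).
        assign(Arrays.asList("topic1", "topic2"), 1, 10)
                .forEach((consumer, partitions) -> System.out.println(consumer + ": " + partitions));
    }
}
```

In this model, consumer 0 receives both `topic1-0` and `topic2-0` while the other nine consumers stay idle, which would explain the one-by-one processing described in the question.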



          We could consider an alternative strategy with an option to distribute the topics themselves across consumers.



          Open a GitHub issue and we'll consider it.



          EDIT



          You can also change the default partition assignor, for example:



          spring.kafka.consumer.properties.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor


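          together with this test application:



          import java.util.List;

          import org.apache.kafka.clients.admin.NewTopic;
          import org.springframework.beans.DirectFieldAccessor;
          import org.springframework.boot.ApplicationRunner;
          import org.springframework.boot.SpringApplication;
          import org.springframework.boot.autoconfigure.SpringBootApplication;
          import org.springframework.context.annotation.Bean;
          import org.springframework.kafka.annotation.KafkaListener;
          import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
          import org.springframework.kafka.listener.KafkaMessageListenerContainer;
          import org.springframework.kafka.listener.MessageListenerContainer;

          @SpringBootApplication
          public class Kgh941Application {

              public static void main(String[] args) {
                  SpringApplication.run(Kgh941Application.class, args);
              }

              // One listener, eight topics, 30 consumer threads.
              @KafkaListener(id = "kgh941", concurrency = "30",
                      topics = {"kgh941a", "kgh941b", "kgh941c", "kgh941d", "kgh941e", "kgh941f", "kgh941g", "kgh941h"})
              public void listen(String in) {

              }

              // Prints the partitions assigned to each child container once Enter is pressed.
              @Bean
              public ApplicationRunner runner(KafkaListenerEndpointRegistry registry) {
                  return args -> {
                      System.out.println("Hit enter to get assignments");
                      System.in.read();
                      MessageListenerContainer container = registry.getListenerContainer("kgh941");
                      @SuppressWarnings("unchecked")
                      List<KafkaMessageListenerContainer<?, ?>> containers = (List<KafkaMessageListenerContainer<?, ?>>) new DirectFieldAccessor(
                              container).getPropertyValue("containers");
                      containers.forEach(c -> System.out.println(c.getAssignedPartitions().size() + ":"
                              + c.getAssignedPartitions()));
                  };
              }

              // Eight topics with 10 partitions each (replication factor 1).
              @Bean
              public NewTopic topica() {
                  return new NewTopic("kgh941a", 10, (short) 1);
              }

              @Bean
              public NewTopic topicb() {
                  return new NewTopic("kgh941b", 10, (short) 1);
              }

              @Bean
              public NewTopic topicc() {
                  return new NewTopic("kgh941c", 10, (short) 1);
              }

              @Bean
              public NewTopic topicd() {
                  return new NewTopic("kgh941d", 10, (short) 1);
              }

              @Bean
              public NewTopic topice() {
                  return new NewTopic("kgh941e", 10, (short) 1);
              }

              @Bean
              public NewTopic topicf() {
                  return new NewTopic("kgh941f", 10, (short) 1);
              }

              @Bean
              public NewTopic topicg() {
                  return new NewTopic("kgh941g", 10, (short) 1);
              }

              @Bean
              public NewTopic topich() {
                  return new NewTopic("kgh941h", 10, (short) 1);
              }

          }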


          which gives these assignments (partition count, then the assigned partitions, one line per consumer):



          3:[kgh941b-0, kgh941h-0, kgh941e-0]
          2:[kgh941c-1, kgh941f-1]
          2:[kgh941c-4, kgh941f-4]
          2:[kgh941c-5, kgh941f-5]
          2:[kgh941f-6, kgh941c-6]
          2:[kgh941c-7, kgh941f-7]
          2:[kgh941c-8, kgh941f-8]
          2:[kgh941c-9, kgh941f-9]
          3:[kgh941d-0, kgh941a-0, kgh941g-0]
          3:[kgh941a-1, kgh941d-1, kgh941g-1]
          3:[kgh941a-2, kgh941d-2, kgh941g-2]
          3:[kgh941a-3, kgh941g-3, kgh941d-3]
          3:[kgh941d-4, kgh941a-4, kgh941g-4]
          3:[kgh941a-5, kgh941d-5, kgh941g-5]
          3:[kgh941a-6, kgh941d-6, kgh941g-6]
          3:[kgh941a-7, kgh941g-7, kgh941d-7]
          3:[kgh941d-8, kgh941a-8, kgh941g-8]
          3:[kgh941d-9, kgh941g-9, kgh941a-9]
          3:[kgh941e-1, kgh941b-1, kgh941h-1]
          3:[kgh941b-2, kgh941e-2, kgh941h-2]
          3:[kgh941b-3, kgh941e-3, kgh941h-3]
          3:[kgh941b-4, kgh941h-4, kgh941e-4]
          3:[kgh941e-5, kgh941b-5, kgh941h-5]
          3:[kgh941b-6, kgh941e-6, kgh941h-6]
          3:[kgh941b-7, kgh941e-7, kgh941h-7]
          3:[kgh941b-8, kgh941h-8, kgh941e-8]
          3:[kgh941e-9, kgh941b-9, kgh941h-9]
          2:[kgh941c-0, kgh941f-0]
          2:[kgh941f-2, kgh941c-2]
          2:[kgh941c-3, kgh941f-3]
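The grouping in the output above can be reproduced with a small plain-Java sketch of round-robin assignment: all partitions of all topics are laid out in sorted order and dealt to the consumers in turn. This is a simplified model under the assumption that every consumer subscribes to the same topics; the names `RoundRobinAssignmentSketch` and `assign` are hypothetical, and the consumer indices are illustrative only, since the real ordering depends on the group member IDs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of round-robin partition assignment.
final class RoundRobinAssignmentSketch {

    // Lays out every partition of every topic (topics in sorted order) and
    // deals them to the consumers one at a time, wrapping around.
    static List<List<String>> assign(List<String> sortedTopics, int partitionsPerTopic, int consumers) {
        List<List<String>> result = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            result.add(new ArrayList<>());
        }
        int index = 0;
        for (String topic : sortedTopics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                result.get(index % consumers).add(topic + "-" + p);
                index++;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("kgh941a", "kgh941b", "kgh941c", "kgh941d",
                "kgh941e", "kgh941f", "kgh941g", "kgh941h");
        // Same shape as the test application: 8 topics x 10 partitions, 30 consumers.
        List<List<String>> assignment = assign(topics, 10, 30);
        for (List<String> partitions : assignment) {
            System.out.println(partitions.size() + ":" + partitions);
        }
    }
}
```

With 80 partitions and 30 consumers, 20 consumers end up with three partitions and 10 with two, and each consumer's partitions come from different topics, matching the shape of the real output.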































          • My question: if two topics (test1, test2) are on the same cluster, and we have one container with two listener methods, one per topic (test1 --> test1topiclistner(), test2 --> test2topiclistner()), then since both listeners are on the same container, the same consumer threads will process both topics, right?

            – Deadpool
            Nov 16 '18 at 22:41











          • The number of consumer threads is controlled by the container concurrency, not the number of topics. If the concurrency is 1, all topics, all partitions will be processed on a single consumer. If the concurrency is two, and the partitions on each topic > 1, each consumer will get some partitions from each topic. As I said, we can add an option to distribute the topics across the consumers instead of Kafka distributing the partitions across consumers.

            – Gary Russell
            Nov 16 '18 at 23:33






          • By the way, if you manually assign partitions, rather than using Kafka group management, the partitions will indeed be distributed. You can manually assign partitions using the alternate constructor to ContainerProperties that takes a vararg of TopicPartitionInitialOffset; just set the concurrency as needed. When using @KafkaListener, this is achieved with @KafkaListener(id = "xxx", topicPartitions = {@TopicPartition(topic = "foo", partitions = "0"), @TopicPartition(topic = "bar", partitions = "0")}).

            – Gary Russell
            Nov 17 '18 at 0:44













          • You can also switch to the RoundRobinAssignor - see the edit.

            – Gary Russell
            Jan 17 at 15:39












          Your Answer






          StackExchange.ifUsing("editor", function () {
          StackExchange.using("externalEditor", function () {
          StackExchange.using("snippets", function () {
          StackExchange.snippets.init();
          });
          });
          }, "code-snippets");

          StackExchange.ready(function() {
          var channelOptions = {
          tags: "".split(" "),
          id: "1"
          };
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function() {
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled) {
          StackExchange.using("snippets", function() {
          createEditor();
          });
          }
          else {
          createEditor();
          }
          });

          function createEditor() {
          StackExchange.prepareEditor({
          heartbeatType: 'answer',
          autoActivateHeartbeat: false,
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader: {
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          },
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          });


          }
          });














          draft saved

          draft discarded


















          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53345900%2fhow-to-parallelize-kafka-message-consumption%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown

























          1 Answer
          1






          active

          oldest

          votes








          1 Answer
          1






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes









          3














          You need to add concurrency to the listener container. However, if each topic has only one partition, that won't help; you need enough partitions in each topic to support your desired concurrency, or use a separate container for each topic.



          We could consider an alternative strategy with an option to distribute the topics themselves across consumers.



          Open a GitHub issue and we'll consider it.



          EDIT



          You can also change the default partition assignor, for example:



          spring.kafka.consumer.properties.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor


          and



          @SpringBootApplication
          public class Kgh941Application {

          public static void main(String args) {
          SpringApplication.run(Kgh941Application.class, args);
          }

          @KafkaListener(id = "kgh941", concurrency = "30",
          topics = {"kgh941a", "kgh941b", "kgh941c", "kgh941d", "kgh941e", "kgh941f", "kgh941g", "kgh941h"})
          public void listen(String in) {

          }

          @Bean
          public ApplicationRunner runner(KafkaListenerEndpointRegistry registry) {
          return args -> {
          System.out.println("Hit enter to get assignments");
          System.in.read();
          MessageListenerContainer container = registry.getListenerContainer("kgh941");
          @SuppressWarnings("unchecked")
          List<KafkaMessageListenerContainer<?, ?>> containers = (List<KafkaMessageListenerContainer<?, ?>>) new DirectFieldAccessor(
          container).getPropertyValue("containers");
          containers.forEach(c -> System.out.println(c.getAssignedPartitions().size() + ":"
          + c.getAssignedPartitions()));
          };
          }

          @Bean
          public NewTopic topica() {
          return new NewTopic("kgh941a", 10, (short) 1);
          }

          @Bean
          public NewTopic topicb() {
          return new NewTopic("kgh941b", 10, (short) 1);
          }

          @Bean
          public NewTopic topicc() {
          return new NewTopic("kgh941c", 10, (short) 1);
          }

          @Bean
          public NewTopic topicd() {
          return new NewTopic("kgh941d", 10, (short) 1);
          }

          @Bean
          public NewTopic topice() {
          return new NewTopic("kgh941e", 10, (short) 1);
          }

          @Bean
          public NewTopic topicf() {
          return new NewTopic("kgh941f", 10, (short) 1);
          }

          @Bean
          public NewTopic topicg() {
          return new NewTopic("kgh941g", 10, (short) 1);
          }

          @Bean
          public NewTopic topich() {
          return new NewTopic("kgh941h", 10, (short) 1);
          }

          }


          and



          3:[kgh941b-0, kgh941h-0, kgh941e-0]
          2:[kgh941c-1, kgh941f-1]
          2:[kgh941c-4, kgh941f-4]
          2:[kgh941c-5, kgh941f-5]
          2:[kgh941f-6, kgh941c-6]
          2:[kgh941c-7, kgh941f-7]
          2:[kgh941c-8, kgh941f-8]
          2:[kgh941c-9, kgh941f-9]
          3:[kgh941d-0, kgh941a-0, kgh941g-0]
          3:[kgh941a-1, kgh941d-1, kgh941g-1]
          3:[kgh941a-2, kgh941d-2, kgh941g-2]
          3:[kgh941a-3, kgh941g-3, kgh941d-3]
          3:[kgh941d-4, kgh941a-4, kgh941g-4]
          3:[kgh941a-5, kgh941d-5, kgh941g-5]
          3:[kgh941a-6, kgh941d-6, kgh941g-6]
          3:[kgh941a-7, kgh941g-7, kgh941d-7]
          3:[kgh941d-8, kgh941a-8, kgh941g-8]
          3:[kgh941d-9, kgh941g-9, kgh941a-9]
          3:[kgh941e-1, kgh941b-1, kgh941h-1]
          3:[kgh941b-2, kgh941e-2, kgh941h-2]
          3:[kgh941b-3, kgh941e-3, kgh941h-3]
          3:[kgh941b-4, kgh941h-4, kgh941e-4]
          3:[kgh941e-5, kgh941b-5, kgh941h-5]
          3:[kgh941b-6, kgh941e-6, kgh941h-6]
          3:[kgh941b-7, kgh941e-7, kgh941h-7]
          3:[kgh941b-8, kgh941h-8, kgh941e-8]
          3:[kgh941e-9, kgh941b-9, kgh941h-9]
          2:[kgh941c-0, kgh941f-0]
          2:[kgh941f-2, kgh941c-2]
          2:[kgh941c-3, kgh941f-3]





          share|improve this answer


























          • 🙋‍♂️my question? if two topics (test1, test2) are on same cluster, the we have one container with two listener methods pointing to each topic (test1-->test1topiclistner(), test2--test2topiclistner()) since both listeners are on same container, same consumer threads will process both the topics right?

            – Deadpool
            Nov 16 '18 at 22:41











          • The number of consumer threads is controlled by the container concurrency, not the number of topics. If the concurrency is 1, all topics, all partitions will be processed on a single consumer. If the concurrency is two, and the partitions on each topic > 1, each consumer will get some partitions from each topic. As I said, we can add an option to distribute the topics across the consumers instead of Kafka distributing the partitions across consumers.

            – Gary Russell
            Nov 16 '18 at 23:33






          • 1





            By the way, if you manually assign partitions, rather than using Kafka group management, the partitions will indeed be distributed. You can manually assign partitions using the alternate constructor to ContainerProperties that takes a vararg of TopicPartitionInitialiOffsset; just set the concurrency as needed. When using @KafkaListener, this is achieved with @KafkaListener(id = "xxx", topicPartitions = {@TopicPartition(topic = "foo", partitions = "0", @TopicPartition(topic = "bar", partitions = "0"}).

            – Gary Russell
            Nov 17 '18 at 0:44













          • You can also switch to the RoundRobinAssignor - see the edit.

            – Gary Russell
            Jan 17 at 15:39
















          3














          You need to add concurrency to the listener container. However, if each topic has only one partition, that won't help; you need enough partitions in each topic to support your desired concurrency, or use a separate container for each topic.



          We could consider an alternative strategy with an option to distribute the topics themselves across consumers.



          Open a GitHub issue and we'll consider it.



          EDIT



          You can also change the default partition assignor, for example:



          spring.kafka.consumer.properties.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor


          and



          @SpringBootApplication
          public class Kgh941Application {

          public static void main(String args) {
          SpringApplication.run(Kgh941Application.class, args);
          }

          @KafkaListener(id = "kgh941", concurrency = "30",
          topics = {"kgh941a", "kgh941b", "kgh941c", "kgh941d", "kgh941e", "kgh941f", "kgh941g", "kgh941h"})
          public void listen(String in) {

          }

          @Bean
          public ApplicationRunner runner(KafkaListenerEndpointRegistry registry) {
          return args -> {
          System.out.println("Hit enter to get assignments");
          System.in.read();
          MessageListenerContainer container = registry.getListenerContainer("kgh941");
          @SuppressWarnings("unchecked")
          List<KafkaMessageListenerContainer<?, ?>> containers = (List<KafkaMessageListenerContainer<?, ?>>) new DirectFieldAccessor(
          container).getPropertyValue("containers");
          containers.forEach(c -> System.out.println(c.getAssignedPartitions().size() + ":"
          + c.getAssignedPartitions()));
          };
          }

          @Bean
          public NewTopic topica() {
          return new NewTopic("kgh941a", 10, (short) 1);
          }

          @Bean
          public NewTopic topicb() {
          return new NewTopic("kgh941b", 10, (short) 1);
          }

          @Bean
          public NewTopic topicc() {
          return new NewTopic("kgh941c", 10, (short) 1);
          }

          @Bean
          public NewTopic topicd() {
          return new NewTopic("kgh941d", 10, (short) 1);
          }

          @Bean
          public NewTopic topice() {
          return new NewTopic("kgh941e", 10, (short) 1);
          }

          @Bean
          public NewTopic topicf() {
          return new NewTopic("kgh941f", 10, (short) 1);
          }

          @Bean
          public NewTopic topicg() {
          return new NewTopic("kgh941g", 10, (short) 1);
          }

          @Bean
          public NewTopic topich() {
          return new NewTopic("kgh941h", 10, (short) 1);
          }

          }


          and



          3:[kgh941b-0, kgh941h-0, kgh941e-0]
          2:[kgh941c-1, kgh941f-1]
          2:[kgh941c-4, kgh941f-4]
          2:[kgh941c-5, kgh941f-5]
          2:[kgh941f-6, kgh941c-6]
          2:[kgh941c-7, kgh941f-7]
          2:[kgh941c-8, kgh941f-8]
          2:[kgh941c-9, kgh941f-9]
          3:[kgh941d-0, kgh941a-0, kgh941g-0]
          3:[kgh941a-1, kgh941d-1, kgh941g-1]
          3:[kgh941a-2, kgh941d-2, kgh941g-2]
          3:[kgh941a-3, kgh941g-3, kgh941d-3]
          3:[kgh941d-4, kgh941a-4, kgh941g-4]
          3:[kgh941a-5, kgh941d-5, kgh941g-5]
          3:[kgh941a-6, kgh941d-6, kgh941g-6]
          3:[kgh941a-7, kgh941g-7, kgh941d-7]
          3:[kgh941d-8, kgh941a-8, kgh941g-8]
          3:[kgh941d-9, kgh941g-9, kgh941a-9]
          3:[kgh941e-1, kgh941b-1, kgh941h-1]
          3:[kgh941b-2, kgh941e-2, kgh941h-2]
          3:[kgh941b-3, kgh941e-3, kgh941h-3]
          3:[kgh941b-4, kgh941h-4, kgh941e-4]
          3:[kgh941e-5, kgh941b-5, kgh941h-5]
          3:[kgh941b-6, kgh941e-6, kgh941h-6]
          3:[kgh941b-7, kgh941e-7, kgh941h-7]
          3:[kgh941b-8, kgh941h-8, kgh941e-8]
          3:[kgh941e-9, kgh941b-9, kgh941h-9]
          2:[kgh941c-0, kgh941f-0]
          2:[kgh941f-2, kgh941c-2]
          2:[kgh941c-3, kgh941f-3]





          share|improve this answer


























          • 🙋‍♂️my question? if two topics (test1, test2) are on same cluster, the we have one container with two listener methods pointing to each topic (test1-->test1topiclistner(), test2--test2topiclistner()) since both listeners are on same container, same consumer threads will process both the topics right?

            – Deadpool
            Nov 16 '18 at 22:41











          • The number of consumer threads is controlled by the container concurrency, not the number of topics. If the concurrency is 1, all topics, all partitions will be processed on a single consumer. If the concurrency is two, and the partitions on each topic > 1, each consumer will get some partitions from each topic. As I said, we can add an option to distribute the topics across the consumers instead of Kafka distributing the partitions across consumers.

            – Gary Russell
            Nov 16 '18 at 23:33






          • 1





            By the way, if you manually assign partitions, rather than using Kafka group management, the partitions will indeed be distributed. You can manually assign partitions using the alternate constructor to ContainerProperties that takes a vararg of TopicPartitionInitialiOffsset; just set the concurrency as needed. When using @KafkaListener, this is achieved with @KafkaListener(id = "xxx", topicPartitions = {@TopicPartition(topic = "foo", partitions = "0", @TopicPartition(topic = "bar", partitions = "0"}).

            – Gary Russell
            Nov 17 '18 at 0:44













          • You can also switch to the RoundRobinAssignor - see the edit.

            – Gary Russell
            Jan 17 at 15:39














          3












          3








          3







          You need to add concurrency to the listener container. However, if each topic has only one partition, that won't help; you need enough partitions in each topic to support your desired concurrency, or use a separate container for each topic.



          We could consider an alternative strategy with an option to distribute the topics themselves across consumers.



          Open a GitHub issue and we'll consider it.



          EDIT



          You can also change the default partition assignor, for example:



          spring.kafka.consumer.properties.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor


          and



          @SpringBootApplication
          public class Kgh941Application {

          public static void main(String args) {
          SpringApplication.run(Kgh941Application.class, args);
          }

          @KafkaListener(id = "kgh941", concurrency = "30",
          topics = {"kgh941a", "kgh941b", "kgh941c", "kgh941d", "kgh941e", "kgh941f", "kgh941g", "kgh941h"})
          public void listen(String in) {

          }

          @Bean
          public ApplicationRunner runner(KafkaListenerEndpointRegistry registry) {
          return args -> {
          System.out.println("Hit enter to get assignments");
          System.in.read();
          MessageListenerContainer container = registry.getListenerContainer("kgh941");
          @SuppressWarnings("unchecked")
          List<KafkaMessageListenerContainer<?, ?>> containers = (List<KafkaMessageListenerContainer<?, ?>>) new DirectFieldAccessor(
          container).getPropertyValue("containers");
          containers.forEach(c -> System.out.println(c.getAssignedPartitions().size() + ":"
          + c.getAssignedPartitions()));
          };
          }

          @Bean
          public NewTopic topica() {
          return new NewTopic("kgh941a", 10, (short) 1);
          }

          @Bean
          public NewTopic topicb() {
          return new NewTopic("kgh941b", 10, (short) 1);
          }

          @Bean
          public NewTopic topicc() {
          return new NewTopic("kgh941c", 10, (short) 1);
          }

          @Bean
          public NewTopic topicd() {
          return new NewTopic("kgh941d", 10, (short) 1);
          }

          @Bean
          public NewTopic topice() {
          return new NewTopic("kgh941e", 10, (short) 1);
          }

          @Bean
          public NewTopic topicf() {
          return new NewTopic("kgh941f", 10, (short) 1);
          }

          @Bean
          public NewTopic topicg() {
          return new NewTopic("kgh941g", 10, (short) 1);
          }

          @Bean
          public NewTopic topich() {
          return new NewTopic("kgh941h", 10, (short) 1);
          }

          }


          and



          3:[kgh941b-0, kgh941h-0, kgh941e-0]
          2:[kgh941c-1, kgh941f-1]
          2:[kgh941c-4, kgh941f-4]
          2:[kgh941c-5, kgh941f-5]
          2:[kgh941f-6, kgh941c-6]
          2:[kgh941c-7, kgh941f-7]
          2:[kgh941c-8, kgh941f-8]
          2:[kgh941c-9, kgh941f-9]
          3:[kgh941d-0, kgh941a-0, kgh941g-0]
          3:[kgh941a-1, kgh941d-1, kgh941g-1]
          3:[kgh941a-2, kgh941d-2, kgh941g-2]
          3:[kgh941a-3, kgh941g-3, kgh941d-3]
          3:[kgh941d-4, kgh941a-4, kgh941g-4]
          3:[kgh941a-5, kgh941d-5, kgh941g-5]
          3:[kgh941a-6, kgh941d-6, kgh941g-6]
          3:[kgh941a-7, kgh941g-7, kgh941d-7]
          3:[kgh941d-8, kgh941a-8, kgh941g-8]
          3:[kgh941d-9, kgh941g-9, kgh941a-9]
          3:[kgh941e-1, kgh941b-1, kgh941h-1]
          3:[kgh941b-2, kgh941e-2, kgh941h-2]
          3:[kgh941b-3, kgh941e-3, kgh941h-3]
          3:[kgh941b-4, kgh941h-4, kgh941e-4]
          3:[kgh941e-5, kgh941b-5, kgh941h-5]
          3:[kgh941b-6, kgh941e-6, kgh941h-6]
          3:[kgh941b-7, kgh941e-7, kgh941h-7]
          3:[kgh941b-8, kgh941h-8, kgh941e-8]
          3:[kgh941e-9, kgh941b-9, kgh941h-9]
          2:[kgh941c-0, kgh941f-0]
          2:[kgh941f-2, kgh941c-2]
          2:[kgh941c-3, kgh941f-3]





          share|improve this answer















          You need to add concurrency to the listener container. However, if each topic has only one partition, that won't help; you need enough partitions in each topic to support your desired concurrency, or use a separate container for each topic.



          We could consider an alternative strategy with an option to distribute the topics themselves across consumers.



          Open a GitHub issue and we'll consider it.



          EDIT



          You can also change the default partition assignor, for example:



          spring.kafka.consumer.properties.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor


          and



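          import java.util.List;

          import org.apache.kafka.clients.admin.NewTopic;
          import org.springframework.beans.DirectFieldAccessor;
          import org.springframework.boot.ApplicationRunner;
          import org.springframework.boot.SpringApplication;
          import org.springframework.boot.autoconfigure.SpringBootApplication;
          import org.springframework.context.annotation.Bean;
          import org.springframework.kafka.annotation.KafkaListener;
          import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
          import org.springframework.kafka.listener.KafkaMessageListenerContainer;
          import org.springframework.kafka.listener.MessageListenerContainer;

          @SpringBootApplication
          public class Kgh941Application {

              public static void main(String[] args) {
                  SpringApplication.run(Kgh941Application.class, args);
              }

              // One listener, 8 topics, 30 consumer threads.
              @KafkaListener(id = "kgh941", concurrency = "30",
                      topics = {"kgh941a", "kgh941b", "kgh941c", "kgh941d", "kgh941e", "kgh941f", "kgh941g", "kgh941h"})
              public void listen(String in) {
                  // No-op: this example only inspects the partition assignments.
              }

              // Prints "count:[partitions]" for each child container once Enter is pressed.
              @Bean
              public ApplicationRunner runner(KafkaListenerEndpointRegistry registry) {
                  return args -> {
                      System.out.println("Hit enter to get assignments");
                      System.in.read();
                      MessageListenerContainer container = registry.getListenerContainer("kgh941");
                      // Reads the private "containers" field of the concurrent container reflectively.
                      @SuppressWarnings("unchecked")
                      List<KafkaMessageListenerContainer<?, ?>> containers = (List<KafkaMessageListenerContainer<?, ?>>) new DirectFieldAccessor(
                              container).getPropertyValue("containers");
                      containers.forEach(c -> System.out.println(c.getAssignedPartitions().size() + ":"
                              + c.getAssignedPartitions()));
                  };
              }

              // Each topic is created with 10 partitions and a replication factor of 1.
              @Bean
              public NewTopic topica() {
                  return new NewTopic("kgh941a", 10, (short) 1);
              }

              @Bean
              public NewTopic topicb() {
                  return new NewTopic("kgh941b", 10, (short) 1);
              }

              @Bean
              public NewTopic topicc() {
                  return new NewTopic("kgh941c", 10, (short) 1);
              }

              @Bean
              public NewTopic topicd() {
                  return new NewTopic("kgh941d", 10, (short) 1);
              }

              @Bean
              public NewTopic topice() {
                  return new NewTopic("kgh941e", 10, (short) 1);
              }

              @Bean
              public NewTopic topicf() {
                  return new NewTopic("kgh941f", 10, (short) 1);
              }

              @Bean
              public NewTopic topicg() {
                  return new NewTopic("kgh941g", 10, (short) 1);
              }

              @Bean
              public NewTopic topich() {
                  return new NewTopic("kgh941h", 10, (short) 1);
              }

          }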


          the result is as follows (each line is one consumer, printed as count:[assigned partitions]):



          3:[kgh941b-0, kgh941h-0, kgh941e-0]
          2:[kgh941c-1, kgh941f-1]
          2:[kgh941c-4, kgh941f-4]
          2:[kgh941c-5, kgh941f-5]
          2:[kgh941f-6, kgh941c-6]
          2:[kgh941c-7, kgh941f-7]
          2:[kgh941c-8, kgh941f-8]
          2:[kgh941c-9, kgh941f-9]
          3:[kgh941d-0, kgh941a-0, kgh941g-0]
          3:[kgh941a-1, kgh941d-1, kgh941g-1]
          3:[kgh941a-2, kgh941d-2, kgh941g-2]
          3:[kgh941a-3, kgh941g-3, kgh941d-3]
          3:[kgh941d-4, kgh941a-4, kgh941g-4]
          3:[kgh941a-5, kgh941d-5, kgh941g-5]
          3:[kgh941a-6, kgh941d-6, kgh941g-6]
          3:[kgh941a-7, kgh941g-7, kgh941d-7]
          3:[kgh941d-8, kgh941a-8, kgh941g-8]
          3:[kgh941d-9, kgh941g-9, kgh941a-9]
          3:[kgh941e-1, kgh941b-1, kgh941h-1]
          3:[kgh941b-2, kgh941e-2, kgh941h-2]
          3:[kgh941b-3, kgh941e-3, kgh941h-3]
          3:[kgh941b-4, kgh941h-4, kgh941e-4]
          3:[kgh941e-5, kgh941b-5, kgh941h-5]
          3:[kgh941b-6, kgh941e-6, kgh941h-6]
          3:[kgh941b-7, kgh941e-7, kgh941h-7]
          3:[kgh941b-8, kgh941h-8, kgh941e-8]
          3:[kgh941e-9, kgh941b-9, kgh941h-9]
          2:[kgh941c-0, kgh941f-0]
          2:[kgh941f-2, kgh941c-2]
          2:[kgh941c-3, kgh941f-3]
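
The 3/3/2 pattern in this listing can be reproduced without a broker. The following is a minimal sketch of round-robin dealing in plain Java, assuming every consumer subscribes to the same topics. It is a simplified model, not Kafka's `RoundRobinAssignor` source: the class name `RoundRobinAssignmentSketch` and the `assign` helper are hypothetical, and consumers are plain indexes.

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinAssignmentSketch {

    // Simplified model (an assumption, not Kafka's implementation): all
    // partitions of all topics form one ordered list, dealt out in turn.
    static List<List<String>> assign(List<String> topics, int partitionsPerTopic, int consumers) {
        List<List<String>> result = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            result.add(new ArrayList<>());
        }
        int index = 0;
        for (String topic : topics) {  // topics are assumed to be passed in sorted order
            for (int p = 0; p < partitionsPerTopic; p++) {
                result.get(index++ % consumers).add(topic + "-" + p);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> topics = List.of("kgh941a", "kgh941b", "kgh941c", "kgh941d",
                "kgh941e", "kgh941f", "kgh941g", "kgh941h");
        // 8 topics x 10 partitions dealt to 30 consumers.
        List<List<String>> assignment = assign(topics, 10, 30);
        assignment.forEach(parts -> System.out.println(parts.size() + ":" + parts));
    }
}
```

With 80 partitions and 30 consumers, 20 consumers end up with three partitions and 10 with two, so every thread has work. The order of partitions inside each printed list may differ from the listing above, since the real container reports its assignment as a collection without a guaranteed order.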














          edited Jan 17 at 15:38

























          answered Nov 16 '18 at 22:29









          Gary Russell














          • My question: if two topics (test1, test2) are on the same cluster, and we have one container with two listener methods, one per topic (test1 --> test1topiclistner(), test2 --> test2topiclistner()), then since both listeners are on the same container, the same consumer threads will process both topics, right?

            – Deadpool
            Nov 16 '18 at 22:41











          • The number of consumer threads is controlled by the container concurrency, not the number of topics. If the concurrency is 1, all partitions of all topics will be processed on a single consumer. If the concurrency is 2 and each topic has more than one partition, each consumer will get some partitions from each topic. As I said, we can add an option to distribute the topics across the consumers instead of Kafka distributing the partitions across consumers.

            – Gary Russell
            Nov 16 '18 at 23:33






          • By the way, if you manually assign partitions, rather than using Kafka group management, the partitions will indeed be distributed across the consumers. You can manually assign partitions using the alternate constructor of ContainerProperties that takes a vararg of TopicPartitionInitialOffset; just set the concurrency as needed. When using @KafkaListener, this is achieved with @KafkaListener(id = "xxx", topicPartitions = {@TopicPartition(topic = "foo", partitions = "0"), @TopicPartition(topic = "bar", partitions = "0")}).

            – Gary Russell
            Nov 17 '18 at 0:44













          • You can also switch to the RoundRobinAssignor - see the edit.

            – Gary Russell
            Jan 17 at 15:39


















