Assign memory for individual worker in storm












1















I need to assign different values of memory for each new worker. So I tried changing memory for each bolt and spout. I am currently using a custom scheduler also. Here is my approach to the problem.



MY CODE:



TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new EmailSpout(), 1).addConfiguration("node", "zoo1").setMemoryLoad(512.0);
builder.setBolt("increment1", new IncrementBolt(), PARALLELISM).shuffleGrouping("spout").addConfiguration("node", "zoo2").setMemoryLoad(2048.0);
builder.setBolt("increment2", new IncrementBolt(), PARALLELISM).shuffleGrouping("increment1").addConfiguration("node", "zoo3").setMemoryLoad(2048.0);
builder.setBolt("increment3", new IncrementBolt(), PARALLELISM).shuffleGrouping("increment2").addConfiguration("node", "zoo4").setMemoryLoad(2048.0);
builder.setBolt("output", new OutputBolt(), 1).globalGrouping("increment2").addConfiguration("node", "zoo1").setMemoryLoad(512.0);
Config conf = new Config();
conf.setDebug(false);
conf.setNumWorkers(4);
StormSubmitter.submitTopologyWithProgressBar("Microbenchmark", conf, builder.createTopology());


MY STORM.YAML:



 storm.zookeeper.servers:
- "zoo1"
storm.zookeeper.port: 2181
nimbus.seeds: ["zoo1"]
storm.local.dir: "/home/ubuntu/eranga/storm-data"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
- 6704
storm.scheduler: "org.apache.storm.scheduler.NodeBasedCustomScheduler"
supervisor.scheduler.meta:
node: "zoo4"
worker.profiler.enabled: true
worker.profiler.childopts: "-XX:+UnlockCommercialFeatures -XX:+FlightRecorder"
worker.profiler.command: "flight.bash"
worker.heartbeat.frequency.secs: 1
worker.childopts: "-Xmx2048m -Xms2048m -Djava.net.preferIPv4Stack=true -Dorg.xml.sax.driver=com.sun.org.apache.xerces.internal.parsers.SAXParser -Djavax.xml.parsers.DocumentBuilderFactory=com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl -Djavax.xml.parsers.SAXParserFactory=com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl"


When I submit the topology I get the following error.



ERROR:




Exception in thread "main" java.lang.IllegalArgumentException: Topology will not be able to be successfully scheduled: Config TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB=768.0 < 2048.0 (Largest memory requirement of a component in the topology). Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger amount



at org.apache.storm.StormSubmitter.validateTopologyWorkerMaxHeapSizeMBConfigs(StormSubmitter.java:496)




Any suggestions?










share|improve this question





























    1















    I need to assign different values of memory for each new worker. So I tried changing memory for each bolt and spout. I am currently using a custom scheduler also. Here is my approach to the problem.



    MY CODE:



    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new EmailSpout(), 1).addConfiguration("node", "zoo1").setMemoryLoad(512.0);
    builder.setBolt("increment1", new IncrementBolt(), PARALLELISM).shuffleGrouping("spout").addConfiguration("node", "zoo2").setMemoryLoad(2048.0);
    builder.setBolt("increment2", new IncrementBolt(), PARALLELISM).shuffleGrouping("increment1").addConfiguration("node", "zoo3").setMemoryLoad(2048.0);
    builder.setBolt("increment3", new IncrementBolt(), PARALLELISM).shuffleGrouping("increment2").addConfiguration("node", "zoo4").setMemoryLoad(2048.0);
    builder.setBolt("output", new OutputBolt(), 1).globalGrouping("increment2").addConfiguration("node", "zoo1").setMemoryLoad(512.0);
    Config conf = new Config();
    conf.setDebug(false);
    conf.setNumWorkers(4);
    StormSubmitter.submitTopologyWithProgressBar("Microbenchmark", conf, builder.createTopology());


    MY STORM.YAML:



     storm.zookeeper.servers:
    - "zoo1"
    storm.zookeeper.port: 2181
    nimbus.seeds: ["zoo1"]
    storm.local.dir: "/home/ubuntu/eranga/storm-data"
    supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
    - 6704
    storm.scheduler: "org.apache.storm.scheduler.NodeBasedCustomScheduler"
    supervisor.scheduler.meta:
    node: "zoo4"
    worker.profiler.enabled: true
    worker.profiler.childopts: "-XX:+UnlockCommercialFeatures -XX:+FlightRecorder"
    worker.profiler.command: "flight.bash"
    worker.heartbeat.frequency.secs: 1
    worker.childopts: "-Xmx2048m -Xms2048m -Djava.net.preferIPv4Stack=true -Dorg.xml.sax.driver=com.sun.org.apache.xerces.internal.parsers.SAXParser -Djavax.xml.parsers.DocumentBuilderFactory=com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl -Djavax.xml.parsers.SAXParserFactory=com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl"


    When I submit the topology I get the following error.



    ERROR:




    Exception in thread "main" java.lang.IllegalArgumentException: Topology will not be able to be successfully scheduled: Config TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB=768.0 < 2048.0 (Largest memory requirement of a component in the topology). Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger amount



    at org.apache.storm.StormSubmitter.validateTopologyWorkerMaxHeapSizeMBConfigs(StormSubmitter.java:496)




    Any suggestions?










    share|improve this question



























      1












      1








      1








      I need to assign different values of memory for each new worker. So I tried changing memory for each bolt and spout. I am currently using a custom scheduler also. Here is my approach to the problem.



      MY CODE:



      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("spout", new EmailSpout(), 1).addConfiguration("node", "zoo1").setMemoryLoad(512.0);
      builder.setBolt("increment1", new IncrementBolt(), PARALLELISM).shuffleGrouping("spout").addConfiguration("node", "zoo2").setMemoryLoad(2048.0);
      builder.setBolt("increment2", new IncrementBolt(), PARALLELISM).shuffleGrouping("increment1").addConfiguration("node", "zoo3").setMemoryLoad(2048.0);
      builder.setBolt("increment3", new IncrementBolt(), PARALLELISM).shuffleGrouping("increment2").addConfiguration("node", "zoo4").setMemoryLoad(2048.0);
      builder.setBolt("output", new OutputBolt(), 1).globalGrouping("increment2").addConfiguration("node", "zoo1").setMemoryLoad(512.0);
      Config conf = new Config();
      conf.setDebug(false);
      conf.setNumWorkers(4);
      StormSubmitter.submitTopologyWithProgressBar("Microbenchmark", conf, builder.createTopology());


      MY STORM.YAML:



       storm.zookeeper.servers:
      - "zoo1"
      storm.zookeeper.port: 2181
      nimbus.seeds: ["zoo1"]
      storm.local.dir: "/home/ubuntu/eranga/storm-data"
      supervisor.slots.ports:
      - 6700
      - 6701
      - 6702
      - 6703
      - 6704
      storm.scheduler: "org.apache.storm.scheduler.NodeBasedCustomScheduler"
      supervisor.scheduler.meta:
      node: "zoo4"
      worker.profiler.enabled: true
      worker.profiler.childopts: "-XX:+UnlockCommercialFeatures -XX:+FlightRecorder"
      worker.profiler.command: "flight.bash"
      worker.heartbeat.frequency.secs: 1
      worker.childopts: "-Xmx2048m -Xms2048m -Djava.net.preferIPv4Stack=true -Dorg.xml.sax.driver=com.sun.org.apache.xerces.internal.parsers.SAXParser -Djavax.xml.parsers.DocumentBuilderFactory=com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl -Djavax.xml.parsers.SAXParserFactory=com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl"


      When I submit the topology I get the following error.



      ERROR:




      Exception in thread "main" java.lang.IllegalArgumentException: Topology will not be able to be successfully scheduled: Config TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB=768.0 < 2048.0 (Largest memory requirement of a component in the topology). Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger amount



      at org.apache.storm.StormSubmitter.validateTopologyWorkerMaxHeapSizeMBConfigs(StormSubmitter.java:496)




      Any suggestions?










      share|improve this question
















      I need to assign different values of memory for each new worker. So I tried changing memory for each bolt and spout. I am currently using a custom scheduler also. Here is my approach to the problem.



      MY CODE:



      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("spout", new EmailSpout(), 1).addConfiguration("node", "zoo1").setMemoryLoad(512.0);
      builder.setBolt("increment1", new IncrementBolt(), PARALLELISM).shuffleGrouping("spout").addConfiguration("node", "zoo2").setMemoryLoad(2048.0);
      builder.setBolt("increment2", new IncrementBolt(), PARALLELISM).shuffleGrouping("increment1").addConfiguration("node", "zoo3").setMemoryLoad(2048.0);
      builder.setBolt("increment3", new IncrementBolt(), PARALLELISM).shuffleGrouping("increment2").addConfiguration("node", "zoo4").setMemoryLoad(2048.0);
      builder.setBolt("output", new OutputBolt(), 1).globalGrouping("increment2").addConfiguration("node", "zoo1").setMemoryLoad(512.0);
      Config conf = new Config();
      conf.setDebug(false);
      conf.setNumWorkers(4);
      StormSubmitter.submitTopologyWithProgressBar("Microbenchmark", conf, builder.createTopology());


      MY STORM.YAML:



       storm.zookeeper.servers:
      - "zoo1"
      storm.zookeeper.port: 2181
      nimbus.seeds: ["zoo1"]
      storm.local.dir: "/home/ubuntu/eranga/storm-data"
      supervisor.slots.ports:
      - 6700
      - 6701
      - 6702
      - 6703
      - 6704
      storm.scheduler: "org.apache.storm.scheduler.NodeBasedCustomScheduler"
      supervisor.scheduler.meta:
      node: "zoo4"
      worker.profiler.enabled: true
      worker.profiler.childopts: "-XX:+UnlockCommercialFeatures -XX:+FlightRecorder"
      worker.profiler.command: "flight.bash"
      worker.heartbeat.frequency.secs: 1
      worker.childopts: "-Xmx2048m -Xms2048m -Djava.net.preferIPv4Stack=true -Dorg.xml.sax.driver=com.sun.org.apache.xerces.internal.parsers.SAXParser -Djavax.xml.parsers.DocumentBuilderFactory=com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl -Djavax.xml.parsers.SAXParserFactory=com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl"


      When I submit the topology I get the following error.



      ERROR:




      Exception in thread "main" java.lang.IllegalArgumentException: Topology will not be able to be successfully scheduled: Config TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB=768.0 < 2048.0 (Largest memory requirement of a component in the topology). Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger amount



      at org.apache.storm.StormSubmitter.validateTopologyWorkerMaxHeapSizeMBConfigs(StormSubmitter.java:496)




      Any suggestions?







      java configuration apache-storm topology






      share|improve this question















      share|improve this question













      share|improve this question




      share|improve this question








      edited Nov 13 '18 at 18:14









      Saurabh

      28.9k1699165




      28.9k1699165










      asked Dec 2 '16 at 6:26









      Eranga HeshanEranga Heshan

      2221317




      2221317
























          2 Answers
          2






          active

          oldest

          votes


















          0














          Did you try following the advice from the error message?




          Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger amount




          Try adding this to storm.yaml:



          topology.worker.max.heap.size.mb=2048.0 





          share|improve this answer































            0














            Try using this.



            import org.apache.storm.Config;

            public class TopologyExecuter{
            for(List<StormTopology> StormTopologyObject : StormTopologyObjects ){
            Config topologyConf = new Config();
            topologyConf.put(Config.TOPOLOGY_WORKER_CHILDOPTS,"-Xmx512m -Xms256m");
            StormSubmitter.submitTopology("topology name", topologyConf, StormTopologyObject);
            }

            }





            share|improve this answer























              Your Answer






              StackExchange.ifUsing("editor", function () {
              StackExchange.using("externalEditor", function () {
              StackExchange.using("snippets", function () {
              StackExchange.snippets.init();
              });
              });
              }, "code-snippets");

              StackExchange.ready(function() {
              var channelOptions = {
              tags: "".split(" "),
              id: "1"
              };
              initTagRenderer("".split(" "), "".split(" "), channelOptions);

              StackExchange.using("externalEditor", function() {
              // Have to fire editor after snippets, if snippets enabled
              if (StackExchange.settings.snippets.snippetsEnabled) {
              StackExchange.using("snippets", function() {
              createEditor();
              });
              }
              else {
              createEditor();
              }
              });

              function createEditor() {
              StackExchange.prepareEditor({
              heartbeatType: 'answer',
              autoActivateHeartbeat: false,
              convertImagesToLinks: true,
              noModals: true,
              showLowRepImageUploadWarning: true,
              reputationToPostImages: 10,
              bindNavPrevention: true,
              postfix: "",
              imageUploader: {
              brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
              contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
              allowUrls: true
              },
              onDemand: true,
              discardSelector: ".discard-answer"
              ,immediatelyShowMarkdownHelp:true
              });


              }
              });














              draft saved

              draft discarded


















              StackExchange.ready(
              function () {
              StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f40926092%2fassign-memory-for-individual-worker-in-storm%23new-answer', 'question_page');
              }
              );

              Post as a guest















              Required, but never shown

























              2 Answers
              2






              active

              oldest

              votes








              2 Answers
              2






              active

              oldest

              votes









              active

              oldest

              votes






              active

              oldest

              votes









              0














              Did you try following the advice from the error message?




              Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger amount




              Try adding this to storm.yaml:



              topology.worker.max.heap.size.mb=2048.0 





              share|improve this answer




























                0














                Did you try following the advice from the error message?




                Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger amount




                Try adding this to storm.yaml:



                topology.worker.max.heap.size.mb=2048.0 





                share|improve this answer


























                  0












                  0








                  0







                  Did you try following the advice from the error message?




                  Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger amount




                  Try adding this to storm.yaml:



                  topology.worker.max.heap.size.mb=2048.0 





                  share|improve this answer













                  Did you try following the advice from the error message?




                  Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger amount




                  Try adding this to storm.yaml:



                  topology.worker.max.heap.size.mb=2048.0 






                  share|improve this answer












                  share|improve this answer



                  share|improve this answer










                  answered Dec 9 '16 at 3:52









                  Kit MenkeKit Menke

                  6,38612750




                  6,38612750

























                      0














                      Try using this.



                      import org.apache.storm.Config;

                      public class TopologyExecuter{
                      for(List<StormTopology> StormTopologyObject : StormTopologyObjects ){
                      Config topologyConf = new Config();
                      topologyConf.put(Config.TOPOLOGY_WORKER_CHILDOPTS,"-Xmx512m -Xms256m");
                      StormSubmitter.submitTopology("topology name", topologyConf, StormTopologyObject);
                      }

                      }





                      share|improve this answer




























                        0














                        Try using this.



                        import org.apache.storm.Config;

                        public class TopologyExecuter{
                        for(List<StormTopology> StormTopologyObject : StormTopologyObjects ){
                        Config topologyConf = new Config();
                        topologyConf.put(Config.TOPOLOGY_WORKER_CHILDOPTS,"-Xmx512m -Xms256m");
                        StormSubmitter.submitTopology("topology name", topologyConf, StormTopologyObject);
                        }

                        }





                        share|improve this answer


























                          0












                          0








                          0







                          Try using this.



                          import org.apache.storm.Config;

                          public class TopologyExecuter{
                          for(List<StormTopology> StormTopologyObject : StormTopologyObjects ){
                          Config topologyConf = new Config();
                          topologyConf.put(Config.TOPOLOGY_WORKER_CHILDOPTS,"-Xmx512m -Xms256m");
                          StormSubmitter.submitTopology("topology name", topologyConf, StormTopologyObject);
                          }

                          }





                          share|improve this answer













                          Try using this.



                          import org.apache.storm.Config;

                          public class TopologyExecuter{
                          for(List<StormTopology> StormTopologyObject : StormTopologyObjects ){
                          Config topologyConf = new Config();
                          topologyConf.put(Config.TOPOLOGY_WORKER_CHILDOPTS,"-Xmx512m -Xms256m");
                          StormSubmitter.submitTopology("topology name", topologyConf, StormTopologyObject);
                          }

                          }






                          share|improve this answer












                          share|improve this answer



                          share|improve this answer










                          answered Nov 9 '17 at 8:13









                          BalamuruganBalamurugan

                          566




                          566






























                              draft saved

                              draft discarded




















































                              Thanks for contributing an answer to Stack Overflow!


                              • Please be sure to answer the question. Provide details and share your research!

                              But avoid



                              • Asking for help, clarification, or responding to other answers.

                              • Making statements based on opinion; back them up with references or personal experience.


                              To learn more, see our tips on writing great answers.




                              draft saved


                              draft discarded














                              StackExchange.ready(
                              function () {
                              StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f40926092%2fassign-memory-for-individual-worker-in-storm%23new-answer', 'question_page');
                              }
                              );

                              Post as a guest















                              Required, but never shown





















































                              Required, but never shown














                              Required, but never shown












                              Required, but never shown







                              Required, but never shown

































                              Required, but never shown














                              Required, but never shown












                              Required, but never shown







                              Required, but never shown