How to use aggregateByKey on a JavaPairRDD in Java?

I have searched extensively but could not find any examples of using aggregateByKey in Java code.

I want to count the rows in a JavaPairRDD, reducing by key.

I have read that aggregateByKey is the best way to do this, but I am using Java rather than Scala and have not been able to get it working in Java.

Please help!

For example:

input: [(key1,[name:abc,email:def,address:ghi]),(key1,[name:abc,email:def,address:ghi]),(key2,[name:abc,email:def,address:ghi])]

output: [(key1,[name:abc,email:def,address:ghi, count:2]),(key2,[name:abc,email:def,address:ghi, count:1])]

I want to do exactly what the example shows: add an extra column to each output row that holds the number of rows reduced into it.

Thanks!

java apache-spark apache-spark-sql rdd

edited Mar 15 '17 at 11:35 by Prasad Khode
asked Jan 4 '16 at 18:14 by Harish Pathak

3 Answers

Answer 1 (score 4)

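Here is an example of how I used aggregateByKey in Java.

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import scala.Tuple2;

    // inputDataFrame, matchKey, finalSchema, schemaSize, idIndex, arr, CommonConstant
    // and Utils come from the surrounding project and are not shown here.
    JavaPairRDD<String, Row> result = inputDataFrame.javaRDD().mapToPair(new PairFunction<Row, String, Row>() {
        private static final long serialVersionUID = 1L;

        public Tuple2<String, Row> call(Row tblRow) throws Exception {
            // Build the grouping key from the match-key columns.
            String strID = CommonConstant.BLANKSTRING;
            Object[] newRow = new Object[schemaSize];
            for (String s : matchKey) {
                if (tblRow.apply(finalSchema.get(s)) != null) {
                    strID += tblRow.apply(finalSchema.get(s)).toString().trim().toLowerCase();
                }
            }
            // Copy the non-null columns into the new row.
            int rowSize = tblRow.length();
            for (int itr = 0; itr < rowSize; itr++) {
                if (tblRow.apply(itr) != null) {
                    newRow[itr] = tblRow.apply(itr);
                }
            }
            newRow[idIndex] = Utils.generateKey(strID);
            return new Tuple2<String, Row>(strID, RowFactory.create(newRow));
        }
    }).aggregateByKey(RowFactory.create(arr), new Function2<Row, Row, Row>() {
        private static final long serialVersionUID = 1L;

        // Folds one incoming row (argRow2) into the accumulator for its key (argRow1).
        public Row call(Row argRow1, Row argRow2) throws Exception {
            Object[] newRow = new Object[schemaSize];
            if (argRow1 == null || argRow2 == null) {
                return RowFactory.create(newRow);
            }
            int rowSize = argRow1.length();
            for (int itr = 0; itr < rowSize; itr++) {
                if (argRow1.apply(itr) != null && argRow2.apply(itr) != null) {
                    if (itr == rowSize - 1) {
                        // The last column holds the running count.
                        newRow[itr] = Integer.parseInt(argRow1.apply(itr).toString())
                                + Integer.parseInt(argRow2.apply(itr).toString());
                    } else {
                        newRow[itr] = argRow2.apply(itr);
                    }
                }
            }
            return RowFactory.create(newRow);
        }
    }, new Function2<Row, Row, Row>() {
        private static final long serialVersionUID = 1L;

        // Merges two partial results for the same key from different partitions.
        // Returning v1 alone would drop the count held in v2, so the counts are added.
        // Assumes the last column of both rows always holds a count.
        public Row call(Row v1, Row v2) throws Exception {
            int last = v1.length() - 1;
            Object[] merged = new Object[v1.length()];
            for (int itr = 0; itr < last; itr++) {
                merged[itr] = v1.apply(itr);
            }
            merged[last] = Integer.parseInt(v1.apply(last).toString())
                    + Integer.parseInt(v2.apply(last).toString());
            return RowFactory.create(merged);
        }
    });

    // Drop the keys and keep only the aggregated rows.
    JavaRDD<Row> result1 = result.map(new Function<Tuple2<String, Row>, Row>() {
        private static final long serialVersionUID = -5480405270683046298L;

        public Row call(Tuple2<String, Row> rddRow) throws Exception {
            return rddRow._2();
        }
    });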
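
To make the roles of the three aggregateByKey arguments concrete, here is a small plain-Java model of the idea. It does not use Spark at all: the `aggregateByKey` helper, the `pair` and `countRows` methods, and the split of the question's sample input into two partitions are all assumptions made for illustration. Real Spark adds shuffling and serialization, which this sketch ignores.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of aggregateByKey semantics (hypothetical helper, not Spark's API).
final class AggregateByKeyModel {

    // Folds each partition separately with seqFunc, starting from zero for every
    // key seen in that partition, then merges the partial results with combFunc.
    static <K, V, U> Map<K, U> aggregateByKey(
            List<List<Map.Entry<K, V>>> partitions,
            U zero,
            BiFunction<U, V, U> seqFunc,
            BiFunction<U, U, U> combFunc) {
        Map<K, U> merged = new HashMap<>();
        for (List<Map.Entry<K, V>> partition : partitions) {
            Map<K, U> partial = new HashMap<>();
            for (Map.Entry<K, V> kv : partition) {
                U acc = partial.getOrDefault(kv.getKey(), zero);
                partial.put(kv.getKey(), seqFunc.apply(acc, kv.getValue()));
            }
            for (Map.Entry<K, U> e : partial.entrySet()) {
                merged.merge(e.getKey(), e.getValue(), combFunc);
            }
        }
        return merged;
    }

    static Map.Entry<String, String> pair(String key, String row) {
        return new SimpleEntry<>(key, row);
    }

    // Counts rows per key for the question's sample input, with key1 deliberately
    // placed in two different partitions (an assumption for this sketch).
    static Map<String, Integer> countRows(BiFunction<Integer, Integer, Integer> combFunc) {
        String row = "name:abc,email:def,address:ghi";
        List<List<Map.Entry<String, String>>> partitions = Arrays.asList(
                Arrays.asList(pair("key1", row), pair("key2", row)),
                Arrays.asList(pair("key1", row)));
        return aggregateByKey(partitions, 0, (count, r) -> count + 1, combFunc);
    }

    public static void main(String[] args) {
        Map<String, Integer> merged = countRows(Integer::sum);
        Map<String, Integer> lossy = countRows((a, b) -> a);
        System.out.println("key1 with a merging combiner: " + merged.get("key1"));      // 2
        System.out.println("key1 with a combiner returning its first argument: "
                + lossy.get("key1"));                                                   // 1
    }
}
```

The second call shows why the third argument matters: whenever a key's rows are spread over more than one partition, a combiner that simply returns its first argument loses the counts from the other partitions.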






Answer 2 (score 0)

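Data file: average.txt

    student_Name,subject,marks
    ss,english,80
    ss,maths,60
    GG,english,180
    PP,english,80
    PI,english,80
    GG,maths,100
    PP,maths,810
    PI,maths,800

The problem is to find the subject-wise average using the aggregateByKey Spark transformation in Java 8.

Here is one approach:

    import java.util.Map;
    import java.util.Map.Entry;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import scala.Tuple2;

    // jsc is an existing JavaSparkContext.
    JavaRDD<String> baseRDD = jsc.textFile("average.txt");

    // Skip the header line, then key each record by subject with its marks as the value.
    JavaPairRDD<String, Integer> studentRDD = baseRDD
            .filter(s -> !s.startsWith("student_Name"))
            .mapToPair(s -> {
                String[] fields = s.split(",");
                return new Tuple2<String, Integer>(fields[1], Integer.parseInt(fields[2]));
            });

    // Arguments: the zero value, a function that folds one mark into an Avg,
    // and a function that merges two partial Avg objects.
    JavaPairRDD<String, Avg> avgRDD = studentRDD.aggregateByKey(
            new Avg(0, 0),
            (v, x) -> new Avg(v.getSum() + x, v.getNum() + 1),
            (v1, v2) -> new Avg(v1.getSum() + v2.getSum(), v1.getNum() + v2.getNum()));

    Map<String, Avg> mapAvg = avgRDD.collectAsMap();

    for (Entry<String, Avg> entry : mapAvg.entrySet()) {
        System.out.println(entry.getKey() + "::" + entry.getValue().getAvg());
    }

The Avg accumulator class:

    import java.io.Serializable;

    public class Avg implements Serializable {

        private static final long serialVersionUID = 1L;

        private int sum;
        private int num;

        public Avg(int sum, int num) {
            this.sum = sum;
            this.num = num;
        }

        // Cast to double so the division is not truncated to an integer.
        public double getAvg() { return (double) this.sum / this.num; }

        public int getSum() { return this.sum; }

        public int getNum() { return this.num; }
    }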
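
One detail worth checking in a sum/count accumulator like Avg is the final division: in Java, dividing two int values truncates before the result is widened to double. The sketch below folds the maths marks from average.txt by hand, without Spark, to show the difference. The class `AvgSketch`, its methods, and the split into two partitions are hypothetical names and choices made for illustration.

```java
// Plain-Java sketch (no Spark): a sum/count accumulator folded by hand.
final class AvgSketch {
    final int sum;
    final int num;

    AvgSketch(int sum, int num) {
        this.sum = sum;
        this.num = num;
    }

    // Same shape as the second aggregateByKey argument: accumulator plus one value.
    AvgSketch add(int mark) {
        return new AvgSketch(sum + mark, num + 1);
    }

    // Same shape as the third aggregateByKey argument: merge two partial accumulators.
    AvgSketch merge(AvgSketch other) {
        return new AvgSketch(sum + other.sum, num + other.num);
    }

    // int / int is evaluated first, so the fractional part is lost.
    double truncatedAvg() {
        return sum / num;
    }

    // Casting one operand forces floating-point division.
    double exactAvg() {
        return (double) sum / num;
    }

    // The four maths marks from average.txt, split as if they sat in two partitions.
    static AvgSketch mathsMarks() {
        AvgSketch p1 = new AvgSketch(0, 0).add(60).add(100);
        AvgSketch p2 = new AvgSketch(0, 0).add(810).add(800);
        return p1.merge(p2);
    }

    public static void main(String[] args) {
        AvgSketch maths = mathsMarks();
        System.out.println(maths.truncatedAvg()); // 442.0
        System.out.println(maths.exactAvg());     // 442.5
    }
}
```

The marks sum to 1770 over 4 records, so the exact average is 442.5, while integer division reports 442.0.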







Answer 3 (score -1)

I am not sure what you are trying to do, but I can provide a solution that gives the output you need. aggregateByKey does not do what you are expecting; it is just a way of combining values in an RDD, whereas on a DataFrame aggregation does something similar to what you expect. Anyway, the code below gives the required output.

    import java.util.HashMap;
    import java.util.Iterator;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    // pairs is assumed to be an existing JavaPairRDD<String, String>.
    JavaPairRDD<String, Iterable<String>> groups = pairs.groupByKey();

    // For each key, count how often each distinct value occurs. The key stays a String.
    JavaPairRDD<String, String> counts = groups.mapToPair(new PairFunction<Tuple2<String, Iterable<String>>, String, String>() {
        private static final long serialVersionUID = 1L;

        public Tuple2<String, String> call(Tuple2<String, Iterable<String>> arg0) throws Exception {
            HashMap<String, Integer> valueCounts = new HashMap<String, Integer>();
            Iterator<String> itr = arg0._2().iterator();
            while (itr.hasNext()) {
                String val = itr.next();
                if (valueCounts.get(val) == null) {
                    valueCounts.put(val, 1);
                } else {
                    valueCounts.put(val, valueCounts.get(val) + 1);
                }
            }
            return new Tuple2<String, String>(arg0._1(), valueCounts.toString());
        }
    });

You can try it and let me know. Mind you, this is not really combining, as combining does not do this kind of thing.
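
The per-key counting loop in the answer above can also be written with `Map.merge`. The following plain-Java sketch shows only that step, without Spark. The class `ValueCounter` and the methods `countValues` and `withCount` are hypothetical names, and the output shape simply copies the question's expected output.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch of the per-key counting step (no Spark involved).
final class ValueCounter {

    // Counts how many times each distinct value occurs among one key's grouped values.
    static Map<String, Integer> countValues(Iterable<String> values) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String value : values) {
            // Stores 1 for a new value, otherwise adds 1 to the existing count.
            counts.merge(value, 1, Integer::sum);
        }
        return counts;
    }

    // Appends the count to a row in the shape used by the question's expected output.
    static String withCount(String row, int count) {
        return "[" + row + ", count:" + count + "]";
    }

    public static void main(String[] args) {
        String row = "name:abc,email:def,address:ghi";
        Map<String, Integer> counts = countValues(Arrays.asList(row, row));
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            // prints [name:abc,email:def,address:ghi, count:2]
            System.out.println(withCount(e.getKey(), e.getValue()));
        }
    }
}
```

Note that groupByKey moves every value for a key across the network before any counting happens, which is one reason the Spark documentation generally recommends reduceByKey or aggregateByKey for aggregations like this.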






Answer 1 (score 4): answered Jan 5 '16 at 13:29 by Harish Pathak; edited Mar 15 '17 at 11:35 by Prasad Khode

                        0














                        Data file:average.txt



                        student_Name,subject,marks



                        ss,english,80



                        ss,maths,60



                        GG,english,180



                        PP,english,80



                        PI,english,80



                        GG,maths,100



                        PP,maths,810



                        PI,maths,800



                        The problem is to find subject wise average using aggregateByKey spark transformation in java 8.



                        And here is one approach:



                            JavaRDD<String> baseRDD = jsc.textFile("average.txt");
                        JavaPairRDD<String,Integer> studentRDD = baseRDD.mapToPair( s -> new Tuple2<String,Integer>(s.split(",")[1],Integer.parseInt(s.split(",")[2])));
                        JavaPairRDD<String,Avg> avgRDD = studentRDD.aggregateByKey(new Avg(0,0), (v,x) -> new Avg(v.getSum()+x,v.getNum()+1), (v1,v2) -> new Avg(v1.getSum()+v2.getSum(),v1.getNum()+v2.getNum()));

                        Map<String,Avg> mapAvg = avgRDD.collectAsMap();

                        for(Entry<String,Avg> entry : mapAvg.entrySet()){
                        System.out.println(entry.getKey()+"::"+entry.getValue().getAvg());
                        }



                        import java.io.Serializable;

                        public class Avg implements Serializable{

                        private static final long serialVersionUID = 1L;

                        private int sum;
                        private int num;

                        public Avg(int sum, int num){
                        this.sum = sum;
                        this.num = num;
                        }

                        public double getAvg(){ return (this.sum / this.num);}

                        public int getSum(){ return this.sum; }

                        public int getNum(){ return this.num; }


                        }






                        share|improve this answer






























                          0














                          Data file:average.txt



                          student_Name,subject,marks



                          ss,english,80



                          ss,maths,60



                          GG,english,180



                          PP,english,80



                          PI,english,80



                          GG,maths,100



                          PP,maths,810



                          PI,maths,800



                          The problem is to find subject wise average using aggregateByKey spark transformation in java 8.



                          And here is one approach:



                              JavaRDD<String> baseRDD = jsc.textFile("average.txt");
                          JavaPairRDD<String,Integer> studentRDD = baseRDD.mapToPair( s -> new Tuple2<String,Integer>(s.split(",")[1],Integer.parseInt(s.split(",")[2])));
                          JavaPairRDD<String,Avg> avgRDD = studentRDD.aggregateByKey(new Avg(0,0), (v,x) -> new Avg(v.getSum()+x,v.getNum()+1), (v1,v2) -> new Avg(v1.getSum()+v2.getSum(),v1.getNum()+v2.getNum()));

                          Map<String,Avg> mapAvg = avgRDD.collectAsMap();

                          for(Entry<String,Avg> entry : mapAvg.entrySet()){
                          System.out.println(entry.getKey()+"::"+entry.getValue().getAvg());
                          }



                          import java.io.Serializable;

                          public class Avg implements Serializable{

                          private static final long serialVersionUID = 1L;

                          private int sum;
                          private int num;

                          public Avg(int sum, int num){
                          this.sum = sum;
                          this.num = num;
                          }

                          public double getAvg(){ return (this.sum / this.num);}

                          public int getSum(){ return this.sum; }

                          public int getNum(){ return this.num; }


                          }






                          share|improve this answer




























                            0












                            0








                            0







                            Data file:average.txt



                            student_Name,subject,marks



                            ss,english,80



                            ss,maths,60



                            GG,english,180



                            PP,english,80



                            PI,english,80



                            GG,maths,100



                            PP,maths,810



                            PI,maths,800



                            The problem is to find subject wise average using aggregateByKey spark transformation in java 8.



                            And here is one approach:



                                JavaRDD<String> baseRDD = jsc.textFile("average.txt");
                            JavaPairRDD<String,Integer> studentRDD = baseRDD.mapToPair( s -> new Tuple2<String,Integer>(s.split(",")[1],Integer.parseInt(s.split(",")[2])));
                            JavaPairRDD<String,Avg> avgRDD = studentRDD.aggregateByKey(new Avg(0,0), (v,x) -> new Avg(v.getSum()+x,v.getNum()+1), (v1,v2) -> new Avg(v1.getSum()+v2.getSum(),v1.getNum()+v2.getNum()));

                            Map<String,Avg> mapAvg = avgRDD.collectAsMap();

                            for(Entry<String,Avg> entry : mapAvg.entrySet()){
                            System.out.println(entry.getKey()+"::"+entry.getValue().getAvg());
                            }



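                                import java.io.Serializable;

                                // Accumulator used by aggregateByKey; Serializable so Spark can ship it between executors.
                                public class Avg implements Serializable {

                                    private static final long serialVersionUID = 1L;

                                    private final int sum;
                                    private final int num;

                                    public Avg(int sum, int num) {
                                        this.sum = sum;
                                        this.num = num;
                                    }

                                    // Cast before dividing; int / int would truncate the average.
                                    public double getAvg() { return (double) this.sum / this.num; }

                                    public int getSum() { return this.sum; }

                                    public int getNum() { return this.num; }
                                }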
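To make the roles of the three aggregateByKey arguments concrete, here is a minimal plain-Java sketch that mimics the same fold on the sample data without Spark. It is only an illustration under stated assumptions: the class `AggregateByKeySketch`, its `aggregateByKey` helper, the `SumCount` accumulator and the two hand-made "partitions" are all hypothetical names, and this is not how Spark itself is implemented.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

// Plain-Java illustration only; not Spark code.
final class AggregateByKeySketch {

    // Immutable (sum, count) pair, playing the role of the Avg class.
    static final class SumCount {
        final long sum;
        final long count;
        SumCount(long sum, long count) { this.sum = sum; this.count = count; }
        double average() { return count == 0 ? 0.0 : (double) sum / count; }
    }

    // Hypothetical helper: fold each partition with seqOp starting from zero,
    // then merge the per-partition results for the same key with combOp.
    static <V, U> Map<String, U> aggregateByKey(
            List<List<Map.Entry<String, V>>> partitions,
            U zero,
            BiFunction<U, V, U> seqOp,
            BinaryOperator<U> combOp) {
        Map<String, U> merged = new HashMap<>();
        for (List<Map.Entry<String, V>> partition : partitions) {
            Map<String, U> local = new HashMap<>();
            for (Map.Entry<String, V> record : partition) {
                U acc = local.getOrDefault(record.getKey(), zero);
                local.put(record.getKey(), seqOp.apply(acc, record.getValue()));
            }
            for (Map.Entry<String, U> e : local.entrySet()) {
                merged.merge(e.getKey(), e.getValue(), combOp);
            }
        }
        return merged;
    }

    static Map.Entry<String, Integer> mark(String subject, int marks) {
        return new AbstractMap.SimpleEntry<>(subject, marks);
    }

    // The (subject, marks) pairs from average.txt, split by hand into two partitions.
    static Map<String, SumCount> subjectTotals() {
        List<List<Map.Entry<String, Integer>>> partitions = Arrays.asList(
            Arrays.asList(mark("english", 80), mark("maths", 60),
                          mark("english", 180), mark("english", 80)),
            Arrays.asList(mark("english", 80), mark("maths", 100),
                          mark("maths", 810), mark("maths", 800)));
        return aggregateByKey(
            partitions,
            new SumCount(0, 0),
            (acc, marks) -> new SumCount(acc.sum + marks, acc.count + 1),
            (a, b) -> new SumCount(a.sum + b.sum, a.count + b.count));
    }

    public static void main(String[] args) {
        for (Map.Entry<String, SumCount> e : subjectTotals().entrySet()) {
            System.out.println(e.getKey() + "::" + e.getValue().average());
        }
    }
}
```

With this data the maths average is 442.5, which an integer division of the sum by the count would have truncated to 442.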














                            edited Nov 15 '18 at 18:35 by Theresa

                            answered Nov 15 '18 at 17:06 by Shekhar























                                -1














                                I am not sure exactly what you are trying to do, but I can provide a solution that gives the output you need. aggregateByKey is just a way of combining values per key on an RDD, so it does not by itself do what you are expecting, whereas aggregation on a DataFrame works more like what you describe. In any case, the code below gives the required output.



                                // Needs java.util.HashMap, java.util.Map, scala.Tuple2,
                                // org.apache.spark.api.java.JavaPairRDD and org.apache.spark.api.java.function.PairFunction.
                                // pairs is assumed to be a JavaPairRDD<String, String>.
                                JavaPairRDD<String, Iterable<String>> groups = pairs.groupByKey();

                                // The key stays a String, so the result type is JavaPairRDD<String, String>.
                                JavaPairRDD<String, String> counts = groups.mapToPair(
                                    new PairFunction<Tuple2<String, Iterable<String>>, String, String>() {
                                        @Override
                                        public Tuple2<String, String> call(Tuple2<String, Iterable<String>> group) throws Exception {
                                            // Count how many times each distinct value occurs under this key.
                                            Map<String, Integer> valueCounts = new HashMap<String, Integer>();
                                            for (String val : group._2()) {
                                                valueCounts.merge(val, 1, Integer::sum);
                                            }
                                            return new Tuple2<String, String>(group._1(), valueCounts.toString());
                                        }
                                    });


                                Try it and let me know how it goes. Note that, frankly, this is grouping rather than combining, since combining does not do this kind of thing.
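For comparison, the per-key count asked for in the question can also be written as a fold that never materializes the whole group. The sketch below is plain Java on the question's sample data, not Spark code; `CountPerKeySketch`, `RowCount`, `addRow`, `merge` and `sampleRows` are hypothetical names. In Spark, `addRow` and `merge` would be the kind of functions passed as the second and third arguments of `aggregateByKey`, with `new RowCount(null, 0)` as the zero value, and the accumulator would then also need to be Serializable.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Spark): count rows per key while keeping one representative row.
final class CountPerKeySketch {

    // Hypothetical accumulator: a row seen for the key plus how many rows shared that key.
    static final class RowCount {
        final String row;
        final int count;
        RowCount(String row, int count) { this.row = row; this.count = count; }
        @Override public String toString() { return "[" + row + ", count:" + count + "]"; }
    }

    // Sequence step: fold one more row into the accumulator for its key.
    static RowCount addRow(RowCount acc, String row) {
        return new RowCount(acc.row == null ? row : acc.row, acc.count + 1);
    }

    // Combine step: merge two partial accumulators (for example from two partitions).
    static RowCount merge(RowCount a, RowCount b) {
        return new RowCount(a.row != null ? a.row : b.row, a.count + b.count);
    }

    // Single-pass fold over (key, row) pairs, starting each key from the zero value.
    static Map<String, RowCount> countRows(List<Map.Entry<String, String>> pairs) {
        Map<String, RowCount> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> pair : pairs) {
            RowCount acc = result.getOrDefault(pair.getKey(), new RowCount(null, 0));
            result.put(pair.getKey(), addRow(acc, pair.getValue()));
        }
        return result;
    }

    static Map.Entry<String, String> pair(String key, String row) {
        return new AbstractMap.SimpleEntry<>(key, row);
    }

    // The input from the question, with each row kept as a plain string.
    static List<Map.Entry<String, String>> sampleRows() {
        String row = "name:abc,email:def,address:ghi";
        return Arrays.asList(pair("key1", row), pair("key1", row), pair("key2", row));
    }

    public static void main(String[] args) {
        for (Map.Entry<String, RowCount> e : countRows(sampleRows()).entrySet()) {
            System.out.println("(" + e.getKey() + "," + e.getValue() + ")");
        }
    }
}
```

Because each key only ever holds one small accumulator, this shape avoids collecting every value for a key in memory, which is what the groupByKey version has to do.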
















                                    answered Jan 4 '16 at 19:42 by Srini





























