Apply window function in Spark with non-constant frame size












My Problem



I am currently facing difficulties with Spark window functions. I am using Spark 1.6.3 through pyspark (with Python 2.6.6). I run a pyspark shell instance that automatically initializes a HiveContext as my sqlContext.



I want to compute a rolling sum with a window function. My problem is that the window frame is not fixed: it depends on the observation under consideration. More specifically, I order the data by a variable called rank_id and, for any observation with index $x$, I want to sum over the rows with indexes between $x+1$ and $2x-1$ (for instance, for the observation with rank_id 3, the frame covers the rows with rank_id 4 and 5). Thus, my rangeBetween bounds must depend on the value of rank_id.



An important point is that I don't want to collect the data, so I cannot use anything like numpy (my data has far too many observations for that).



Reproducible example



from pyspark.mllib.random import RandomRDDs
import pyspark.sql.functions as psf
from pyspark.sql.window import Window

# Reproducible example
data = RandomRDDs.uniformVectorRDD(sc, 15, 2)
df = data.map(lambda l: (float(l[0]), float(l[1]))).toDF()
df = df.selectExpr("_1 as x", "_2 as y")

#df.show(2)
#+-------------------+------------------+
#| x| y|
#+-------------------+------------------+
#|0.32767742062486405|0.2506351566289311|
#| 0.7245348534550357| 0.597929853274274|
#+-------------------+------------------+
#only showing top 2 rows

# Finalize dataframe creation
w = Window().orderBy("x")
df = df.withColumn("rank_id", psf.rowNumber().over(w)).sort("rank_id")
#df.show(3)
#+--------------------+--------------------+-------+
#| x| y|rank_id|
#+--------------------+--------------------+-------+
#|0.016536160706045577|0.009892450530381458| 1|
#| 0.10943843181953838| 0.6478505849227775| 2|
#| 0.13916818312857027| 0.24165348228464578| 3|
#+--------------------+--------------------+-------+
#only showing top 3 rows


Fixed-width cumulative sum: no problem



Using a window function, I am able to run a cumulative sum over a fixed number of indexes (I use rangeBetween here, but for this example rowsBetween could be used interchangeably).



w = Window.orderBy('rank_id').rangeBetween(-1,3)
df1 = df.select('*', psf.sum(df['y']).over(w).alias('roll1'))
#df1.show(3)
#+--------------------+--------------------+-------+------------------+
#| x| y|rank_id| roll1|
#+--------------------+--------------------+-------+------------------+
#|0.016536160706045577|0.009892450530381458| 1|0.9698521852602887|
#| 0.10943843181953838| 0.6478505849227775| 2|1.5744700156326066|
#| 0.13916818312857027| 0.24165348228464578| 3|2.3040547273760392|
#+--------------------+--------------------+-------+------------------+
#only showing top 3 rows


Cumulative sum with non-fixed width



I want to sum between indexes x+1 and 2x-1, where x is my row index. When I try to pass this to Spark (in a similar way to what we do for orderBy, which may be the problem), I get the following error:



# Now if I want to make rangeBetween size depend on a variable
w = Window.orderBy('rank_id').rangeBetween('rank_id'+1,2*'rank_id'-1)



Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: cannot concatenate 'str' and 'int' objects




I tried something else, using a SQL statement:



# Using SQL expression
df.registerTempTable('tempdf')
df2 = sqlContext.sql("""
SELECT *, SUM(y)
OVER (ORDER BY rank_id
RANGE BETWEEN rank_id+1 AND 2*rank_id-1) AS cumsum
FROM tempdf;
""")


which this time gives me the following error:




Traceback (most recent call last):
  File "<stdin>", line 6, in <module>
  File "/opt/application/Spark/current/python/pyspark/sql/context.py", line 580, in sql
    return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
  File "/opt/application/Spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/opt/application/Spark/current/python/pyspark/sql/utils.py", line 51, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"cannot recognize input near 'rank_id' '+' '1' in windowframeboundary; line 3 pos 15"




I also noticed that when I try a simpler statement using the SQL OVER clause, I get a similar error, which may mean I am not passing the SQL statement to Spark correctly:



df2 = sqlContext.sql("""
SELECT *, SUM(y)
OVER (ORDER BY rank_id
RANGE BETWEEN -1 AND 1) AS cumsum
FROM tempdf;
""")



Traceback (most recent call last):
  File "<stdin>", line 6, in <module>
  File "/opt/application/Spark/current/python/pyspark/sql/context.py", line 580, in sql
    return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
  File "/opt/application/Spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/opt/application/Spark/current/python/pyspark/sql/utils.py", line 51, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"cannot recognize input near '-' '1' 'AND' in windowframeboundary; line 3 pos 15"




How can I solve my problem using either a window function or a SQL statement within Spark?










1 Answer
How can I solve my problem using either a window function or a SQL statement within Spark?




TL;DR You cannot, or at least not in a scalable way, with the current requirements. You can try something similar to sliding over an RDD: How to transform data with sliding window over time series data in Pyspark




I also noticed that when I try a simpler statement using the SQL OVER clause, I get a similar error, which may mean I am not passing the SQL statement to Spark correctly




Your SQL is incorrect. The range specification requires a PRECEDING, FOLLOWING, or CURRENT ROW boundary. Also, there should be no trailing semicolon:

SELECT *, SUM(x)
OVER (ORDER BY rank_id
RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS cumsum
FROM tempdf
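
For reference, the same fixed frame can also be expressed with the DataFrame API used elsewhere in the question (a sketch reusing the question's df; a negative offset plays the role of PRECEDING and a positive one of FOLLOWING):

import pyspark.sql.functions as psf
from pyspark.sql.window import Window

# Equivalent of RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING over rank_id
w_fixed = Window.orderBy('rank_id').rangeBetween(-1, 1)
df_fixed = df.select('*', psf.sum(df['y']).over(w_fixed).alias('cumsum'))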



I want to sum between indexes x+1 and 2x-1, where x is my row index. When I try to pass this to Spark (in a similar way to what we do for orderBy, which may be the problem), I get the following error ...

TypeError: cannot concatenate 'str' and 'int' objects





As the exception says, you cannot call + on a string and an integer. You probably wanted columns:

from pyspark.sql.functions import col

.rangeBetween(col('rank_id') + 1, 2 * col('rank_id') - 1)

but this is not supported. The frame has to be of a fixed size and cannot be defined in terms of expressions.
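
One possible workaround that avoids collecting (not part of the original answer, just a sketch reusing the question's df, rank_id, x and y) is to express the per-row frame as a non-equi self-join followed by an aggregation. Note that without an equi-join key Spark typically falls back to a Cartesian or nested-loop style join, so this only postpones the scalability issue:

import pyspark.sql.functions as psf

# Sketch of a self-join workaround: for each left row, sum the y values of the
# right rows whose rank_id falls in [rank_id + 1, 2 * rank_id - 1].
l = df.alias('l')
r = df.alias('r')
in_frame = ((psf.col('r.rank_id') >= psf.col('l.rank_id') + 1) &
            (psf.col('r.rank_id') <= 2 * psf.col('l.rank_id') - 1))

rolled = (l.join(r, in_frame, 'left_outer')
           .groupBy(psf.col('l.rank_id'), psf.col('l.x'), psf.col('l.y'))
           .agg(psf.sum(psf.col('r.y')).alias('roll'))
           .orderBy('rank_id'))
# Rows whose frame is empty (e.g. rank_id = 1) get a null roll.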




An important point is that I don't want to collect the data




A window definition without partitionBy:

w = Window.orderBy('rank_id').rangeBetween(-1, 3)

is as bad as collect, since it moves all the data into a single partition. So even if there are workarounds for the "dynamic frame" problem (with conditionals and an unbounded window), they won't help you here.
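
For comparison, if the data had a natural grouping column, the window could at least be evaluated per group; this addresses only the scalability of the window itself, not the dynamic frame bounds. The group_key column below is hypothetical and does not exist in the question's dataframe, it only illustrates the role of partitionBy:

# Hypothetical sketch: 'group_key' is not a column of the question's df.
# partitionBy lets Spark evaluate each window within its own partition
# instead of shuffling everything into a single one.
w_scalable = (Window.partitionBy('group_key')
                    .orderBy('rank_id')
                    .rangeBetween(-1, 3))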





