Apply window function in Spark with non constant frame size
My Problem
I am currently facing difficulties with Spark window functions. I am using Spark 1.6.3 (through pyspark, with Python 2.6.6). I run a pyspark shell instance that automatically initializes a HiveContext as my sqlContext.
I want to compute a rolling sum with a window function. My problem is that the window frame is not fixed: it depends on the observation being considered. More specifically, I order the data by a variable called rank_id and, for the observation with index x, I want to sum over the rows with indexes x+1 through 2x-1 (for example, for the row with rank_id = 3, I want the sum of y over the rows with rank_id 4 and 5). Thus, my rangeBetween bounds must depend on the value of rank_id.
An important point is that I do not want to collect the data, so I cannot use anything like numpy (my data have a very large number of observations).
Reproducible example
from pyspark.mllib.random import RandomRDDs
import pyspark.sql.functions as psf
from pyspark.sql.window import Window
# Reproducible example
data = RandomRDDs.uniformVectorRDD(sc, 15, 2)
df = data.map(lambda l: (float(l[0]), float(l[1]))).toDF()
df = df.selectExpr("_1 as x", "_2 as y")
#df.show(2)
#+-------------------+------------------+
#| x| y|
#+-------------------+------------------+
#|0.32767742062486405|0.2506351566289311|
#| 0.7245348534550357| 0.597929853274274|
#+-------------------+------------------+
#only showing top 2 rows
# Finalize dataframe creation
w = Window().orderBy("x")
df = df.withColumn("rank_id", psf.rowNumber().over(w)).sort("rank_id")
#df.show(3)
#+--------------------+--------------------+-------+
#| x| y|rank_id|
#+--------------------+--------------------+-------+
#|0.016536160706045577|0.009892450530381458| 1|
#| 0.10943843181953838| 0.6478505849227775| 2|
#| 0.13916818312857027| 0.24165348228464578| 3|
#+--------------------+--------------------+-------+
#only showing top 3 rows
Fixed width cumulative sum: no problem
Using a window function, I am able to run a cumulative sum over a fixed number of indexes (I use rangeBetween here, but for this example rowsBetween would work just as well).
w = Window.orderBy('rank_id').rangeBetween(-1,3)
df1 = df.select('*', psf.sum(df['y']).over(w).alias('roll1'))
#df1.show(3)
#+--------------------+--------------------+-------+------------------+
#| x| y|rank_id| roll1|
#+--------------------+--------------------+-------+------------------+
#|0.016536160706045577|0.009892450530381458| 1|0.9698521852602887|
#| 0.10943843181953838| 0.6478505849227775| 2|1.5744700156326066|
#| 0.13916818312857027| 0.24165348228464578| 3|2.3040547273760392|
#+--------------------+--------------------+-------+------------------+
#only showing top 3 rows
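For reference, the same frame can be expressed with rowsBetween; a minimal sketch, assuming the df built above (here the two are equivalent because rank_id increases by exactly 1 from row to row):
# Same frame counted in rows rather than in rank_id values
w_rows = Window.orderBy('rank_id').rowsBetween(-1, 3)
df1b = df.select('*', psf.sum(df['y']).over(w_rows).alias('roll1_rows'))
#df1b.show(3) gives the same sums as roll1 above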
Cumulative sum width not fixed
I want to sum between indexes x+1 and 2x-1, where x is my row index. When I try to pass column names to rangeBetween (the same way we do for orderBy, which is maybe the problem), I get the following error:
# Now if I want to make rangeBetween size depend on a variable
w = Window.orderBy('rank_id').rangeBetween('rank_id'+1,2*'rank_id'-1)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: cannot concatenate 'str' and 'int' objects
I tried something else, using SQL statement
# Using SQL expression
df.registerTempTable('tempdf')
df2 = sqlContext.sql("""
SELECT *, SUM(y)
OVER (ORDER BY rank_id
RANGE BETWEEN rank_id+1 AND 2*rank_id-1) AS cumsum
FROM tempdf;
""")
which this time gives me the following error:
Traceback (most recent call last):
  File "<stdin>", line 6, in <module>
  File "/opt/application/Spark/current/python/pyspark/sql/context.py", line 580, in sql
    return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
  File "/opt/application/Spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/opt/application/Spark/current/python/pyspark/sql/utils.py", line 51, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"cannot recognize input near 'rank_id' '+' '1' in windowframeboundary; line 3 pos 15"
I also noticed that when I try a simpler statement using the SQL OVER clause, I get a similar error, which maybe means I am not passing the SQL statement to Spark correctly:
df2 = sqlContext.sql("""
SELECT *, SUM(y)
OVER (ORDER BY rank_id
RANGE BETWEEN -1 AND 1) AS cumsum
FROM tempdf;
""")
Traceback (most recent call last):
  File "<stdin>", line 6, in <module>
  File "/opt/application/Spark/current/python/pyspark/sql/context.py", line 580, in sql
    return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
  File "/opt/application/Spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/opt/application/Spark/current/python/pyspark/sql/utils.py", line 51, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"cannot recognize input near '-' '1' 'AND' in windowframeboundary; line 3 pos 15"
How could I solve my problem using either a window function or a SQL statement within Spark?
python apache-spark pyspark window-functions
asked Jan 10 '18 at 10:41 by linog · edited Jan 10 '18 at 11:08 by hi-zir
1 Answer
How could I solve my problem by using either window or SQL statement within Spark?
TL;DR You cannot, or at least not in a scalable way, with the current requirements. You could try something similar to sliding over an RDD: How to transform data with sliding window over time series data in Pyspark
I also noticed that when I try a more simple statement using SQL OVER clause, I got a similar error which maybe means I am not passing SQL statement correctly to Spark
It is incorrect. A range specification requires a boundary keyword (PRECEDING, FOLLOWING, or CURRENT ROW), and there should be no trailing semicolon:
SELECT *, SUM(x)
OVER (ORDER BY rank_id
RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS cumsum
FROM tempdf
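(As a side note, the same fixed frame in the DataFrame API would look like this; a minimal sketch using the question's df and imports, summing y as in the question's example:)
w_fixed = Window.orderBy('rank_id').rangeBetween(-1, 1)
df_fixed = df.select('*', psf.sum(df['y']).over(w_fixed).alias('cumsum'))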
I want to sum between indexes x+1 and 2x-1 where x is my row index. When I try to pass it to Spark (in similar way we do for orderBy maybe that's the problem), I got the following error ...
TypeError: cannot concatenate 'str' and 'int' objects
As the exception says, you cannot call + on a string and an integer. You probably wanted columns:
from pyspark.sql.functions import col
.rangeBetween(col('rank_id') + 1, 2* col('rank_id') - 1)
but this is not supported. The frame has to be of a fixed size and cannot be defined in terms of expressions.
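(If the result is needed anyway, one non-window way to express a per-row frame is a non-equi self-join followed by an aggregation; a minimal sketch assuming the question's df, and with the same scalability caveat, since the inequality join condition is expensive on large data:)
from pyspark.sql.functions import col
# Pair each row with the rows whose rank_id falls in [rank_id + 1, 2 * rank_id - 1]
left = df.select(col('rank_id').alias('r'), 'x')
right = df.select(col('rank_id').alias('r2'), 'y')
cond = (col('r2') >= col('r') + 1) & (col('r2') <= 2 * col('r') - 1)
rolled = (left.join(right, cond, 'left_outer')
              .groupBy('r', 'x')
              .agg(psf.sum('y').alias('cumsum')))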
An important point is that I don't want to collect data
A window definition without partitionBy:
w = Window.orderBy('rank_id').rangeBetween(-1,3)
is as bad as collect, since it moves all rows into a single partition. So even if there were workarounds for the "dynamic frame" problem (with conditionals and an unbounded window), they would not help you here.
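(For contrast, a partitioned window lets Spark process each group independently; a sketch only, where 'group_col' is a hypothetical grouping column that does not exist in the question's data:)
# Hypothetical 'group_col' shown only to illustrate why partitionBy matters
w_part = Window.partitionBy('group_col').orderBy('rank_id').rangeBetween(-1, 3)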
answered Jan 10 '18 at 11:03 · edited Jan 10 '18 at 11:08 · hi-zir