[GH-1918] Spark 4 support #1919

Merged on Jun 25, 2025 (34 commits)

Commits (34)
53c2dd6
Add Spark 4 support
Kimahriman Dec 11, 2024
5899a21
Use staging repo
Kimahriman Feb 28, 2025
3a86979
Try to run tests
Kimahriman Feb 28, 2025
a803f22
Work on Spark 4 support
Kimahriman Mar 7, 2025
baf6671
Use shims instead of reflection
Kimahriman Mar 10, 2025
772fd07
Bump hadoop version
Kimahriman Mar 10, 2025
3392a9e
Add 4.0 copy, add null intolerant shim, and ignore dbscan tests
Kimahriman Apr 4, 2025
1887a1d
Fully shade jiffle and antlr
Kimahriman Apr 8, 2025
500872f
Add back specific exclusions to jiffle
Kimahriman Apr 9, 2025
6750e7d
Fix test
Kimahriman Apr 9, 2025
c2942ce
Bump to RC4
Kimahriman Apr 11, 2025
de401ab
Undo workflow change and add comment to shading
Kimahriman Apr 13, 2025
5531f2b
Run pre-commit
Kimahriman Apr 13, 2025
e784494
Comment in pom
Kimahriman Apr 13, 2025
ad0411d
Merge branch 'master' into spark-4-support
Kimahriman Apr 13, 2025
81edc3c
Update snapshot version
Kimahriman Apr 13, 2025
ba1c79a
Fix strategy import
Kimahriman Apr 13, 2025
531d4dd
Re-sync new spark-4.0 module and update arrow eval for 4.0 changes
Kimahriman Apr 14, 2025
ad28e5d
Rework off of current arrow eval python
Kimahriman Apr 14, 2025
fa9d218
Install pyspark for udf test
Kimahriman Apr 15, 2025
2a9fc55
Generate sources before scala doc
Kimahriman Apr 16, 2025
04c93be
Merge branch 'master' into spark-4-support
Kimahriman May 21, 2025
ffebe86
Use official 4.0 release
Kimahriman May 23, 2025
8c606df
Use official python release and add to python test matrix
Kimahriman May 23, 2025
1fdfbef
Set java version for python tests
Kimahriman May 23, 2025
8befa97
Bump pandas to 2.x
Kimahriman May 23, 2025
4771e52
Bump min python to 3.8
Kimahriman May 23, 2025
daac5f1
Try to fix Python tests
Kimahriman May 23, 2025
1d15cfd
Remove jiffle from spark common since it's not used and causes spark-…
Kimahriman May 23, 2025
d9326b0
Exclude jiffle in spark-shaded
Kimahriman May 23, 2025
acfc191
Don't include connect for Spark 4+
Kimahriman May 24, 2025
770ef84
Merge branch 'master' into spark-4-support
Kimahriman Jun 3, 2025
7bf2a25
Merge branch 'master' into spark-4-support
Kimahriman Jun 9, 2025
b2362a0
pre-check
Kimahriman Jun 10, 2025
Files changed
2 changes: 1 addition & 1 deletion .github/workflows/docs.yml
@@ -48,7 +48,7 @@ jobs:
- name: Compile JavaDoc
run: mvn -q clean install -DskipTests && mkdir -p docs/api/javadoc/spark && cp -r spark/common/target/apidocs/* docs/api/javadoc/spark/
- name: Compile ScalaDoc
run: mvn scala:doc -pl !common,!snowflake,!flink && mkdir -p docs/api/scaladoc/spark && cp -r spark/common/target/site/scaladocs/* docs/api/scaladoc/spark
run: mvn generate-sources scala:doc -pl !common,!snowflake,!flink && mkdir -p docs/api/scaladoc/spark && cp -r spark/common/target/site/scaladocs/* docs/api/scaladoc/spark
- uses: actions/setup-python@v5
with:
python-version: 3.x
8 changes: 8 additions & 0 deletions .github/workflows/java.yml
@@ -62,6 +62,9 @@ jobs:
fail-fast: true
matrix:
include:
- spark: 4.0.0
scala: 2.13.8
jdk: '17'
- spark: 3.5.0
scala: 2.13.8
jdk: '11'
@@ -105,6 +108,11 @@ jobs:
export SPARK_HOME=$(python -c "import pyspark; print(pyspark.__path__[0])")
fi

if [ "${SPARK_VERSION}" == "4.0.0" ]; then
pip install pyspark==4.0.0 pandas shapely apache-sedona pyarrow
export SPARK_HOME=$(python -c "import pyspark; print(pyspark.__path__[0])")
fi

mvn -q clean install -Dspark=${SPARK_COMPAT_VERSION} -Dscala=${SCALA_VERSION:0:4} -Dspark.version=${SPARK_VERSION} ${SKIP_TESTS}
- run: mkdir staging
- run: cp spark-shaded/target/sedona-*.jar staging
20 changes: 19 additions & 1 deletion .github/workflows/python.yml
@@ -60,33 +60,46 @@ jobs:
strategy:
matrix:
include:
- spark: '4.0.0'
scala: '2.13.8'
java: '17'
python: '3.10'
- spark: '3.5.0'
scala: '2.12.8'
java: '11'
python: '3.10'
shapely: '1'
- spark: '3.5.0'
scala: '2.12.8'
java: '11'
python: '3.10'
- spark: '3.5.0'
scala: '2.12.8'
java: '11'
python: '3.9'
- spark: '3.5.0'
scala: '2.12.8'
java: '11'
python: '3.8'
- spark: '3.4.0'
scala: '2.12.8'
java: '11'
python: '3.10'
- spark: '3.4.0'
scala: '2.12.8'
java: '11'
python: '3.9'
- spark: '3.4.0'
scala: '2.12.8'
java: '11'
python: '3.8'
- spark: '3.4.0'
scala: '2.12.8'
java: '11'
python: '3.8'
- spark: '3.4.0'
scala: '2.12.8'
java: '11'
python: '3.8'
shapely: '1'

@@ -97,7 +110,7 @@ jobs:
- uses: actions/setup-java@v4
with:
distribution: 'zulu'
java-version: '11'
java-version: '${{ matrix.java }}'
- uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python }}
@@ -123,12 +136,17 @@ jobs:
SPARK_VERSION: ${{ matrix.spark }}
PYTHON_VERSION: ${{ matrix.python }}
SHAPELY_VERSION: ${{ matrix.shapely }}
PANDAS_VERSION: ${{ matrix.pandas }}
run: |
cd python
if [ "${SHAPELY_VERSION}" == "1" ]; then
echo "Patching Pipfile to use Shapely 1.x"
sed -i 's/^shapely.*$/shapely="<2.0.0"/g' Pipfile
fi
if [ "${PANDAS_VERSION}" == "1" ]; then
echo "Patching Pipfile to use Pandas 1.x"
sed -i 's/^pandas.*$/pandas="<2.0.0"/g' Pipfile
fi
export PIPENV_CUSTOM_VENV_NAME=python-${PYTHON_VERSION}
pipenv --python ${PYTHON_VERSION}
pipenv install pyspark==${SPARK_VERSION}
24 changes: 23 additions & 1 deletion pom.xml
@@ -76,14 +76,14 @@
<spatial4j.version>0.8</spatial4j.version>

<jt-jiffle.version>1.1.24</jt-jiffle.version>
<antlr-runtime.version>4.9.3</antlr-runtime.version>
<janino-version>3.1.9</janino-version>

<!-- Actual scala, spark and log4j version will be set by activated profiles.
Setting a default value helps IDE:s that can't make sense of profiles. -->
<scala.compat.version>2.12</scala.compat.version>
<spark.version>3.4.0</spark.version>
<spark.compat.version>3.4</spark.compat.version>
<spark.major.version>3</spark.major.version>
<log4j.version>2.19.0</log4j.version>
<graphframe.version>0.8.3-spark3.4</graphframe.version>

@@ -736,6 +736,28 @@
<skip.deploy.common.modules>true</skip.deploy.common.modules>
</properties>
</profile>
<profile>
<id>sedona-spark-4.0</id>
<activation>
<property>
<name>spark</name>
<value>4.0</value>
</property>
</activation>
<properties>
<spark.version>4.0.0</spark.version>
<spark.compat.version>4.0</spark.compat.version>
<spark.major.version>4</spark.major.version>
<hadoop.version>3.4.1</hadoop.version>
<log4j.version>2.24.3</log4j.version>
<slf4j.version>2.0.16</slf4j.version>
<graphframe.version>0.8.3-spark3.5</graphframe.version>
<scala.version>2.13.12</scala.version>
<scala.compat.version>2.13</scala.compat.version>
<!-- Skip deploying parent module. it will be deployed with sedona-spark-3.3 -->
<skip.deploy.common.modules>true</skip.deploy.common.modules>
</properties>
</profile>
<profile>
<id>scala2.13</id>
<activation>
2 changes: 1 addition & 1 deletion python/Pipfile
@@ -17,7 +17,7 @@ matplotlib = "*" # implicit dependency of esda
scipy = "<=1.10.0" # prevent incompatibility with pysal 4.7.0, which is what is resolved to when shapely >2 is specified

[packages]
pandas="<=1.5.3"
pandas=">=2.0.0"
numpy="<2"
geopandas="*"
# https://stackoverflow.com/questions/78949093/how-to-resolve-attributeerror-module-fiona-has-no-attribute-path
3 changes: 1 addition & 2 deletions python/sedona/spark/core/jvm/config.py
@@ -23,7 +23,6 @@
from re import findall
from typing import Optional, Tuple

from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession

from sedona.spark.utils.decorators import classproperty
@@ -189,7 +188,7 @@ def get_spark_java_config(

try:
used_jar_files = java_spark_conf.get(value)
except Py4JJavaError:
except Exception:
error_message = f"Didn't find the value of {value} from SparkConf"
logging.info(error_message)

4 changes: 4 additions & 0 deletions python/tests/sql/test_dataframe_api.py
@@ -19,6 +19,7 @@
import concurrent.futures
from typing import Callable, Tuple

import pyspark
import pytest
from pyspark.sql import Row
from pyspark.sql import functions as f
@@ -1760,6 +1761,9 @@ def run_spatial_query():
os.getenv("SPARK_REMOTE") is not None,
reason="Checkpoint dir is not available in Spark Connect",
)
@pytest.mark.skipif(
pyspark.__version__ >= "4", reason="DBSCAN is not supported yet on Spark 4"
)
def test_dbscan(self):
df = self.spark.createDataFrame([{"id": 1, "x": 2, "y": 3}]).withColumn(
"geometry", f.expr("ST_Point(x, y)")
4 changes: 4 additions & 0 deletions python/tests/stats/test_dbscan.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.

import pyspark
import pyspark.sql.functions as f
import pytest
from sklearn.cluster import DBSCAN as sklearnDBSCAN
@@ -25,6 +26,9 @@
from sedona.spark.stats import dbscan


@pytest.mark.skipif(
pyspark.__version__ >= "4", reason="DBSCAN is not supported yet on Spark 4"
)
class TestDBScan(TestBase):

@pytest.fixture
29 changes: 16 additions & 13 deletions python/tests/test_base.py
@@ -44,26 +44,29 @@ def spark(self):

builder = SedonaContext.builder().appName("SedonaSparkTest")
if SPARK_REMOTE:
builder = (
builder.remote(SPARK_REMOTE)
.config(
builder = builder.remote(SPARK_REMOTE).config(
"spark.sql.extensions",
"org.apache.sedona.sql.SedonaSqlExtensions",
)

# Connect is packaged with Spark 4+
if pyspark.__version__ < "4":
builder = builder.config(
"spark.jars.packages",
f"org.apache.spark:spark-connect_2.12:{pyspark.__version__}",
)
.config(
"spark.sql.extensions",
"org.apache.sedona.sql.SedonaSqlExtensions",
)
.config(
"spark.sedona.stac.load.itemsLimitMax",
"20",
)
)
else:
builder = builder.master("local[*]").config(
builder = builder.master("local[*]")

builder = (
builder.config(
"spark.sedona.stac.load.itemsLimitMax",
"20",
)
# Pandas on PySpark doesn't work with ANSI mode, which is enabled by default
# in Spark 4
.config("spark.sql.ansi.enabled", "false")
)

# Allows the Sedona .jar to be explicitly set by the caller (e.g, to run
# pytest against a freshly-built development version of Sedona)
38 changes: 38 additions & 0 deletions spark/common/pom.xml
@@ -268,6 +268,25 @@
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/main/scala-spark-${spark.major.version}</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
@@ -290,4 +309,23 @@
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>sedona-spark-4.0</id>
<activation>
<property>
<name>spark</name>
<value>4.0</value>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-api_${scala.compat.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
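
Editor's note: the build-helper execution above adds src/main/scala-spark-${spark.major.version} as an extra source root, so the same fully qualified object can be implemented once per Spark major version and only the copy matching the active profile is compiled. A hypothetical sketch of that layout, not from this PR (the directory and object names below are illustrative; the PR's real per-version shim is the DataFrameShims object shown further down):

// spark/common/src/main/scala-spark-3/org/apache/sedona/example/CompatShim.scala
package org.apache.sedona.example

object CompatShim {
  // Spark 3 flavor of a version-dependent helper; compiled for the Spark 3.x profiles.
  def sparkMajorVersion: Int = 3
}

// spark/common/src/main/scala-spark-4/org/apache/sedona/example/CompatShim.scala
package org.apache.sedona.example

object CompatShim {
  // Spark 4 flavor; picked up only when -Dspark=4.0 activates the sedona-spark-4.0 profile.
  def sparkMajorVersion: Int = 4
}
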
@@ -16,57 +16,59 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.spark.sql.sedona_sql.expressions
package org.apache.spark.sql.sedona_sql

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.Column
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.execution.aggregate.ScalaUDAF
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StructType

trait DataFrameAPI {
protected def wrapExpression[E <: Expression: ClassTag](args: Any*): Column = {
val exprArgs = args.map(_ match {
case c: Column => c.expr
case s: String => Column(s).expr
case e: Expression => e
case x: Any => Literal(x)
case null => Literal(null)
})
val expressionConstructor =
implicitly[ClassTag[E]].runtimeClass.getConstructor(classOf[Seq[Expression]])
val expressionInstance = expressionConstructor.newInstance(exprArgs).asInstanceOf[E]
Column(expressionInstance)
object DataFrameShims {

private[sedona_sql] def wrapExpression[E <: Expression: ClassTag](args: Any*): Column = {
wrapVarArgExpression[E](args)
}

protected def wrapVarArgExpression[E <: Expression: ClassTag](arg: Seq[Any]): Column = {
private[sedona_sql] def wrapVarArgExpression[E <: Expression: ClassTag](arg: Seq[Any]): Column = {
val runtimeClass = implicitly[ClassTag[E]].runtimeClass
val exprArgs = arg.map(_ match {
case c: Column => c.expr
case s: String => Column(s).expr
case e: Expression => e
case x: Any => Literal(x)
case null => Literal(null)
})
val expressionConstructor =
implicitly[ClassTag[E]].runtimeClass.getConstructor(classOf[Seq[Expression]])
val expressionConstructor = runtimeClass.getConstructor(classOf[Seq[Expression]])
val expressionInstance = expressionConstructor.newInstance(exprArgs).asInstanceOf[E]
Column(expressionInstance)
}

protected def wrapAggregator[A <: UserDefinedAggregateFunction: ClassTag](arg: Any*): Column = {
private[sedona_sql] def wrapAggregator[A <: UserDefinedAggregateFunction: ClassTag](arg: Any*): Column = {
val runtimeClass = implicitly[ClassTag[A]].runtimeClass
val exprArgs = arg.map(_ match {
case c: Column => c.expr
case s: String => Column(s).expr
case e: Expression => e
case x: Any => Literal(x)
case null => Literal(null)
})
val aggregatorClass = implicitly[ClassTag[A]].runtimeClass
val aggregatorConstructor = aggregatorClass.getConstructor()
val aggregatorConstructor = runtimeClass.getConstructor()
val aggregatorInstance =
aggregatorConstructor.newInstance().asInstanceOf[UserDefinedAggregateFunction]
val scalaAggregator = ScalaUDAF(exprArgs, aggregatorInstance)
Column(scalaAggregator)
}

private[sedona_sql] def createDataFrame(
sparkSession: SparkSession,
rdd: RDD[InternalRow],
schema: StructType): DataFrame = {
sparkSession.internalCreateDataFrame(rdd, schema)
}
}
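
Editor's note: the hunk above reworks the old DataFrameAPI helper trait into a DataFrameShims object, which appears to live in the per-version source roots so that Column construction and internal DataFrame creation can differ between Spark 3 and Spark 4. As context, a minimal usage sketch, not part of this PR: the ExampleDataFrameApi wrapper is hypothetical, and ST_Point is assumed to be a Sedona catalyst expression with a Seq[Expression] constructor.

package org.apache.spark.sql.sedona_sql.expressions

import org.apache.spark.sql.Column
import org.apache.spark.sql.sedona_sql.DataFrameShims

// Hypothetical caller inside the sedona_sql package tree, which is what the
// private[sedona_sql] visibility of the shim methods allows.
object ExampleDataFrameApi {
  // Wraps the ST_Point expression into a Column; the active DataFrameShims
  // copy decides how the Column is actually built on Spark 3 vs Spark 4.
  def st_point(x: Column, y: Column): Column =
    DataFrameShims.wrapExpression[ST_Point](x, y)
}

A Column built this way behaves like any other, e.g. df.select(ExampleDataFrameApi.st_point(col("x"), col("y"))).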