Feature request
Which Delta project/connector is this regarding?
- Spark
- Standalone
- Flink
- Kernel
- Other (fill in here)
Overview
Spark Connect is a new initiative in Apache Spark that adds a decoupled client-server infrastructure, allowing Spark applications to connect remotely to a Spark server and run SQL / DataFrame operations. We want to develop what we're calling "Delta Connect" so that Delta operations can be performed from applications running in this client-server mode.
Further details
These are the critical user journeys (CUJs) we would like to support:
Server
The server is packaged into the io.delta:delta-spark-connect-server_2.13 package; installing it automatically pulls in the io.delta:delta-spark-connect-common_2.13 package.
sbin/start-connect-server.sh \
  --packages org.apache.spark:spark-connect_2.13:4.0.0,io.delta:delta-spark-connect-server_2.13:4.0.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
Scala Client
The client is packaged into the io.delta:delta-spark-connect-client_2.13 package; installing it automatically pulls in the io.delta:delta-spark-connect-common_2.13 package.
export SPARK_REMOTE="sc://localhost:15002"
spark-connect-repl --packages io.delta:delta-spark-connect-client_2.13:4.0.0
The delta-spark-connect-client_2.13 package uses the exact same class and package names as the delta-spark_2.13 package, so the exact same code works as before.
import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forName(spark, "my_table")
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = Map("id" -> expr("id + 100")))
Python Client
The Delta Connect Python client is included in the same PyPI package as Delta Spark.
pip install pyspark==4.0.0
pip install delta-spark==4.0.0
Usage is identical to the classic mode; we just need to pass a remote SparkSession (instead of a local one) to the DeltaTable API.
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
deltaTable = DeltaTable.forName(spark, "my_table")
deltaTable.update(
    condition = expr("id % 2 == 0"),
    set = { "id": expr("id + 100") })