
[BUG] MergeIntoCommand not visible in QueryExecutionListener when using Python/Scala API #1521

Closed
@sh0ck-wave

Description


Bug

MergeIntoCommand is not visible in the QueryExecutionListener when the Python/Scala API is used to execute a merge operation.

Describe the problem

When a SQL MERGE statement is executed via spark.sql, a LogicalPlan of type org.apache.spark.sql.delta.commands.MergeIntoCommand is visible to any registered Spark QueryExecutionListener; this is useful for capturing statistics and data lineage.
When the Python or Scala API is used to execute the merge operation, no such LogicalPlan is visible to registered QueryExecutionListeners, making it difficult to track merge-related statistics and data lineage.
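To make the lineage use case concrete, the kind of listener that breaks here looks roughly like the following. This is a minimal sketch assuming the MergeIntoCommand fields from Delta 2.1.0 (source and target being the merged relations' LogicalPlans):

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.spark.sql.delta.commands.MergeIntoCommand

// Hypothetical lineage listener: pattern-matches the analyzed plan to pick
// out merges. Today this only fires for the SQL MERGE path, not the API path.
class MergeLineageListener extends QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    qe.analyzed match {
      case m: MergeIntoCommand =>
        // source/target are the LogicalPlans of the merged relations
        println(s"MERGE observed: target=${m.target}, source=${m.source}")
      case _ => () // not a merge; ignore
    }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}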

Steps to reproduce

Execute the following Scala Spark application:

package com.foo.bar

/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.spark.sql.execution.QueryExecution
import io.delta.tables.DeltaTable

class MyListener extends QueryExecutionListener {

  override def onSuccess(
      funcName: String,
      qe: QueryExecution,
      durationNs: Long
  ): Unit = {
    println(s"Received Event from ${qe.analyzed.getClass}")
  }

  override def onFailure(
      funcName: String,
      qe: QueryExecution,
      exception: Exception
  ): Unit = {
    println(s"Received Failure Event from ${qe.analyzed.getClass}")
  }
}

case class Player(id: Integer, name: String)

object SimpleApp {
  def main(args: Array[String]): Unit = {
    println(s"starting")

    val spark_conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("SimpleApp")
      .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .set(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog"
      )
    val spark = SparkSession.builder.config(spark_conf).getOrCreate()
    spark.sparkContext.setLogLevel("OFF")

    val df = spark.createDataFrame[Player](Seq(Player(1, "Quark")))
    val df1 = spark.createDataFrame[Player](Seq(Player(1, "Boson")))
    (
      df.write
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .option("path", "/path/to/table1")
        .format("delta")
        .saveAsTable("base")
    )

    (
      df1.write
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .option("path", "/path/to/table2")
        .format("delta")
        .saveAsTable("update")
    )

    // Register the listener before running the SQL MERGE path
    spark.listenerManager.register(new MyListener())

    println("Captured plans for SQL MERGE:")

    spark.sql(
      """
      |MERGE INTO base
      |USING update
      |ON base.Id = update.Id
      |WHEN MATCHED THEN
      |    UPDATE SET *
      |WHEN NOT MATCHED THEN
      |    INSERT *
      """.stripMargin
    )
    Thread.sleep(5000) // listener callbacks are asynchronous; give them time to fire
    spark.listenerManager.clear()

    // Now run the same merge through the Delta Scala API
    val base_table = DeltaTable.forPath(spark, "/path/to/table1")
    val update_table = DeltaTable.forPath(spark, "/path/to/table2")
    val merge = (
        base_table.alias("a")
        .merge(update_table.toDF.alias("b"), "a.Id == b.Id")
        .whenMatched().updateAll()
        .whenNotMatched().insertAll()
    )
    
    spark.listenerManager.register(new MyListener())
    println("")
    println("Captured plans for Delta API:")
    merge.execute()


    Thread.sleep(5000)
    spark.stop()
  }
}
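A minimal build definition that should be enough to compile and run the app above (a sketch only; versions taken from the environment section below):

// build.sbt -- minimal sketch; versions match the environment section below
name := "simple-app"
scalaVersion := "2.12.15"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "3.3.1",
  "io.delta" %% "delta-core" % "2.1.0"
)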

Observed results

Captured plans for SQL MERGE:
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Project
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Project
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Filter
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Aggregate
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Project
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Filter
Received Event from class org.apache.spark.sql.catalyst.plans.logical.SerializeFromObject
Received Event from class org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
Received Event from class org.apache.spark.sql.delta.commands.MergeIntoCommand

Captured plans for Delta API:
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Project
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Project
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Filter
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Aggregate
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Project
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Filter
Received Event from class org.apache.spark.sql.catalyst.plans.logical.SerializeFromObject
Received Event from class org.apache.spark.sql.catalyst.plans.logical.GlobalLimit

As can be seen, in the Delta API case no org.apache.spark.sql.delta.commands.MergeIntoCommand is captured by the QueryExecutionListener.
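As a stopgap for metrics (though not for plan-level lineage), the merge statistics can still be read back from the Delta transaction log after the fact. A sketch, assuming the table path from the repro above:

import io.delta.tables.DeltaTable

// Read the most recent commit's operation metrics from the Delta log.
// This recovers row counts for the merge, but not the MergeIntoCommand plan.
val lastCommit = DeltaTable
  .forPath(spark, "/path/to/table1")
  .history(1) // DataFrame with one row: the latest commit
  .select("operation", "operationMetrics")
  .collect()
  .headOption

lastCommit.foreach { row =>
  if (row.getString(0) == "MERGE") {
    println(s"Merge metrics: ${row.getJavaMap[String, String](1)}")
  }
}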

Expected results

org.apache.spark.sql.delta.commands.MergeIntoCommand should be captured by the QueryExecutionListener when the Delta API is used, just as it is for the SQL MERGE command.
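A hypothetical assertion that makes the expectation concrete (continuing from the repro's main method, so spark and merge are the values defined above; this currently fails for the Delta API path):

import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.spark.sql.delta.commands.MergeIntoCommand

// After merge.execute(), the listener should have observed at least one
// MergeIntoCommand plan.
val sawMergeCommand = new AtomicBoolean(false)

spark.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    if (qe.analyzed.isInstanceOf[MergeIntoCommand]) sawMergeCommand.set(true)
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
})

merge.execute()
Thread.sleep(5000) // the listener bus is asynchronous
assert(sawMergeCommand.get(), "MergeIntoCommand was not posted to the listener")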

Environment information

  • Delta Lake version: 2.1.0
  • Spark version: 3.3.1
  • Scala version: 2.12.15
