Skip to content

Fillnull command introduced #723

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/ppl-lang/PPL-Example-Commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ Assumptions: `a`, `b`, `c` are existing fields in `table`
- `source = table | eval f = case(a = 0, 'zero', a = 1, 'one' else 'unknown')`
- `source = table | eval f = case(a = 0, 'zero', a = 1, 'one' else concat(a, ' is an incorrect binary digit'))`

#### Fillnull
Assumptions: `a`, `b`, `c`, `d`, `e` are existing fields in `table`
- `source = table | fillnull with 0 in a`
- `source = table | fillnull with 'N/A' in a, b, c`
- `source = table | fillnull with concat(a, b) in c, d`
- `source = table | fillnull using a = 101`
- `source = table | fillnull using a = 101, b = 102`
- `source = table | fillnull using a = concat(b, c), d = 2 * pi() * e`

```sql
source = table | eval e = eval status_category =
Expand Down
2 changes: 2 additions & 0 deletions docs/ppl-lang/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ For additional examples see the next [documentation](PPL-Example-Commands.md).
- [`dedup command `](ppl-dedup-command.md)

- [`describe command`](PPL-Example-Commands.md/#describe)

- [`fillnull command`](ppl-fillnull-command.md)

- [`eval command`](ppl-eval-command.md)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,4 +619,27 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
| (6, 403, '/home', '2023-10-01 10:25:00')
| """.stripMargin)
}

protected def createNullableTableHttpLog(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable
|(
| id INT,
| status_code INT,
| request_path STRING,
| timestamp STRING
|)
| USING $tableType $tableOptions
|""".stripMargin)

sql(s"""
| INSERT INTO $testTable
| VALUES (1, 200, '/home', null),
| (2, null, '/about', '2023-10-01 10:05:00'),
| (3, null, '/contact', '2023-10-01 10:10:00'),
| (4, 301, null, '2023-10-01 10:15:00'),
| (5, 200, null, '2023-10-01 10:20:00'),
| (6, 403, '/home', null)
| """.stripMargin)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.flint.spark.ppl

import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq

import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Expression, Literal, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DataFrameDropColumns, LogicalPlan, Project, Sort}
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLFillnullITSuite
extends QueryTest
with LogicalPlanTestUtils
with FlintPPLSuite
with StreamTest {

private val testTable = "spark_catalog.default.flint_ppl_test"

override def beforeAll(): Unit = {
super.beforeAll()

// Create test table
createNullableTableHttpLog(testTable)
}

protected override def afterEach(): Unit = {
super.afterEach()
// Stop all streaming jobs if any
spark.streams.active.foreach { job =>
job.stop()
job.awaitTermination()
}
}

test("test fillnull with one null replacement value and one column") {
val frame = sql(s"""
| source = $testTable | fillnull with 0 in status_code
| """.stripMargin)

assert(frame.columns.sameElements(Array("id", "request_path", "timestamp", "status_code")))
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] =
Array(
Row(1, "/home", null, 200),
Row(2, "/about", "2023-10-01 10:05:00", 0),
Row(3, "/contact", "2023-10-01 10:10:00", 0),
Row(4, null, "2023-10-01 10:15:00", 301),
Row(5, null, "2023-10-01 10:20:00", 200),
Row(6, "/home", null, 403))
// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0))
assert(results.sorted.sameElements(expectedResults.sorted))

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val expectedPlan = fillNullExpectedPlan(Seq(("status_code", Literal(0))))
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test fillnull with various null replacement values and one column") {
val frame = sql(s"""
| source = $testTable | fillnull using status_code=101
| """.stripMargin)

assert(frame.columns.sameElements(Array("id", "request_path", "timestamp", "status_code")))
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] =
Array(
Row(1, "/home", null, 200),
Row(2, "/about", "2023-10-01 10:05:00", 101),
Row(3, "/contact", "2023-10-01 10:10:00", 101),
Row(4, null, "2023-10-01 10:15:00", 301),
Row(5, null, "2023-10-01 10:20:00", 200),
Row(6, "/home", null, 403))
// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0))
assert(results.sorted.sameElements(expectedResults.sorted))

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val expectedPlan = fillNullExpectedPlan(Seq(("status_code", Literal(101))))
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test fillnull with one null replacement value and two columns") {
val frame = sql(s"""
| source = $testTable | fillnull with concat('??', '?') in request_path, timestamp | fields id, request_path, timestamp
| """.stripMargin)

assert(frame.columns.sameElements(Array("id", "request_path", "timestamp")))
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] =
Array(
Row(1, "/home", "???"),
Row(2, "/about", "2023-10-01 10:05:00"),
Row(3, "/contact", "2023-10-01 10:10:00"),
Row(4, "???", "2023-10-01 10:15:00"),
Row(5, "???", "2023-10-01 10:20:00"),
Row(6, "/home", "???"))
// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0))
assert(results.sorted.sameElements(expectedResults.sorted))

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val fillNullPlan = fillNullExpectedPlan(
Seq(
(
"request_path",
UnresolvedFunction("concat", Seq(Literal("??"), Literal("?")), isDistinct = false)),
(
"timestamp",
UnresolvedFunction("concat", Seq(Literal("??"), Literal("?")), isDistinct = false))),
addDefaultProject = false)
val expectedPlan = Project(
Seq(
UnresolvedAttribute("id"),
UnresolvedAttribute("request_path"),
UnresolvedAttribute("timestamp")),
fillNullPlan)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test fillnull with various null replacement values and two columns") {
val frame = sql(s"""
| source = $testTable | fillnull using request_path=upper('/not_found'), timestamp='*' | fields id, request_path, timestamp
| """.stripMargin)

assert(frame.columns.sameElements(Array("id", "request_path", "timestamp")))
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] =
Array(
Row(1, "/home", "*"),
Row(2, "/about", "2023-10-01 10:05:00"),
Row(3, "/contact", "2023-10-01 10:10:00"),
Row(4, "/NOT_FOUND", "2023-10-01 10:15:00"),
Row(5, "/NOT_FOUND", "2023-10-01 10:20:00"),
Row(6, "/home", "*"))
// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0))
assert(results.sorted.sameElements(expectedResults.sorted))

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val fillNullPlan = fillNullExpectedPlan(
Seq(
(
"request_path",
UnresolvedFunction("upper", Seq(Literal("/not_found")), isDistinct = false)),
("timestamp", Literal("*"))),
addDefaultProject = false)
val expectedPlan = Project(
Seq(
UnresolvedAttribute("id"),
UnresolvedAttribute("request_path"),
UnresolvedAttribute("timestamp")),
fillNullPlan)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test fillnull with one null replacement value and stats and sort command") {
val frame = sql(s"""
| source = $testTable | fillnull with 500 in status_code
| | stats count(status_code) by status_code, request_path
| | sort request_path, status_code
| """.stripMargin)

assert(frame.columns.sameElements(Array("count(status_code)", "status_code", "request_path")))
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] =
Array(
Row(1, 200, null),
Row(1, 301, null),
Row(1, 500, "/about"),
Row(1, 500, "/contact"),
Row(1, 200, "/home"),
Row(1, 403, "/home"))
// Compare the results
assert(results.sameElements(expectedResults))

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val fillNullPlan =
fillNullExpectedPlan(Seq(("status_code", Literal(500))), addDefaultProject = false)
val aggregateExpressions =
Seq(
Alias(
UnresolvedFunction(
Seq("COUNT"),
Seq(UnresolvedAttribute("status_code")),
isDistinct = false),
"count(status_code)")(),
Alias(UnresolvedAttribute("status_code"), "status_code")(),
Alias(UnresolvedAttribute("request_path"), "request_path")())
val aggregatePlan = Aggregate(
Seq(
Alias(UnresolvedAttribute("status_code"), "status_code")(),
Alias(UnresolvedAttribute("request_path"), "request_path")()),
aggregateExpressions,
fillNullPlan)
val sortPlan = Sort(
Seq(
SortOrder(UnresolvedAttribute("request_path"), Ascending),
SortOrder(UnresolvedAttribute("status_code"), Ascending)),
global = true,
aggregatePlan)
val expectedPlan = Project(seq(UnresolvedStar(None)), sortPlan)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test fillnull with various null replacement value and stats and sort command") {
val frame = sql(s"""
| source = $testTable | fillnull using status_code = 500, request_path = '/home'
| | stats count(status_code) by status_code, request_path
| | sort request_path, status_code
| """.stripMargin)

assert(frame.columns.sameElements(Array("count(status_code)", "status_code", "request_path")))
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] =
Array(
Row(1, 500, "/about"),
Row(1, 500, "/contact"),
Row(2, 200, "/home"),
Row(1, 301, "/home"),
Row(1, 403, "/home"))
// Compare the results
assert(results.sameElements(expectedResults))

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val fillNullPlan = fillNullExpectedPlan(
Seq(("status_code", Literal(500)), ("request_path", Literal("/home"))),
addDefaultProject = false)
val aggregateExpressions =
Seq(
Alias(
UnresolvedFunction(
Seq("COUNT"),
Seq(UnresolvedAttribute("status_code")),
isDistinct = false),
"count(status_code)")(),
Alias(UnresolvedAttribute("status_code"), "status_code")(),
Alias(UnresolvedAttribute("request_path"), "request_path")())
val aggregatePlan = Aggregate(
Seq(
Alias(UnresolvedAttribute("status_code"), "status_code")(),
Alias(UnresolvedAttribute("request_path"), "request_path")()),
aggregateExpressions,
fillNullPlan)
val sortPlan = Sort(
Seq(
SortOrder(UnresolvedAttribute("request_path"), Ascending),
SortOrder(UnresolvedAttribute("status_code"), Ascending)),
global = true,
aggregatePlan)
val expectedPlan = Project(seq(UnresolvedStar(None)), sortPlan)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test fillnull with one null replacement value and missing columns") {
val ex = intercept[AnalysisException](sql(s"""
| source = $testTable | fillnull with '!!!' in
| """.stripMargin))

assert(ex.getMessage().contains("Syntax error "))
}

test("test fillnull with various null replacement values and missing columns") {
val ex = intercept[AnalysisException](sql(s"""
| source = $testTable | fillnull using
| """.stripMargin))

assert(ex.getMessage().contains("Syntax error "))
}

private def fillNullExpectedPlan(
nullReplacements: Seq[(String, Expression)],
addDefaultProject: Boolean = true): LogicalPlan = {
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val renameProjectList = UnresolvedStar(None) +: nullReplacements.map {
case (nullableColumn, nullReplacement) =>
Alias(
UnresolvedFunction(
"coalesce",
Seq(UnresolvedAttribute(nullableColumn), nullReplacement),
isDistinct = false),
nullableColumn)()
}
val renameProject = Project(renameProjectList, table)
val droppedColumns =
nullReplacements.map(_._1).map(columnName => UnresolvedAttribute(columnName))
val dropSourceColumn = DataFrameDropColumns(droppedColumns, renameProject)
if (addDefaultProject) {
Project(seq(UnresolvedStar(None)), dropSourceColumn)
} else {
dropSourceColumn
}
}
}
4 changes: 4 additions & 0 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ NEW_FIELD: 'NEW_FIELD';
KMEANS: 'KMEANS';
AD: 'AD';
ML: 'ML';
FILLNULL: 'FILLNULL';

//Native JOIN KEYWORDS
JOIN: 'JOIN';
Expand Down Expand Up @@ -72,6 +73,9 @@ INDEX: 'INDEX';
D: 'D';
DESC: 'DESC';
DATASOURCES: 'DATASOURCES';
VALUE: 'VALUE';
USING: 'USING';
WITH: 'WITH';

// CLAUSE KEYWORDS
SORTBY: 'SORTBY';
Expand Down
Loading
Loading