Skip to content

Commit c611f1f

Browse files
Initial work on integration tests and documentation for the fillnull command
Signed-off-by: Lukasz Soszynski <[email protected]>
1 parent 8a14ecd commit c611f1f

File tree

5 files changed

+311
-0
lines changed

5 files changed

+311
-0
lines changed

integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -619,4 +619,27 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
619619
| (6, 403, '/home', '2023-10-01 10:25:00')
620620
| """.stripMargin)
621621
}
622+
623+
/**
 * Creates an HTTP-log test table in which status_code, request_path and
 * timestamp may be NULL, and seeds it with six rows containing NULLs in
 * each of those columns. Used by the `fillnull` integration tests.
 *
 * @param testTable fully qualified name of the table to create
 */
protected def createNullableTableHttpLog(testTable: String): Unit = {
  sql(s"""
       | CREATE TABLE $testTable
       |(
       |  id INT,
       |  status_code INT,
       |  request_path STRING,
       |  timestamp STRING
       |)
       | USING $tableType $tableOptions
       |""".stripMargin)

  sql(s"""
       | INSERT INTO $testTable
       | VALUES (1, 200, '/home', null),
       | (2, null, '/about', '2023-10-01 10:05:00'),
       | (3, null, '/contact', '2023-10-01 10:10:00'),
       | (4, 301, null, '2023-10-01 10:15:00'),
       | (5, 200, null, '2023-10-01 10:20:00'),
       | (6, 403, '/home', null)
       | """.stripMargin)
}
622645
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */
package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.streaming.StreamTest

/**
 * Integration tests for the PPL `fillnull` command: replacing NULLs either
 * with one shared value (`fillnull value = <v> <columns>`) or with per-column
 * values (`fillnull fields <column>=<v>, ...`).
 *
 * NOTE(review): expected rows list the filled column last — the translator
 * projects `*` plus `coalesce(col, v) AS col` and drops the original column,
 * which re-appends the filled column at the end of the schema.
 */
class FlintSparkPPLFillnullITSuite
    extends QueryTest
    with LogicalPlanTestUtils
    with FlintPPLSuite
    with StreamTest {

  private val testTable = "spark_catalog.default.flint_ppl_test"

  override def beforeAll(): Unit = {
    super.beforeAll()

    // Create test table seeded with NULLs in status_code/request_path/timestamp
    createNullableTableHttpLog(testTable)
  }

  protected override def afterEach(): Unit = {
    super.afterEach()
    // Stop all streaming jobs if any
    spark.streams.active.foreach { job =>
      job.stop()
      job.awaitTermination()
    }
  }

  test("test fillnull with one null replacement value and one column") {
    val frame = sql(s"""
       | source = $testTable | fillnull value = 0 status_code
       | """.stripMargin)

    assertSameRowsIgnoringOrder(
      frame,
      Row(1, "/home", null, 200),
      Row(2, "/about", "2023-10-01 10:05:00", 0),
      Row(3, "/contact", "2023-10-01 10:10:00", 0),
      Row(4, null, "2023-10-01 10:15:00", 301),
      Row(5, null, "2023-10-01 10:20:00", 200),
      Row(6, "/home", null, 403))
  }

  test("test fillnull with various null replacement values and one column") {
    val frame = sql(s"""
       | source = $testTable | fillnull fields status_code=101
       | """.stripMargin)

    assertSameRowsIgnoringOrder(
      frame,
      Row(1, "/home", null, 200),
      Row(2, "/about", "2023-10-01 10:05:00", 101),
      Row(3, "/contact", "2023-10-01 10:10:00", 101),
      Row(4, null, "2023-10-01 10:15:00", 301),
      Row(5, null, "2023-10-01 10:20:00", 200),
      Row(6, "/home", null, 403))
  }

  test("test fillnull with one null replacement value and two columns") {
    val frame = sql(s"""
       | source = $testTable | fillnull value = '???' request_path, timestamp | fields id, request_path, timestamp
       | """.stripMargin)

    assertSameRowsIgnoringOrder(
      frame,
      Row(1, "/home", "???"),
      Row(2, "/about", "2023-10-01 10:05:00"),
      Row(3, "/contact", "2023-10-01 10:10:00"),
      Row(4, "???", "2023-10-01 10:15:00"),
      Row(5, "???", "2023-10-01 10:20:00"),
      Row(6, "/home", "???"))
  }

  test("test fillnull with various null replacement values and two columns") {
    val frame = sql(s"""
       | source = $testTable | fillnull fields request_path='/not_found', timestamp='*' | fields id, request_path, timestamp
       | """.stripMargin)

    assertSameRowsIgnoringOrder(
      frame,
      Row(1, "/home", "*"),
      Row(2, "/about", "2023-10-01 10:05:00"),
      Row(3, "/contact", "2023-10-01 10:10:00"),
      Row(4, "/not_found", "2023-10-01 10:15:00"),
      Row(5, "/not_found", "2023-10-01 10:20:00"),
      Row(6, "/home", "*"))
  }

  /**
   * Collects the frame and compares against the expected rows, ignoring row
   * order by sorting both sides on the integer `id` column (position 0).
   */
  private def assertSameRowsIgnoringOrder(frame: DataFrame, expected: Row*): Unit = {
    implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0))
    val results: Array[Row] = frame.collect()
    assert(results.sorted.sameElements(expected.toArray.sorted))
  }
}

ppl-spark-integration/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,11 @@ _- **Limitation: "REPLACE" or "APPEND" clause must contain "AS"**_
434434

435435
Details of Lookup command syntax, see [PPL-Lookup-Command](../docs/PPL-Lookup-command.md)
436436

**Fillnull**
- `source = table1 | fillnull value = 0 status_code`
- `source = table1 | fillnull value = '???' request_path, timestamp | fields id, request_path, timestamp`
- `source = table1 | fillnull fields status_code=101`
- `source = table1 | fillnull fields request_path='/not_found', timestamp='*' | fields id, request_path, timestamp`
437442
---
438443
#### Experimental Commands:
439444
- `correlation` - [See details](../docs/PPL-Correlation-command.md)

ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -956,6 +956,7 @@ keywordsCanBeId
956956
| KMEANS
957957
| AD
958958
| ML
959+
| EXPLAIN
959960
// commands assist keywords
960961
| SOURCE
961962
| INDEX
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.flint.spark.ppl

import org.opensearch.flint.spark.ppl.PlaneUtils.plan
import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor}
import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq
import org.scalatest.matchers.should.Matchers

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{DataFrameDropColumns, Project}

/**
 * Verifies translation of the PPL `fillnull` command into a Catalyst logical
 * plan: a Project of `*` plus `coalesce(column, replacement) AS column` for
 * each filled column, a DataFrameDropColumns removing the original (now
 * shadowed) source columns, and a final `SELECT *` projection.
 *
 * NOTE(review): renamed from PPLLogicalPlanRenameCommandTranslatorTestSuite —
 * the old name was a copy-paste leftover from the rename-command suite and did
 * not match the fillnull tests it contains.
 */
class PPLLogicalPlanFillnullCommandTranslatorTestSuite
    extends SparkFunSuite
    with PlanTest
    with LogicalPlanTestUtils
    with Matchers {

  private val planTransformer = new CatalystQueryPlanVisitor()
  private val pplParser = new PPLSyntaxParser()

  /** Builds the expected `coalesce(column, replacement) AS column` projection item. */
  private def coalesceAlias(columnName: String, replacement: String): NamedExpression =
    Alias(
      UnresolvedFunction(
        "coalesce",
        Seq(UnresolvedAttribute(columnName), Literal(replacement)),
        isDistinct = false),
      columnName)()

  /**
   * Assembles the complete expected plan for `fillnull` over the given
   * (column name -> replacement value) pairs against table "relation".
   */
  private def expectedFillnullPlan(replacements: Seq[(String, String)]): Project = {
    val relation = UnresolvedRelation(Seq("relation"))
    val fillNullProjectList: Seq[NamedExpression] =
      UnresolvedStar(None) +: replacements.map { case (column, value) =>
        coalesceAlias(column, value)
      }
    val fillNullProject = Project(fillNullProjectList, relation)
    val dropSourceColumns =
      DataFrameDropColumns(
        replacements.map { case (column, _) => UnresolvedAttribute(column) },
        fillNullProject)
    Project(seq(UnresolvedStar(None)), dropSourceColumns)
  }

  test("test fillnull with one null replacement value and one column") {
    val context = new CatalystPlanContext
    val logPlan =
      planTransformer.visit(
        plan(
          pplParser,
          "source=relation | fillnull value = 'null replacement value' column_name"),
        context)

    val expectedPlan =
      expectedFillnullPlan(Seq("column_name" -> "null replacement value"))
    comparePlans(expectedPlan, logPlan, checkAnalysis = false)
  }

  test("test fillnull with one null replacement value and multiple column") {
    val context = new CatalystPlanContext
    val logPlan =
      planTransformer.visit(
        plan(
          pplParser,
          "source=relation | fillnull value = 'another null replacement value' column_name_one, column_name_two, column_name_three"),
        context)

    val expectedPlan = expectedFillnullPlan(
      Seq(
        "column_name_one" -> "another null replacement value",
        "column_name_two" -> "another null replacement value",
        "column_name_three" -> "another null replacement value"))
    comparePlans(expectedPlan, logPlan, checkAnalysis = false)
  }

  test("test fillnull with possibly various null replacement value and one column") {
    val context = new CatalystPlanContext
    val logPlan =
      planTransformer.visit(
        plan(pplParser, "source=relation | fillnull fields column_name='null replacement value'"),
        context)

    val expectedPlan =
      expectedFillnullPlan(Seq("column_name" -> "null replacement value"))
    comparePlans(expectedPlan, logPlan, checkAnalysis = false)
  }

  test("test fillnull with possibly various null replacement value and three columns") {
    val context = new CatalystPlanContext
    val logPlan =
      planTransformer.visit(
        plan(
          pplParser,
          "source=relation | fillnull fields column_name_1='null replacement value 1', column_name_2='null replacement value 2', column_name_3='null replacement value 3'"),
        context)

    val expectedPlan = expectedFillnullPlan(
      Seq(
        "column_name_1" -> "null replacement value 1",
        "column_name_2" -> "null replacement value 2",
        "column_name_3" -> "null replacement value 3"))
    comparePlans(expectedPlan, logPlan, checkAnalysis = false)
  }
}

0 commit comments

Comments
 (0)