@@ -156,42 +156,33 @@ trait CDCReaderImpl extends DeltaLogging {
156156 import org.apache.spark.sql.delta.commands.cdc.CDCReader._
157157
158158 /**
159- * Given timestamp or version this method returns the corresponding version for that timestamp
160- * or the version itself.
159+ * Given a timestamp or version, this method returns the corresponding version for that
160+ * timestamp or the version itself, as well as how the returned version was obtained: by
161+ * `version` or by `timestamp`.
161162 */
162- def getVersionForCDC (
163+ private def getVersionForCDC (
163164 spark : SparkSession ,
164165 deltaLog : DeltaLog ,
165166 conf : SQLConf ,
166167 options : CaseInsensitiveStringMap ,
167168 versionKey : String ,
168- timestampKey : String ): Option [Long ] = {
169+ timestampKey : String ): Option [ResolvedCDFVersion ] = {
169170 if (options.containsKey(versionKey)) {
170- Some (options.get(versionKey).toLong)
171+ Some (ResolvedCDFVersion ( options.get(versionKey).toLong, timestamp = None ) )
171172 } else if (options.containsKey(timestampKey)) {
172173 val ts = options.get(timestampKey)
173174 val spec = DeltaTimeTravelSpec(Some(Literal(ts)), None, Some("cdcReader"))
175+ val timestamp = spec.getTimestamp(spark.sessionState.conf)
174176 val allowOutOfRange = conf.getConf(DeltaSQLConf .DELTA_CDF_ALLOW_OUT_OF_RANGE_TIMESTAMP )
175- if (timestampKey == DeltaDataSource .CDC_START_TIMESTAMP_KEY ) {
177+ val resolvedVersion = if (timestampKey == DeltaDataSource .CDC_START_TIMESTAMP_KEY ) {
176178 // For the starting timestamp we need to find a version after the provided timestamp;
177179 // we can use the same semantics as streaming.
178- val resolvedVersion = DeltaSource .getStartingVersionFromTimestamp(
179- spark,
180- deltaLog,
181- spec.getTimestamp(spark.sessionState.conf),
182- allowOutOfRange
183- )
184- Some (resolvedVersion)
180+ DeltaSource .getStartingVersionFromTimestamp(spark, deltaLog, timestamp, allowOutOfRange)
185181 } else {
186182 // For ending timestamp the version should be before the provided timestamp.
187- val resolvedVersion = DeltaTableUtils .resolveTimeTravelVersion(
188- conf,
189- deltaLog,
190- spec,
191- allowOutOfRange
192- )
193- Some (resolvedVersion._1)
183+ DeltaTableUtils .resolveTimeTravelVersion(conf, deltaLog, spec, allowOutOfRange)._1
194184 }
185+ Some (ResolvedCDFVersion (resolvedVersion, Some (timestamp)))
195186 } else {
196187 None
197188 }
@@ -240,10 +231,7 @@ trait CDCReaderImpl extends DeltaLogging {
240231 conf,
241232 options,
242233 DeltaDataSource .CDC_START_VERSION_KEY ,
243- DeltaDataSource .CDC_START_TIMESTAMP_KEY
244- )
245-
246- if (startingVersion.isEmpty) {
234+ DeltaDataSource .CDC_START_TIMESTAMP_KEY ).getOrElse {
247235 throw DeltaErrors .noStartVersionForCDC()
248236 }
249237
@@ -259,29 +247,30 @@ trait CDCReaderImpl extends DeltaLogging {
259247 s " cannot be used with time travel options. " )
260248 }
261249
250+ def emptyCDFRelation () = {
251+ new DeltaCDFRelation (
252+ SnapshotWithSchemaMode (snapshotToUse, schemaMode),
253+ spark.sqlContext,
254+ startingVersion = None ,
255+ endingVersion = None ) {
256+ override def buildScan (requiredColumns : Array [String ], filters : Array [Filter ]): RDD [Row ] =
257+ sqlContext.sparkSession.sparkContext.emptyRDD[Row ]
258+ }
259+ }
260+
262261 // add a version check here that is cheap instead of after trying to list a large version
263262 // that doesn't exist
264- if (startingVersion.get > snapshotToUse.version) {
263+ if (startingVersion.version > snapshotToUse.version) {
265264 val allowOutOfRange = conf.getConf(DeltaSQLConf .DELTA_CDF_ALLOW_OUT_OF_RANGE_TIMESTAMP )
266265 // LS-129: return an empty relation if start version passed in is beyond latest commit version
267266 if (allowOutOfRange) {
268- return new DeltaCDFRelation (
269- SnapshotWithSchemaMode (snapshotToUse, schemaMode),
270- spark.sqlContext,
271- None ,
272- None ) {
273- override def buildScan (
274- requiredColumns : Array [String ],
275- filters : Array [Filter ]): RDD [Row ] = {
276- sqlContext.sparkSession.sparkContext.emptyRDD[Row ]
277- }
278- }
267+ return emptyCDFRelation()
279268 }
280269 throw DeltaErrors .startVersionAfterLatestVersion(
281- startingVersion.get , snapshotToUse.version)
270+ startingVersion.version , snapshotToUse.version)
282271 }
283272
284- val endingVersion = getVersionForCDC(
273+ val endingVersionOpt = getVersionForCDC(
285274 spark,
286275 snapshotToUse.deltaLog,
287276 conf,
@@ -290,17 +279,42 @@ trait CDCReaderImpl extends DeltaLogging {
290279 DeltaDataSource .CDC_END_TIMESTAMP_KEY
291280 )
292281
293- if (endingVersion.exists(_ < startingVersion.get)) {
294- throw DeltaErrors .endBeforeStartVersionInCDC(startingVersion.get, endingVersion.get)
282+ // Given two timestamps, there is a case when both of them lie closely between two versions:
283+ // version: 4 5
284+ // ---------|-------------------------------------------------|--------
285+ // ^ start timestamp ^ end timestamp
286+ // In this case the starting version will be 5 and the ending version will be 4. We must not
287+ // throw `endBeforeStartVersionInCDC` but return an empty result instead.
288+ endingVersionOpt.foreach { endingVersion =>
289+ if (startingVersion.resolvedByTimestamp && endingVersion.resolvedByTimestamp) {
290+ // When the end timestamp precedes the start timestamp, the resolved versions look just
291+ // like the legitimate empty-result case above, so compare timestamps and throw early.
292+ if (startingVersion.timestamp.get.after(endingVersion.timestamp.get)) {
293+ throw DeltaErrors .endBeforeStartVersionInCDC(
294+ startingVersion.version,
295+ endingVersion.version)
296+ }
297+ if (endingVersion.version == startingVersion.version - 1 ) {
298+ return emptyCDFRelation()
299+ }
300+ }
301+ }
302+
303+ if (endingVersionOpt.exists(_.version < startingVersion.version)) {
304+ throw DeltaErrors .endBeforeStartVersionInCDC(
305+ startingVersion.version,
306+ endingVersionOpt.get.version)
295307 }
296308
297- logInfo(s " startingVersion: $startingVersion, endingVersion: $endingVersion" )
309+ logInfo(
310+ s " startingVersion: ${startingVersion.version}, " +
311+ s " endingVersion: ${endingVersionOpt.map(_.version)}" )
298312
299313 DeltaCDFRelation (
300314 SnapshotWithSchemaMode (snapshotToUse, schemaMode),
301315 spark.sqlContext,
302- startingVersion,
303- endingVersion )
316+ Some ( startingVersion.version) ,
317+ endingVersionOpt.map(_.version) )
304318 }
305319
306320 /**
@@ -720,4 +734,15 @@ trait CDCReaderImpl extends DeltaLogging {
720734 * @param numBytes the total size of the AddFile + RemoveFile + AddCDCFiles that are in the df
721735 */
722736 case class CDCVersionDiffInfo (fileChangeDf : DataFrame , numFiles : Long , numBytes : Long )
737+
738+ /**
739+ * Represents a Delta log version, and how the version is determined.
740+ * @param version the determined version.
741+ * @param timestamp the commit timestamp of the determined version; only set when the
742+ * version was determined by a timestamp.
743+ */
744+ private case class ResolvedCDFVersion (version : Long , timestamp : Option [Timestamp ]) {
745+ /** Whether this version is resolved by timestamp. */
746+ def resolvedByTimestamp : Boolean = timestamp.isDefined
747+ }
723748}
0 commit comments