Skip to content

Commit d426489

Browse files
Eric Xiaogaoyunhaii
authored andcommitted
[FLINK-29498][datastream] Add Scala Async Retry Strategies and ResultPredicates Helper Classes
This closes apache#21077.
1 parent 5b8ea81 commit d426489

File tree

6 files changed

+194
-38
lines changed

6 files changed

+194
-38
lines changed

docs/content.zh/docs/dev/datastream/operators/asyncio.md

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,8 @@ DataStream<Tuple2<String, String>> resultStream =
125125
// 通过工具类创建一个异步重试策略, 或用户实现自定义的策略
126126
AsyncRetryStrategy asyncRetryStrategy =
127127
new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms
128-
.retryIfResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
129-
.retryIfException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
128+
.ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
129+
.ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
130130
.build();
131131

132132
// 应用异步 I/O 转换操作并启用重试
@@ -170,7 +170,11 @@ val resultStream: DataStream[(String, String)] =
170170

171171
// 或 应用异步 I/O 转换操作并启用重试
172172
// 创建一个异步重试策略
173-
val asyncRetryStrategy: AsyncRetryStrategy[OUT] = ...
173+
val asyncRetryStrategy: AsyncRetryStrategy[String] =
174+
new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms
175+
.ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
176+
.ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
177+
.build();
174178

175179
// 应用异步 I/O 转换操作并启用重试
176180
val resultStream: DataStream[(String, String)] =

docs/content/docs/dev/datastream/operators/asyncio.md

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,8 @@ DataStream<Tuple2<String, String>> resultStream =
140140
// create an async retry strategy via utility class or a user defined strategy
141141
AsyncRetryStrategy asyncRetryStrategy =
142142
new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms
143-
.retryIfResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
144-
.retryIfException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
143+
.ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
144+
.ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
145145
.build();
146146

147147
// apply the async I/O transformation with retry
@@ -185,7 +185,11 @@ val resultStream: DataStream[(String, String)] =
185185

186186
// apply the async I/O transformation with retry
187187
// create an AsyncRetryStrategy
188-
val asyncRetryStrategy: AsyncRetryStrategy[OUT] = ...
188+
val asyncRetryStrategy: AsyncRetryStrategy[String] =
189+
new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms
190+
.ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
191+
.ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
192+
.build();
189193

190194
// apply the async I/O transformation with retry
191195
val resultStream: DataStream[(String, String)] =

flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
430430
/**
431431
* A guard similar to ResultHandler#complete to prevent repeated complete calls from
432432
* ill-written AsyncFunction. This flag indicates a retry is in-flight, new retry will be
433-
* rejected if it is ture, and it will be reset to false after the retry fired.
433+
* rejected if it is true, and it will be reset to false after the retry fired.
434434
*/
435435
private final AtomicBoolean retryAwaiting = new AtomicBoolean(false);
436436

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.flink.streaming.api.scala.async
19+
20+
import org.apache.flink.annotation.PublicEvolving
21+
import org.apache.flink.streaming.api.functions.async
22+
import org.apache.flink.streaming.api.functions.async.{AsyncRetryStrategy => JAsyncRetryStrategy}
23+
import org.apache.flink.streaming.util.retryable.{AsyncRetryStrategies => JAsyncRetryStrategies}
24+
25+
import java.{util => ju}
26+
import java.util.function.Predicate
27+
28+
/** Utility class to create concrete {@link AsyncRetryStrategy}. */
29+
object AsyncRetryStrategies {
30+
31+
final private class JavaToScalaRetryStrategy[T](retryStrategy: JAsyncRetryStrategy[T])
32+
extends AsyncRetryStrategy[T] {
33+
34+
/** @return whether the next attempt can happen */
35+
override def canRetry(currentAttempts: Int): Boolean = retryStrategy.canRetry(currentAttempts)
36+
37+
/** @return the delay time of next attempt */
38+
override def getBackoffTimeMillis(currentAttempts: Int): Long =
39+
retryStrategy.getBackoffTimeMillis(currentAttempts)
40+
41+
/** @return the defined retry predicate {@link AsyncRetryPredicate} */
42+
override def getRetryPredicate(): AsyncRetryPredicate[T] = new AsyncRetryPredicate[T] {
43+
val retryPredicates: async.AsyncRetryPredicate[T] = retryStrategy.getRetryPredicate
44+
45+
/**
46+
* An Optional Java {@Predicate } that defines a condition on asyncFunction's future result
47+
* which will trigger a later reattempt operation, will be called before user's
48+
* ResultFuture#complete.
49+
*
50+
* @return
51+
* predicate on result of {@link ju.Collection}
52+
*/
53+
override def resultPredicate: Option[Predicate[ju.Collection[T]]] = Option(
54+
retryPredicates.resultPredicate.orElse(null))
55+
56+
/**
57+
* An Optional Java {@Predicate } that defines a condition on asyncFunction's exception which
58+
* will trigger a later reattempt operation, will be called before user's
59+
* ResultFuture#completeExceptionally.
60+
*
61+
* @return
62+
* predicate on {@link Throwable} exception
63+
*/
64+
override def exceptionPredicate: Option[Predicate[Throwable]] = Option(
65+
retryPredicates.exceptionPredicate.orElse(null))
66+
}
67+
}
68+
69+
/**
70+
* FixedDelayRetryStrategyBuilder for building an {@link AsyncRetryStrategy} with fixed delay
71+
* retrying behaviours.
72+
*/
73+
@PublicEvolving
74+
@SerialVersionUID(1L)
75+
class FixedDelayRetryStrategyBuilder[OUT](
76+
private val maxAttempts: Int,
77+
private val backoffTimeMillis: Long
78+
) {
79+
private var builder =
80+
new JAsyncRetryStrategies.FixedDelayRetryStrategyBuilder[OUT](maxAttempts, backoffTimeMillis)
81+
82+
def ifResult(resultRetryPredicate: Predicate[ju.Collection[OUT]])
83+
: FixedDelayRetryStrategyBuilder[OUT] = {
84+
this.builder = this.builder.ifResult(resultRetryPredicate)
85+
this
86+
}
87+
88+
def ifException(
89+
exceptionRetryPredicate: Predicate[Throwable]): FixedDelayRetryStrategyBuilder[OUT] = {
90+
this.builder = this.builder.ifException(exceptionRetryPredicate)
91+
this
92+
}
93+
94+
def build(): AsyncRetryStrategy[OUT] = new JavaToScalaRetryStrategy[OUT](builder.build())
95+
}
96+
97+
/**
98+
* ExponentialBackoffDelayRetryStrategyBuilder for building an {@link AsyncRetryStrategy} with
99+
* exponential delay retrying behaviours.
100+
*/
101+
@PublicEvolving
102+
@SerialVersionUID(1L)
103+
class ExponentialBackoffDelayRetryStrategyBuilder[OUT](
104+
private val maxAttempts: Int,
105+
private val initialDelay: Long,
106+
private val maxRetryDelay: Long,
107+
private val multiplier: Double
108+
) {
109+
private var builder =
110+
new JAsyncRetryStrategies.ExponentialBackoffDelayRetryStrategyBuilder[OUT](
111+
maxAttempts,
112+
initialDelay,
113+
maxRetryDelay,
114+
multiplier)
115+
116+
def ifResult(resultRetryPredicate: Predicate[ju.Collection[OUT]])
117+
: ExponentialBackoffDelayRetryStrategyBuilder[OUT] = {
118+
this.builder = this.builder.ifResult(resultRetryPredicate)
119+
this
120+
}
121+
122+
def ifException(exceptionRetryPredicate: Predicate[Throwable])
123+
: ExponentialBackoffDelayRetryStrategyBuilder[OUT] = {
124+
this.builder = this.builder.ifException(exceptionRetryPredicate)
125+
this
126+
}
127+
128+
def build(): AsyncRetryStrategy[OUT] = new JavaToScalaRetryStrategy[OUT](builder.build())
129+
}
130+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.flink.streaming.api.scala.async
19+
20+
import org.apache.flink.annotation.PublicEvolving
21+
import org.apache.flink.streaming.util.retryable.{RetryPredicates => JRetryPredicates}
22+
23+
import java.util
24+
import java.util.function.Predicate
25+
26+
/** Utility class to create concrete retry predicates. */
27+
@PublicEvolving
28+
object RetryPredicates {
29+
30+
/** A predicate matches empty result which means an empty {@link Collection}. */
31+
def EMPTY_RESULT_PREDICATE[T]: Predicate[util.Collection[T]] =
32+
JRetryPredicates.EMPTY_RESULT_PREDICATE.asInstanceOf[Predicate[util.Collection[T]]]
33+
34+
/** A predicate matches any exception which means a non-null{@link Throwable}. */
35+
def HAS_EXCEPTION_PREDICATE: Predicate[Throwable] =
36+
JRetryPredicates.HAS_EXCEPTION_PREDICATE.asInstanceOf[Predicate[Throwable]]
37+
38+
}

flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala

Lines changed: 11 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.scala
2020
import org.apache.flink.configuration.Configuration
2121
import org.apache.flink.streaming.api.functions.sink.SinkFunction
2222
import org.apache.flink.streaming.api.scala.AsyncDataStreamITCase._
23-
import org.apache.flink.streaming.api.scala.async.{AsyncRetryPredicate, AsyncRetryStrategy, ResultFuture, RichAsyncFunction}
23+
import org.apache.flink.streaming.api.scala.async.{AsyncRetryStrategies, ResultFuture, RetryPredicates, RichAsyncFunction}
2424
import org.apache.flink.test.util.AbstractTestBase
2525

2626
import org.junit.Assert._
@@ -31,7 +31,6 @@ import org.junit.runners.Parameterized.Parameters
3131

3232
import java.{util => ju}
3333
import java.util.concurrent.{CountDownLatch, TimeUnit}
34-
import java.util.function.Predicate
3534

3635
import scala.collection.mutable
3736
import scala.concurrent.{ExecutionContext, Future}
@@ -174,7 +173,11 @@ class AsyncDataStreamITCase(ordered: Boolean) extends AbstractTestBase {
174173

175174
val asyncFunction = new OddInputReturnEmptyAsyncFunc
176175

177-
val asyncRetryStrategy = createFixedRetryStrategy[Int](3, 10)
176+
val asyncRetryStrategy =
177+
new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 10)
178+
.ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE[Int])
179+
.ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
180+
.build()
178181

179182
val timeout = 10000L
180183
val asyncMapped = if (ordered) {
@@ -196,33 +199,6 @@ class AsyncDataStreamITCase(ordered: Boolean) extends AbstractTestBase {
196199
executeAndValidate(ordered, env, asyncMapped, mutable.ArrayBuffer[Int](2, 4, 6))
197200
}
198201

199-
private def createFixedRetryStrategy[OUT](
200-
maxAttempts: Int,
201-
fixedDelayMs: Long): AsyncRetryStrategy[OUT] = {
202-
new AsyncRetryStrategy[OUT] {
203-
204-
override def canRetry(currentAttempts: Int): Boolean = {
205-
currentAttempts <= maxAttempts
206-
}
207-
208-
override def getBackoffTimeMillis(currentAttempts: Int): Long = fixedDelayMs
209-
210-
override def getRetryPredicate(): AsyncRetryPredicate[OUT] = {
211-
new AsyncRetryPredicate[OUT] {
212-
override def resultPredicate: Option[Predicate[ju.Collection[OUT]]] = {
213-
Option(
214-
new Predicate[ju.Collection[OUT]] {
215-
override def test(t: ju.Collection[OUT]): Boolean = t.isEmpty
216-
}
217-
)
218-
}
219-
220-
override def exceptionPredicate: Option[Predicate[Throwable]] = Option.empty
221-
}
222-
}
223-
}
224-
}
225-
226202
@Test
227203
def testAsyncWaitWithRetryUsingAnonymousFunction(): Unit = {
228204
val env = StreamExecutionEnvironment.getExecutionEnvironment
@@ -245,7 +221,8 @@ class AsyncDataStreamITCase(ordered: Boolean) extends AbstractTestBase {
245221
}
246222

247223
val timeout = 10000L
248-
val asyncRetryStrategy = createFixedRetryStrategy[Int](3, 10)
224+
val asyncRetryStrategy = new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder[Int](3, 10)
225+
.build()
249226

250227
val asyncMapped = if (ordered) {
251228
AsyncDataStream.orderedWaitWithRetry(
@@ -283,6 +260,7 @@ class AsyncFunctionWithTimeoutExpired extends RichAsyncFunction[Int, Int] {
283260
resultFuture.complete(Seq(input * 2))
284261
}(ExecutionContext.global)
285262
}
263+
286264
override def timeout(input: Int, resultFuture: ResultFuture[Int]): Unit = {
287265
resultFuture.complete(Seq(input * 3))
288266
invokeLatch.countDown()
@@ -307,6 +285,7 @@ class AsyncFunctionWithoutTimeoutExpired extends RichAsyncFunction[Int, Int] {
307285
timeoutLatch.countDown()
308286
}(ExecutionContext.global)
309287
}
288+
310289
override def timeout(input: Int, resultFuture: ResultFuture[Int]): Unit = {
311290
// this sleeping helps reproducing race condition with cancellation
312291
Thread.sleep(10)
@@ -326,6 +305,7 @@ class MyRichAsyncFunction extends RichAsyncFunction[Int, Int] {
326305
resultFuture.complete(Seq(input * 2))
327306
}(ExecutionContext.global)
328307
}
308+
329309
override def timeout(input: Int, resultFuture: ResultFuture[Int]): Unit = {
330310
resultFuture.complete(Seq(input * 3))
331311
}

0 commit comments

Comments
 (0)