本文我们将介绍如何使用 Kotlin Flow/数据流时实现重试操作,并通过扩展函数实现可重用的重试策略。
介绍
创建数据流时如果发生异常,可能需要重试某些操作,如查询本地数据库数据或网络请求。
Kotlin 的 Flow/数据流 提供了两个可扩展函数来实现重试: retryWhen()
, retry()
。
当上游数据流中发生与给定的谓词函数匹配的异常时,重试给定数据流的收集,并且可指定最多重试的次数。
先来详细介绍下这两个方法:
retryWhen()
如下是 retryWhen
函数的源代码:
public fun <T> Flow<T>.retryWhen(predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean): Flow<T> =
flow {
var attempt = 0L
var shallRetry: Boolean
do {
shallRetry = false
val cause = catchImpl(this)
if (cause != null) {
if (predicate(cause, attempt)) {
shallRetry = true
attempt++
} else {
throw cause
}
}
} while (shallRetry)
}
retryWhen
函数有 2 个参数:
-
cause
它的类型为 Throwable
,它是所有异常类的父类,表示创建数据流时失败的原因。
-
attempt
类型为 Long
,为失败的尝试次数。
如果谓词仅返回 true,那么它将重试数据流的收集,否则它将退出流程。
示例:
以一次网络请求的重试为例,当抛出 IO 异常时重试,且最多重试3次
package cn.itmob.sample
suspend fun fetchMessage(msgId: String): Flow<Result<Repos>> {
return flow {
emit(Result.success(api.fetchMessage(msgId))
}.retryWhen { cause, attempt ->
// 如果抛出 IO 异常, 则重试3次
if (cause is IOException && attempt < 3) {
return@retryWhen true
} else {
return@retryWhen false
}
}
}
retry()
如下是 retry
函数的源代码:
public fun <T> Flow<T>.retry(
retries: Long = Long.MAX_VALUE,
predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T> {
require(retries > 0) { "Expected positive amount of retries, but had $retries" }
return retryWhen { cause, attempt -> attempt < retries && predicate(cause) }
}
retry 函数有 2 个参数:
-
retries
类型为 Long
,为数据流上游发送异常后最多的重试次数。
-
predicate
谓词函数,判断当上游数据流中发生的异常是否需要重试。
注意:
1. 如果没有指定重试次数,则取 Long.MAX_VALUE
作为重试次数。
- 如果没有指定谓词函数,则默认返回 true。
如果没有指定这两个参数,发生异常时可能会引起不断的重试,直到成功。
示例:
也是以一次网络请求的重试为例,当抛出 IO 异常时重试,且最多重试3次
package cn.itmob.sample
suspend fun fetchMessage(msgId: String): Flow<Result<Repos>> {
return flow {
emit(Result.success(api.fetchMessage(msgId))
}.retry(retries = 3) {cause ->
if(cause is IOException) {
return@retry true
} else return@retry false
}
}
总结
上文是 Flow/数据流 重试的两个 API 的介绍,但是当我们的应用中多次用到它时,比如多个网络访问的代码都需要相同的重试策略时,可以自定义一个对我们自己的项目来说更通用的扩展函数,来保证多个重试都使用相同的策略:
package cn.itmob.example
/**
* 自定义通用的重试策略,发生 IOException 时最多重试3次,每次延时1秒重试
*/
fun <T> Flow<T>.retry(
attemptNum: Long = 3,
delayMillis: Long = 1000,
): Flow<T> {
return retryWhen { cause, attempt ->
if (cause is IOException && attempt < attemptNum) {
delay(delayMillis)
return@retryWhen true
} else {
return@retryWhen false
}
}
}
更多相关文章:
retry() 官方文档
retryWhen() 官方文档
Kotlin callbackFlow - 将基于回调机制的 API 转换为 Flow/数据流 介绍