En el trabajo actual, escribimos en Reactor. La tecnología es genial, pero como siempre hay muchos PERO. Algunas cosas son molestas, el código es más difícil de escribir y leer y ThreadLocal es un verdadero desastre. Decidí ver qué problemas desaparecerán si cambia a Kotlin Coroutines, y qué problemas, por el contrario, se agregarán.
Tarjeta de paciente
Escribí un pequeño proyecto para el artículo , reproduciendo los problemas que encontré en el trabajo. El código principal está aquí . El algoritmo no lo dividió deliberadamente en métodos separados, por lo que los problemas se ven mejor.
En pocas palabras sobre el algoritmo:
Transferimos dinero de una cuenta a otra, registrando transacciones sobre el hecho de la transferencia.
La traducción es idempotente, por lo que si la transacción ya está en la base de datos, le respondemos al cliente que todo está bien. Al insertar una transacción, se puede lanzar una excepción DataIntegrityViolationException, esto también significa que la transacción ya existe.
Para no ir en negativo, hay una verificación en el código de bloqueo + Optimistic, que no permite la actualización competitiva de las cuentas. Necesita reintentar y manejo de errores adicional para que funcione.
Para aquellos a los que no les gusta el algoritmo en sí
El algoritmo del proyecto se eligió para reproducir los problemas, no para ser eficiente y arquitectónicamente correcto. En lugar de una transacción, debe insertar semiconductores, no se necesita un bloqueo optimista (en su lugar, una verificación de la positividad de una cuenta en sql), seleccionar + insertar debe reemplazarse con upsert.
Quejas de los pacientes
Stacktrace .
, .
- flatMap.
.
Mono.empty().
, - , traceId. ( , ThreadLocal , SpringSecurity)
.
api .
PR Java Kotlin.
.
com.fasterxml.jackson.module:jackson-module-kotlin data org.jetbrains.kotlin.plugin.spring open .
suspend fun transfer(@RequestBody request: TransferRequest)
public Mono<Void> transfer(@RequestBody TransferRequest request)
suspend fun save(account: Account): Account
Mono<Account> save(Account account);
, , suspend , , Reactor .
runBlocking { … }
, suspend .
Retry kotlin-retry. , , ( PR).
, , . -.
:
public Mono<Void> transfer(String transactionKey, long fromAccountId,
long toAccountId, BigDecimal amount) {
return transactionRepository.findByUniqueKey(transactionKey)
.map(Optional::of)
.defaultIfEmpty(Optional.empty())
.flatMap(withMDC(foundTransaction -> {
if (foundTransaction.isPresent()) {
log.warn("retry of transaction " + transactionKey);
return Mono.empty();
}
return accountRepository.findById(fromAccountId)
.switchIfEmpty(Mono.error(new AccountNotFound()))
.flatMap(fromAccount -> accountRepository.findById(toAccountId)
.switchIfEmpty(Mono.error(new AccountNotFound()))
.flatMap(toAccount -> {
var transactionToInsert = Transaction.builder()
.amount(amount)
.fromAccountId(fromAccountId)
.toAccountId(toAccountId)
.uniqueKey(transactionKey)
.build();
var amountAfter = fromAccount.getAmount().subtract(amount);
if (amountAfter.compareTo(BigDecimal.ZERO) < 0) {
return Mono.error(new NotEnoghtMoney());
}
return transactionalOperator.transactional(
transactionRepository.save(transactionToInsert)
.onErrorResume(error -> {
//transaction was inserted on parallel transaction,
//we may return success response
if (error instanceof DataIntegrityViolationException
&& error.getMessage().contains("TRANSACTION_UNIQUE_KEY")) {
return Mono.empty();
} else {
return Mono.error(error);
}
})
.then(accountRepository.transferAmount(
fromAccount.getId(), fromAccount.getVersion(),
amount.negate()
))
.then(accountRepository.transferAmount(
toAccount.getId(), toAccount.getVersion(), amount
))
);
}));
}))
.retryWhen(Retry.backoff(3, Duration.ofMillis(1))
.filter(OptimisticLockException.class::isInstance)
.onRetryExhaustedThrow((__, retrySignal) -> retrySignal.failure())
)
.onErrorMap(
OptimisticLockException.class,
e -> new ResponseStatusException(
BANDWIDTH_LIMIT_EXCEEDED,
"limit of OptimisticLockException exceeded", e
)
)
.onErrorResume(withMDC(e -> {
log.error("error on transfer", e);
return Mono.error(e);
}));
}
:
suspend fun transfer(transactionKey: String, fromAccountId: Long,
toAccountId: Long, amount: BigDecimal) {
try {
try {
retry(limitAttempts(3) + filter { it is OptimisticLockException }) {
val foundTransaction = transactionRepository
.findByUniqueKey(transactionKey)
if (foundTransaction != null) {
logger.warn("retry of transaction $transactionKey")
return@retry
}
val fromAccount = accountRepository.findById(fromAccountId)
?: throw AccountNotFound()
val toAccount = accountRepository.findById(toAccountId)
?: throw AccountNotFound()
if (fromAccount.amount - amount < BigDecimal.ZERO) {
throw NotEnoghtMoney()
}
val transactionToInsert = Transaction(
amount = amount,
fromAccountId = fromAccountId,
toAccountId = toAccountId,
uniqueKey = transactionKey
)
transactionalOperator.executeAndAwait {
try {
transactionRepository.save(transactionToInsert)
} catch (e: DataIntegrityViolationException) {
if (e.message?.contains("TRANSACTION_UNIQUE_KEY") != true) {
throw e;
}
}
accountRepository.transferAmount(
fromAccount.id!!, fromAccount.version, amount.negate()
)
accountRepository.transferAmount(
toAccount.id!!, toAccount.version, amount
)
}
}
} catch (e: OptimisticLockException) {
throw ResponseStatusException(
BANDWIDTH_LIMIT_EXCEEDED,
"limit of OptimisticLockException exceeded", e
)
}
} catch (e: Exception) {
logger.error(e) { "error on transfer" }
throw e;
}
}
Stacktraces
, .
:
o.s.w.s.ResponseStatusException: 509 BANDWIDTH_LIMIT_EXCEEDED "limit of OptimisticLockException exceeded"; nested exception is c.g.c.v.r.OptimisticLockException
at c.g.c.v.r.services.Ledger.lambda$transfer$5(Ledger.java:75)
...
Caused by: c.g.c.v.r.OptimisticLockException: null
at c.g.c.v.r.repos.AccountRepositoryImpl.lambda$transferAmount$0(AccountRepositoryImpl.java:27)
at r.c.p.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
...
:
error on transfer o.s.w.s.ResponseStatusException: 509 BANDWIDTH_LIMIT_EXCEEDED "limit of OptimisticLockException exceeded"; nested exception is c.g.c.v.r.OptimisticLockException
at c.g.c.v.r.services.Ledger.transfer$suspendImpl(Ledger.kt:70)
at c.g.c.v.r.services.Ledger$transfer$1.invokeSuspend(Ledger.kt)
...
Caused by: c.g.c.v.r.OptimisticLockException: null
at c.g.c.v.r.repos.AccountRepositoryImpl.transferAmount(AccountRepositoryImpl.kt:24)
...
at c.g.c.v.r.services.Ledger$transfer$3$1.invokeSuspend(Ledger.kt:65)
at c.g.c.v.r.services.Ledger$transfer$3$1.invoke(Ledger.kt)
at o.s.t.r.TransactionalOperatorExtensionsKt$executeAndAwait$2$1.invokeSuspend(TransactionalOperatorExtensions.kt:30)
(Coroutine boundary)
at o.s.t.r.TransactionalOperatorExtensionsKt.executeAndAwait(TransactionalOperatorExtensions.kt:31)
at c.g.c.v.r.services.Ledger$transfer$3.invokeSuspend(Ledger.kt:56)
at com.github.michaelbull.retry.RetryKt$retry$3.invokeSuspend(Retry.kt:38)
at c.g.c.v.r.services.Ledger.transfer$suspendImpl(Ledger.kt:35)
at c.g.c.v.r.controllers.LedgerController$transfer$2$1.invokeSuspend(LedgerController.kt:20)
at c.g.c.v.r.controllers.LedgerController$transfer$2.invokeSuspend(LedgerController.kt:19)
at kotlin.reflect.full.KCallables.callSuspend(KCallables.kt:55)
at o.s.c.CoroutinesUtils$invokeSuspendingFunction$mono$1.invokeSuspend(CoroutinesUtils.kt:64)
(Coroutine creation stacktrace)
at k.c.i.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:122)
at k.c.i.CancellableKt.startCoroutineCancellable(Cancellable.kt:30)
...
Caused by: c.g.c.v.r.OptimisticLockException: null
at c.g.c.v.r.repos.AccountRepositoryImpl.transferAmount(AccountRepositoryImpl.kt:24)
...
at c.g.c.v.r.services.Ledger$transfer$3$1.invokeSuspend(Ledger.kt:65)
at c.g.c.v.r.services.Ledger$transfer$3$1.invoke(Ledger.kt)
at o.s.t.r.TransactionalOperatorExtensionsKt$executeAndAwait$2$1.invokeSuspend(TransactionalOperatorExtensions.kt:30)
...
, ( , ).
Java . , . . . Kotlin .
, - . ? . , - traceId (thread name ) .
Kotlin , , . ( : ).
flatMap. - try catch, .
:
return accountRepository.findById(fromAccountId)
.switchIfEmpty(Mono.error(new AccountNotFound()))
.flatMap(fromAccount -> accountRepository.findById(toAccountId)
.switchIfEmpty(Mono.error(new AccountNotFound()))
.flatMap(toAccount -> {
...
})
:
val fromAccount = accountRepository.findById(fromAccountId)
?: throw AccountNotFound()
val toAccount = accountRepository.findById(toAccountId)
?: throw AccountNotFound()
...
try catch, .
:
return transactionRepository.findByUniqueKey(transactionKey)
...
.onErrorMap(
OptimisticLockException.class,
e -> new ResponseStatusException(
BANDWIDTH_LIMIT_EXCEEDED,
"limit of OptimisticLockException exceeded", e
)
)
:
try {
val foundTransaction = transactionRepository
.findByUniqueKey(transactionKey)
...
} catch (e: OptimisticLockException) {
throw ResponseStatusException(
BANDWIDTH_LIMIT_EXCEEDED,
"limit of OptimisticLockException exceeded", e
)
}
throw, . Reactor :
.flatMap(foo -> {
if (foo.isEmpty()) {
return Mono.error(new IllegalStateException());
} else {
return Mono.just(foo);
}
})
, , . - .
Mono.empty()
. null . ¨C5C.
Ide , mono . . , - .
Kotlin not null , , . nullable - .
:
:
return transactionRepository.findByUniqueKey(transactionKey)
.map(Optional::of)
.defaultIfEmpty(Optional.empty())
.flatMap(foundTransaction -> {
if (foundTransaction.isPresent()) {
log.warn("retry of transaction " + transactionKey);
return Mono.empty();
}
...
:
val foundTransaction = transactionRepository
.findByUniqueKey(transactionKey)
if (foundTransaction != null) {
logger.warn("retry of transaction $transactionKey")
return@retry
}
...
, - Reactor, .
, traceId . ThreadLocal , MDC ( ). ?
. Reactor Coroutines immutable, MDC ( ).
Java , traceId :
@Component
public class TraceIdFilter implements WebFilter {
@Override
public Mono<Void> filter(
ServerWebExchange exchange, WebFilterChain chain
) {
var traceId = Optional.ofNullable(
exchange.getRequest().getHeaders().get("X-B3-TRACEID")
)
.orElse(Collections.emptyList())
.stream().findAny().orElse(UUID.randomUUID().toString());
return chain.filter(exchange)
.contextWrite(context ->
LoggerHelper.addEntryToMDCContext(context, "traceId", traceId)
);
}
}
, - , traceId MDC:
public static <T, R> Function<T, Mono<R>> withMDC(
Function<T, Mono<R>> block
) {
return value -> Mono.deferContextual(context -> {
Optional<Map<String, String>> mdcContext = context
.getOrEmpty(MDC_ID_KEY);
if (mdcContext.isPresent()) {
try {
MDC.setContextMap(mdcContext.get());
return block.apply(value);
} finally {
MDC.clear();
}
} else {
return block.apply(value);
}
});
}
, Mono. .. , Mono. :
.onErrorResume(withMDC(e -> {
log.error("error on transfer", e);
return Mono.error(e);
}))
Kotlin . , traceId MDC:
@Component
class TraceIdFilter : WebFilter {
override fun filter(
exchange: ServerWebExchange, chain: WebFilterChain
): Mono<Void> {
val traceId = exchange.request.headers["X-B3-TRACEID"]?.first()
MDC.put("traceId", traceId ?: UUID.randomUUID().toString())
return chain.filter(exchange)
}
}
withContext(MDCContext()) { … }
, MDC traceId. .
Java Reactor , : , , breakpoints ...
: stepOver, , ( ).
, suspend . issue. , , Java Reactor evaluate , .
, , .
:
return Mono.zip(
transactionRepository.findByUniqueKey(transactionKey)
.map(Optional::of)
.defaultIfEmpty(Optional.empty()),
accountRepository.findById(fromAccountId)
.switchIfEmpty(Mono.error(new AccountNotFound())),
accountRepository.findById(toAccountId)
.switchIfEmpty(Mono.error(new AccountNotFound())),
).flatMap(withMDC(fetched -> {
var foundTransaction = fetched.getT1();
var fromAccount = fetched.getT2();
var toAccount = fetched.getT3();
if (foundTransaction.isPresent()) {
log.warn("retry of transaction " + transactionKey);
return Mono.empty();
}
...
}
:
coroutineScope {
val foundTransactionAsync = async {
logger.info("async fetch of transaction $transactionKey")
transactionRepository.findByUniqueKey(transactionKey)
}
val fromAccountAsync = async {
accountRepository.findById(fromAccountId)
}
val toAccountAsync = async {
accountRepository.findById(toAccountId)
}
if (foundTransactionAsync.await() != null) {
logger.warn("retry of transaction $transactionKey")
return@retry
}
val fromAccount = fromAccountAsync.await() ?: throw AccountNotFound()
val toAccount = toAccountAsync.await() ?: throw AccountNotFound()
...
}
Kotlin “ ”, “ ” Reactor.
, -. Reactor , . - foundTransactionAsync.await(). , transactionRepository.findByUniqueKey() , , accountRepository.findById() ( ).
. , Reactor :
coroutineScope {
val foundTransactionAsync = async {
logger.info("async fetch of transaction $transactionKey")
transactionRepository.findByUniqueKey(transactionKey)
}
val fromAccountAsync = async {
accountRepository.findById(fromAccountId)
}
val toAccountAsync = async {
accountRepository.findById(toAccountId)
}
if (foundTransactionAsync.await() != null) {
logger.warn("retry of transaction $transactionKey")
return@retry
}
val transactionToInsert = Transaction(
amount = amount,
fromAccountId = fromAccountId,
toAccountId = toAccountId,
uniqueKey = transactionKey
)
transactionalOperator.executeAndAwait {
try {
transactionRepository.save(transactionToInsert)
} catch (e: DataIntegrityViolationException) {
if (e.message?.contains("TRANSACTION_UNIQUE_KEY") != true) {
throw e;
}
}
val fromAccount = fromAccountAsync.await() ?: throw AccountNotFound()
val toAccount = toAccountAsync.await() ?: throw AccountNotFound()
if (fromAccount.amount - amount < BigDecimal.ZERO) {
throw NotEnoghtMoney()
}
accountRepository.transferAmount(
fromAccount.id!!, fromAccount.version, amount.negate()
)
accountRepository.transferAmount(
toAccount.id!!, toAccount.version, amount
)
}
}
. .. , . , , ( ).
, , .
context scope
, :
scope. , , .
context. .
Spring , :
@PutMapping("/transfer")
suspend fun transfer(@RequestBody request: TransferRequest) {
coroutineScope {
withContext(MDCContext()) {
ledger.transfer(request.transactionKey, request.fromAccountId,
request.toAccountId, request.amount)
}
}
}
, regexp , . - .
AOP suspend
, , . aspect suspend .
Finalmente logré escribir ese aspecto. Pero para explicar cómo funciona esto, necesita un artículo aparte.
Espero que haya una forma más adecuada de escribir aspectos (intentaré contribuir a esto).
Evaluación del tratamiento
Todos los problemas desaparecieron. Se agregaron un par de nuevos, pero es tolerable.
Debo decir que las corrutinas se están desarrollando rápidamente y solo espero trabajar mejor con ellas.
Se puede ver que el equipo de JetBrains estuvo atento a los problemas de los desarrolladores. Hasta donde yo sé, hace aproximadamente un año todavía había problemas con la depuración y las carreras de stact, por ejemplo.
Lo más importante es que con las corrutinas, no es necesario que tenga en cuenta todas las características de Reactor y su poderosa API. Solo escribe el código.