REACTOR de overclocking

¿A quién le interesará?

El reactor de hoy es elegante, moderno y juvenil. ¿Por qué tantos de nosotros practicamos la programación reactiva? Pocos pueden responder a esta pregunta de manera inequívoca. Bien, si comprende su ganancia, mal, si la organización impone el reactor como un hecho. La mayoría de los argumentos "FOR" son el uso de una arquitectura de microservicio, que a su vez obliga a los microservicios a comunicarse con frecuencia y mucho entre sí. Para la comunicación, en la mayoría de los casos, se elige la interacción HTTP. HTTP necesita un servidor web ligero, ¿qué le viene a la mente primero? Gato. Aquí hay problemas con el límite en el número máximo de sesiones, luego de sobrepasar el cual el servidor web comienza a rechazar solicitudes (aunque este límite no es tan fácil de lograr). Aquí viene el reactor al rescate, que no está limitado por tales límites, y, por ejemplo,Netty como servidor web que funciona con reactividad desde el primer momento. Dado que hay un servidor web reactivo, necesita un cliente web reactivo (Spring WebClient o Reactive Feign), y dado que el cliente es reactivo, entonces todo este horror se filtra en la lógica empresarial, Mono y Flux se convierten en sus mejores amigos (aunque al principio existen es solo odio :))





Entre las tareas comerciales, muy a menudo hay procedimientos serios que procesan grandes cantidades de datos, y también tenemos que usar un reactor para ellos. Aquí comienzan las sorpresas, si no sabes cocinar el reactor, puedes tener muchos problemas. Superando el límite de descriptores de archivos en el servidor, OutOfMemory debido a la velocidad incontrolada del código sin bloqueo, y mucho más, de lo que hablaremos hoy. Mis colegas y yo hemos experimentado muchas dificultades debido a problemas para entender cómo mantener el reactor bajo control, ¡pero todo lo que no nos mata nos hace más inteligentes!





Código de bloqueo y no bloqueo

No comprenderá nada más si no comprende la diferencia entre código de bloqueo y no bloqueo . Por lo tanto, detengámonos y comprendamos cuidadosamente cuál es la diferencia. Ya lo sabes, el código de bloqueo del reactor es el enemigo, el código de no bloqueo es hermano. El único problema es que, por el momento, no todas las interacciones tienen contrapartes que no bloqueen.





El líder aquí es la interacción HTTP, hay muchas opciones, elija cualquiera. Prefiero Reactive Feign de Playtika, en combinación con Spring Boot + WebFlux + Eureka obtenemos una muy buena construcción para la arquitectura de microservicios.





-: , , reactive, - :) Hibernate + PostgreSQL - , JavaMail - , IBMMQ - . , , MongoDB - . , , , (Thread.sleep() / Socket.read() ), - . ? , . 2 :





  • . BlockHound ( )





  • , , : Schedulers.boundedElastic()



    . publishOn



    & subscribeOn







, , !





1





    @Test
    fun testLevel1() {
        val result = Mono.just("")
            .map { "123" }
            .block()

        assertEquals("123", result)
    }
      
      



, reactor . ? Mono.just



:) map



"123" block



subscribe



.





block



, , , . block



RestController



, .





2





    fun nonBlockingMethod1sec(data: String) 
    = data.toMono().delayElement(Duration.ofMillis(1000))

    @Test
    fun testLevel2() {
        val result = nonBlockingMethod1sec("Hello world")
            .flatMap { nonBlockingMethod1sec(it) }
            .block()

        assertEquals("Hello world", result)
    }
      
      



, nonBlockingMethod1sec



, - . - , , .





3





    fun collectTasks() = (0..99)

    @Test
    fun testLevel3() {
        val result = nonBlockingMethod1sec("Hello world")
            .flatMap { businessContext ->
                collectTasks()
                    .toFlux()
                    .map {
                        businessContext + it
                    }
                    .collectList()
            }
            .block()!!

        assertEquals(collectTasks().toList().size, result.size)
    }
      
      



- Flux



! collectTasks



, , Flux



- . map. collectList



.





, . " ", .





4





    fun collectTasks() = (0..100)
    
    @Test
    fun testLevel4() {
        val result = nonBlockingMethod1sec("Hello world")
            .flatMap { businessContext ->
                collectTasks().toFlux()
                    .flatMap {
                        Mono.deferContextual { reactiveContext ->
                            val hash = businessContext + it + reactiveContext["requestId"]
                            hash.toMono()
                        }
                    }.collectList()
            }
            .contextWrite { it.put("requestId", UUID.randomUUID().toString()) }
            .block()!!

        assertEquals(collectTasks().toList().size, result.size)
    }
      
      



. (15)



, (10)



. .





5





    fun collectTasks() = (0..1000)
    
    fun doSomethingNonBlocking(data: String)
        = data.toMono().delayElement(Duration.ofMillis(1000))
    
    fun doSomethingBlocking(data: String): String {
        Thread.sleep(1000); return data
    }

    val pool = Schedulers.newBoundedElastic(10, Int.MAX_VALUE, "test-pool")
    private val logger = getLogger()

    @Test
    fun testLevel5() {
        val counter = AtomicInteger(0)
        val result = nonBlockingMethod1sec("Hello world")
            .flatMap { _ ->
                collectTasks().toFlux()
                    .parallel()
                    .runOn(pool)
                    .flatMap {
                        Mono.deferContextual { _ ->
                            doSomethingNonBlocking(it.toString())
                                .doOnRequest { logger.info("Added task in pool ${counter.incrementAndGet()}") }
                                .doOnNext { logger.info("Non blocking code finished ${counter.get()}") }
                                .map { doSomethingBlocking(it) }
                                .doOnNext { logger.info("Removed task from pool ${counter.decrementAndGet()}") }
                        }
                    }.sequential()
                    .collectList()
            }
            .block()!!

        assertEquals(collectTasks().toList().size, result.size)
    }
      
      



! , . : doSomethingNonBlocking



(3)



& doSomethingBlocking



(6)



- , . (10)



, (15)



. parallel



(19)



sequential



(29)



. (20)



. , , doOnRequest



( ), doOnNext



( ). - , .





"", , . - , , . , , . , Flux .





. . , ? 100 , 1 , 1 , 10 ? ( senior reactor developer :))





12 . :) , 100 10 , 10 . , . " " .





(26) .map { doSomethingBlocking(it) }



. , , ?





2 ! 1 " " 1 . 100 . 10 ? ? .





collectTasks()



... 1000? 15000? ?





2 ! 1 " " 1 . . . ?





?

? ? ? 30000 , , , , ( web-client feign, ?) , , SSH . , , " ".





. Thread Pool & Reactor

- , - X , X , - . ? :) .





thread pool - . - , .





reactor! ?





, , . ? epoll , . . , , . , " ?", , . . , - , 500 -, . ! , , Schedulers.boundedElastic()



.





"", ?





!

, , , , , , 4-8 production 32 .





parallel



parallelism







parallelism



, rails ( , , ). Prefetch .





parallelism , .





flatMap



( Flux) , maxConcurrency







maxConcurrency



, Integer.MAX_VALUE



( . ?





, , ( http ), ! .





.





:





  • parallel (parallelism)





  • flatMap (maxConcurrency)









, .





- * Integer.MAX_VALUE *







, 5 5 . !





        val result = nonBlockingMethod1sec("Hello world")
            .flatMap { _ ->
                collectTasks().toFlux()
                    .parallel(1)
                    .runOn(pool, 1)
                    .flatMap({
                        Mono.deferContextual { _ ->
                            doSomethingNonBlocking(it.toString())
                        }
                    }, false, 1, 1)
                    .sequential()
                    .collectList()
            }
            .block()!!
      
      



, ?





Thread Pool

? . - , , ! ? , :)





, Schedulers.parallel() ? =) ( parallel, ) , , , .





. , , , . , , production . .





, round-robin, .





Reactor bien cargado (las tareas se distribuyen uniformemente).  54 tareas de bloqueo (1 segundo cada una), distribución por turnos en 6 rieles
( ). 54 ( 1),
 round-robin 6

production , , , .





Grupo mal cargado (las tareas no se distribuyen uniformemente) 54 tareas de bloqueo (cada una durante 1 segundo excepto 2), distribución por turnos en 6 rieles
( ) 54 ( 1 2),
 round-robin 6

collectList()



, , 1 . , , .









  • concatMap



    flatMap



    ( , )





  • , ( )





  • , ( )





  • prefetch



    ( !)





prefetch



flatMap



& runOn



, , , . - 256. 1, "work stealing", , , , .





Grupo bien cargado (las tareas se distribuyen uniformemente) 54 tareas de bloqueo (cada una durante 1 segundo excepto 2 segundos), distribución por turnos en 6 rieles de captación previa.
( ) 54 ( 1 2),
 round-robin 6 Prefetch !

Eso es todo para mi. Será interesante leer tus observaciones y comentarios, no pretendo ser 100% cierto, pero todos los resultados están respaldados por ejemplos prácticos, en Spring Boot + Project Reactor 3.4. ¡Gracias a todos!








All Articles