Aprendiendo Scala: Parte 4 - WebSocket



¡Hola, Habr! Esta vez intenté hacer un chat simple a través de Websockets. Para más detalles, bienvenido bajo cat.



Contenido





Enlaces



  1. Códigos fuente
  2. Imágenes de docker
  3. Tapir
  4. Http4s
  5. Fs2
  6. Doobie
  7. ScalaTest
  8. ScalaCheck
  9. ScalaTestPlusScalaCheck


En realidad, todo el código está en un objeto ChatHub



class ChatHub[F[_]] private(
                             val topic: Topic[F, WebSocketFrame],
                             private val ref: Ref[F, Int]
                           )
                           (
                             implicit concurrent: Concurrent[F],
                             timer: Timer[F]
                           ) extends Http4sDsl[F] {

  val endpointWs: ServerEndpoint[String, Unit, String, Stream[IO, WebSocketFrame], IO] = endpoint
    .get
    .in("chat")
    .tag("WebSockets")
    .summary("   .    : ws://localhost:8080/chat")
    .description("   ")
    .in(
      stringBody
        .description("      ")
        .example("!")
    )
    .out(
      stringBody
        .description("  -   ")
        .example("6 :     Id  f518a53d: !")
    )
    //    . 
    .serverLogic(_ => IO(Left(()): Either[Unit, String]))

  def routeWs: HttpRoutes[F] = {
    HttpRoutes.of[F] {
      case GET -> Root / "chat" => logic()
    }
  }

  private def logic(): F[Response[F]] = {
    val toClient: Stream[F, WebSocketFrame] =
      topic.subscribe(1000)
    val fromClient: Pipe[F, WebSocketFrame, Unit] =
      handle
    WebSocketBuilder[F].build(toClient, fromClient)
  }

  private def handle(s: Stream[F, WebSocketFrame]): Stream[F, Unit] = s
    .collect({
      case WebSocketFrame.Text(text, _) => text
    })
    .evalMap(text => ref.modify(count => (count + 1, WebSocketFrame.Text(s"${count + 1} : $text"))))
    .through(topic.publish)
}

object ChatHub {

  def apply[F[_]]()(implicit concurrent: Concurrent[F], timer: Timer[F]): F[ChatHub[F]] = for {
    ref <- Ref.of[F, Int](0)
    topic <- Topic[F, WebSocketFrame](WebSocketFrame.Text("==="))
  } yield new ChatHub(topic, ref)
}


Aquí debe decir inmediatamente acerca de Topic, una primitiva de sincronización de Fs2 que le permite hacer un modelo de Publicador - Suscriptor, y puede tener muchos Publicadores y muchos Suscriptores al mismo tiempo. En general, es mejor enviarle mensajes a través de algún tipo de búfer como Queue porque tiene un límite en la cantidad de mensajes en la cola y Publisher espera hasta que todos los Suscriptores reciban mensajes en su cola de mensajes y si se desborda puede colgarse.



val topic: Topic[F, WebSocketFrame],


Aquí también cuento el número de mensajes que se enviaron al chat como el número de cada mensaje. Como necesito hacer esto desde diferentes hilos, utilicé un análogo de Atomic, que se llama Ref aquí y garantiza la atomicidad de la operación.



  private val ref: Ref[F, Int]


Procesando un flujo de mensajes de los usuarios.



  private def handle(stream: Stream[F, WebSocketFrame]): Stream[F, Unit] = 
    stream
//       . 
    .collect({
      case WebSocketFrame.Text(text, _) => text
    })
//               .
    .evalMap(text => ref.modify(count => (count + 1, WebSocketFrame.Text(s"${count + 1} : $text"))))
//     
    .through(topic.publish)


En realidad, la lógica misma de crear un socket.



private def logic(): F[Response[F]] = {
//    .
    val toClient: Stream[F, WebSocketFrame] =
//        
      topic.subscribe(1000)
//        
    val fromClient: Pipe[F, WebSocketFrame, Unit] =
//      
      handle
//         .
    WebSocketBuilder[F].build(toClient, fromClient)
  }


Vinculamos nuestro socket a la ruta en el servidor (ws: // localhost: 8080 / chat)



def routeWs: HttpRoutes[F] = {
    HttpRoutes.of[F] {
      case GET -> Root / "chat" => logic()
    }
  }


De hecho, eso es todo. Entonces puede iniciar el servidor con esta ruta. Todavía quería hacer cualquier tipo de documentación. En general, para documentar WebSocket y otras interacciones basadas en eventos como RabbitMQ AMPQ, existe AsynAPI, pero no hay nada debajo de Tapir, así que acabo de hacer una descripción del punto final para Swagger como una solicitud GET. Por supuesto, no funcionará. Más precisamente, se devolverá un error 501, pero se mostrará en Swagger



  val endpointWs: Endpoint[String, Unit, String, fs2.Stream[F, Byte]] = endpoint
    .get
    .in("chat")
    .tag("WebSockets")
    .summary("   .    : ws://localhost:8080/chat")
    .description("   ")
    .in(
      stringBody
        .description("      ")
        .example("!")
    )
    .out(
      stringBody
        .description("  -   ")
        .example("6 :     Id  f518a53d: !")
    )


En el propio swagger, se ve así: conecta







nuestro chat a nuestro servidor API



    todosController = new TodosController()
    imagesController = new ImagesController()
//   
    chatHub <- Resource.liftF(ChatHub[IO]())
    endpoints = todosController.endpoints ::: imagesController.endpoints
//     Swagger
    docs = (chatHub.endpointWs :: endpoints).toOpenAPI("The Scala Todo List", "0.0.1")
    yml: String = docs.toYaml
//      
    routes = chatHub.routeWs <+>
      endpoints.toRoutes <+>
      new SwaggerHttp4s(yml, "swagger").routes[IO]
    httpApp = Router(
      "/" -> routes
    ).orNotFound
    blazeServer <- BlazeServerBuilder[IO](serverEc)
      .bindHttp(settings.host.port, settings.host.host)
      .withHttpApp(httpApp)
      .resource


Nos conectamos al chat con un script sumamente sencillo.



    <script>
        const id = `f${(~~(Math.random() * 1e8)).toString(16)}`;
        const webSocket = new WebSocket('ws://localhost:8080/chat');

        webSocket.onopen = event => {
            alert('onopen ');
        };

        webSocket.onmessage = event => {
            console.log(event);
            receive(event.data);
        };

        webSocket.onclose = event => {
            alert('onclose ');
        };

        function send() {
            let text = document.getElementById("message");
            webSocket.send(`    Id  ${id}: ${text.value}`);
            text.value = '';
        }

        function receive(m) {
            let text = document.getElementById("chat");
            text.value = text.value + '\n\r' + m;
        }
    </script>


En realidad esto es todo. Espero que alguien que también estudie la roca encuentre este artículo interesante y quizás incluso útil.



All Articles