Integración de Spring: flujos de datos dinámicos

Fuegos artificiales, Habr! Hoy analizaremos un área bastante específica: la transmisión de datos utilizando el marco de Spring Integration y cómo realizar estos flujos en tiempo de ejecución sin inicialización preliminar en el contexto de la aplicación. Una aplicación de muestra completa está en el Gita .



Introducción



Spring Integration es un Enterprise Integration Framework (EIP) que utiliza mecanismos de mensajería bajo el capó entre adaptadores de diferentes protocolos / sistemas de integración basados ​​en canales de mensajes (colas condicionales). Los análogos famosos son Camel, Mule, Nifi.



A partir del caso de prueba, tendremos que hacer un servicio REST que pueda leer los parámetros de consulta recibidos, ir a nuestra base de datos, por ejemplo, postgres, actualizar y recuperar los datos de la tabla utilizando los parámetros recibidos de la fuente, y enviar el resultado a la cola (solicitud / respuesta), y también hacer múltiples instancias con diferentes rutas de solicitud.



Convencionalmente, el diagrama de flujo de datos se verá así:



imagen



A continuación, le mostraré cómo puede hacer esto simplemente sin bailar mucho con una pandereta, utilizando IntegrationFlowContext, con componentes de control / REST que controlan los puntos finales. Todo el código principal del proyecto estará ubicado en el repositorio, aquí solo indicaré algunos recortes. Bueno, quién está interesado, por favor, debajo del gato.



Herramientas



Comencemos con el bloque de dependencia por defecto. Básicamente, necesitaremos proyectos de arranque de resorte, para la ideología REST de flujo / gestión de componentes, integración de resorte, para crear nuestro caso basado en canales y adaptadores.



E inmediatamente pensamos qué más necesitamos para reproducir el caso. Además de las dependencias principales, necesitamos subproyectos: integración-http, integración-jdbc, integración-groovy (proporciona convertidores de datos dinámicamente personalizables basados ​​en scripts Goovy). Por separado, diré que en este ejemplo no usaremos el convertidor maravilloso como innecesario, sino que brindaremos la posibilidad de personalizarlo desde el exterior.



Lista de dependencia
 <!-- Spring block -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-commons</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-groovy</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-http</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-jdbc</artifactId>
        </dependency>

        <!-- Db block -->
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>

        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
        </dependency>

        <!-- Utility block -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

        <dependency>
            <groupId>org.reflections</groupId>
            <artifactId>reflections</artifactId>
            <version>0.9.12</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.4</version>
            <scope>provided</scope>
        </dependency>




Cocina interior



Pasemos a crear los componentes necesarios del sistema (envoltorios / modelos). Necesitaremos canales, bean, httpInboundGateway, handler, jdbcOutboundGateway y modelos de resultados.



  • bean: un objeto auxiliar necesario para adaptadores, hilo
  • channel - canal para entregar mensajes a / desde componentes de transmisión
  • httpInboundGateway: punto de acceso http al cual enviaremos una solicitud con datos para su posterior procesamiento
  • controlador: un tipo genérico de controlador (transformadores de ranura, varios adaptadores, etc.)
  • jdbcOutboundGateway - adaptador jdbc
  • resultado: controlador para enviar información a un canal específico


Necesitaremos envoltorios para almacenar parámetros e inicializar correctamente los componentes de una secuencia completa, por lo que inmediatamente creamos un almacén de componentes, agregue. funcionalidad de convertidores JSON -> Modelo de definición. El mapeo directo de campos usando jackson y objetos en mi caso no era aplicable: tenemos una bicicleta más para un protocolo de comunicación específico.



Hagámoslo bien de inmediato , usando anotaciones :



StreamComponent: es responsable de identificar las clases como un modelo de ajuste de un componente de flujo y tiene información de servicio: el nombre del componente, el tipo del componente, si el componente está anidado y la descripción;



SettingClass: responsable de opciones adicionales para escanear el modelo, como escanear campos de superclase e ignorar campos al inicializar valores;



SettingValue: responsable de identificar el campo de clase como personalizable desde el exterior, con configuraciones de nombres en JSON, descripción, convertidor de tipo, indicador de campo requerido e indicador de objeto interno con fines informativos;



Administrador de almacenamiento de componentes



Métodos auxiliares para trabajar con modelos para controladores REST



Modelo base: una abstracción con un conjunto de campos auxiliares / métodos modelo



Modelos de configuración de flujo actual



Mapper JSON -> Modelo de definición



Se preparó el terreno principal para el trabajo. Ahora pasemos a la implementación, directamente, de los servicios que serán responsables del ciclo de vida, el almacenamiento y la inicialización de las transmisiones e inmediatamente estableceremos la idea de que podemos paralelizar 1 transmisión con el mismo nombre en varias instancias, es decir. tendremos que hacer identificadores únicos (guías) para todos los componentes del flujo; de lo contrario, pueden producirse colisiones con otros componentes únicos (beans, canales, etc.) en el contexto de la aplicación. Pero primero hagamos mapeadores de dos componentes: http y jdbc, es decir El incremento de los modelos realizados anteriormente a los componentes de la secuencia en sí (HttpRequestHandlerEndpointSpec y JdbcOutboundGateway).



HttpRegistry



JdbcRegistry



Servicio de administración central ( StreamDeployingService) realiza las funciones de almacenamiento de trabajadores / inactivos, registra nuevos, inicia, detiene y elimina los subprocesos completamente del contexto de la aplicación. Una característica importante del servicio es la implementación de la dependencia IntegrationFlowBuilderRegistry, que nos ayuda a hacer la dinámica de la aplicación (quizás recuerde estos archivos xml de configuración o clases DSL por kilómetros). De acuerdo con la especificación de flujo, siempre debe comenzar con un componente o canal entrante, por lo que tenemos esto en cuenta en la implementación del método registerStreamContext.



Y el administrador auxiliar ( IntegrationFlowBuilderRegistry), que realiza la función de un mapeador de modelos para componentes de flujo y la inicialización del flujo en sí mismo usando IntegrationFlowBuilder. También implementé un controlador de registro en la tubería de flujo, un servicio para recopilar métricas de canal de flujo (una opción conmutable) y una posible implementación de convertidores de mensajes de flujo basados ​​en la implementación Groovy (si de repente este ejemplo se convierte en la base para la venta, entonces la precompilación de scripts maravillosos debe hacerse en la etapa de inicialización del flujo , porque te encuentras con pruebas de carga en RAM y no importa cuántos núcleos y potencia tengas). Dependiendo de la configuración de las etapas de registro y los parámetros de nivel de registro del modelo, estará activo después de cada transmisión de un mensaje de componente a componente. La supervisión está habilitada y deshabilitada por un parámetro en application.yml:



monitoring:
  injectction:
    default: true


Ahora que tenemos toda la mecánica para inicializar flujos de procesamiento de datos dinámicos, también podemos escribir mapeadores para varios protocolos y adaptadores como RabbitMQ, Kafka, Tcp, Ftp, etc. Además, en la mayoría de los casos, no necesita escribir nada con su propia mano (excepto, por supuesto, los modelos de configuración y los métodos auxiliares): un número bastante grande de componentes ya están presentes en el repositorio .



La etapa final será la implementación de controladores para obtener información sobre los componentes del sistema existentes, administrar flujos y obtener métricas.



ComponentsController : proporciona información sobre todos los componentes en un modelo legible para humanos, y un componente por nombre y tipo.



StreamController - proporciona una gestión completa del flujo, es decir, la inicialización de nuevos modelos JSON, iniciando, deteniendo, eliminando y emitiendo métricas por identificador.



Producto final



Levantamos la aplicación resultante y describimos el caso de prueba en formato JSON.



Flujo de datos de muestra
:



CREATE TABLE IF NOT EXISTS account_data
(
    id          INT                      NOT NULL,
    accountname VARCHAR(45)              NOT NULL,
    password    VARCHAR(128),
    email       VARCHAR(255),
    last_ip     VARCHAR(15) DEFAULT NULL NOT NULL
);

CREATE UNIQUE INDEX account_data_username_uindex
    ON account_data (accountname);

ALTER TABLE account_data
    ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
        SEQUENCE NAME account_data_id_seq
            START WITH 1
            INCREMENT BY 1
            NO MINVALUE
            NO MAXVALUE
            CACHE 1
        );

ALTER TABLE account_data
    ADD CONSTRAINT account_data_pk
        PRIMARY KEY (id);

CREATE TABLE IF NOT EXISTS account_info
(
    id             INT NOT NULL,
    banned         BOOLEAN  DEFAULT FALSE,
    premium_points INT      DEFAULT 0,
    premium_type   SMALLINT DEFAULT -1
);

ALTER TABLE account_info
    ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
        SEQUENCE NAME account_info_id_seq
            START WITH 1
            INCREMENT BY 1
            NO MINVALUE
            NO MAXVALUE
            CACHE 1
        );

ALTER TABLE account_info
    ADD CONSTRAINT account_info_account_data_id_fk FOREIGN KEY (id) REFERENCES account_data (id)
        ON UPDATE CASCADE ON DELETE CASCADE;

ALTER TABLE account_info
    ADD CONSTRAINT account_info_pk
        PRIMARY KEY (id);



INSERT INTO account_data (accountname, password, email, last_ip)
VALUES ('test', 'test', 'test@test', '127.0.0.1');
INSERT INTO account_info (banned, premium_points, premium_type)
VALUES (false, 1000, 1);


: order — , .. , . ( ). — .



{
  "flowName": "Rest Postgres stream",
  "components": [
    {
      "componentName": "bean",
      "componentType": "other",
      "componentParameters": {
        "id": "pgDataSource",
        "bean-type": "com.zaxxer.hikari.HikariDataSource",
        "property-args": [
          {
            "property-name": "username",
            "property-value": "postgres"
          },
          {
            "property-name": "password",
            "property-value": "postgres"
          },
          {
            "property-name": "jdbcUrl",
            "property-value": "jdbc:postgresql://localhost:5432/test"
          },
          {
            "property-name": "driverClassName",
            "property-value": "org.postgresql.Driver"
          }
        ]
      }
    },
    {
      "componentName": "message-channel",
      "componentType": "source",
      "componentParameters": {
        "id": "jdbcReqChannel",
        "order": 1,
        "channel-type": "direct",
        "max-subscribers": 1000
      }
    },
    {
      "componentName": "message-channel",
      "componentType": "source",
      "componentParameters": {
        "id": "jdbcRepChannel",
        "order": 1,
        "channel-type": "direct"
      }
    },
    {
      "componentName": "http-inbound-gateway",
      "componentType": "source",
      "componentParameters": {
        "order": 2,
        "http-inbound-supported-methods": [
          "POST"
        ],
        "payload-type": "org.genfork.integration.model.request.http.SimpleJdbcPayload",
        "log-stages": true,
        "log-level": "INFO",
        "request-channel": "jdbcReqChannel",
        "reply-channel": "jdbcRepChannel"
      }
    },
    {
      "componentName": "handler",
      "componentType": "processor",
      "componentParameters": {
        "order": 3,
        "handler-definition": {
          "componentName": "jdbc-outbound-adapter",
          "componentType": "app",
          "componentParameters": {
            "data-source": "pgDataSource",
            "query": "SELECT accountname, password, email, last_ip, banned, premium_points, premium_type FROM account_data d INNER JOIN account_info i ON d.id = i.id WHERE d.id = :payload.accountId",
            "update-query": "UPDATE account_info SET banned = true WHERE id = :payload.accountId",
            "jdbc-reply-channel": "jdbcRepChannel",
            "log-stages": true,
            "log-level": "INFO"
          }
        }
      }
    },
    {
      "componentName": "result",
      "componentType": "app",
      "componentParameters": {
        "order": 4,
        "cancel": false,
        "result-channel": "jdbcRepChannel"
      }
    }
  ]
}





Prueba:



1) Inicializamos una nueva secuencia utilizando el método



POST / stream / deploy, donde nuestro JSON estará en el cuerpo de la solicitud.



En respuesta, el sistema tendrá que enviar si todo es correcto; de lo contrario, aparecerá un mensaje de error:



{
    "status": "SUCCESS", -  
    "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b" -  
}


2) Iniciamos el inicio utilizando el método:



GET / stream / 2bf65d9d-97c6-4199-86aa-0c808c25071b / start, donde indicamos el ID de la secuencia inicializada anteriormente.



En respuesta, el sistema tendrá que enviar si todo es correcto; de lo contrario, aparecerá un mensaje de error:



{
    "status": "SUCCESS", -  
}


3) ¿Llamando a una secuencia por un identificador en el sistema? Cómo, qué y dónde: en el mapeador del modelo HttpRegistry, escribí la condición



Http.inboundGateway(localPath != null ? localPath : String.format("/stream/%s/call", uuid))


donde, el parámetro http-inbound-path se tiene en cuenta, y si no se especifica explícitamente en la configuración del componente, se ignora y se establece la ruta de llamada del sistema. En nuestro caso, esto será:



POST / stream / ece4d4ac-3b46-4952-b0a6-8cf334074b99 / call - donde está presente el identificador de flujo, con el cuerpo de la solicitud:



{
    "accountId": 1
}


En respuesta, recibiremos, si las etapas de procesamiento de la solicitud funcionaron correctamente, recibiremos una estructura plana de registros de las tablas account_data y account_info.



{
    "accountname": "test",
    "password": "test",
    "email": "test@test",
    "last_ip": "127.0.0.1",
    "banned": true,
    "premium_points": 1000,
    "premium_type": 1
}


La especificidad del adaptador JdbcOutboundGateway es tal que si especifica el parámetro update-query, se registra un controlador adicional, que primero actualiza los datos, y solo luego obtiene el parámetro de consulta.



Si especifica las mismas rutas manualmente, se eliminará la posibilidad de iniciar componentes con HttpInboundGateway como punto de acceso a una secuencia en varias instancias porque el sistema no permitirá registrar una ruta similar.



4) Veamos las métricas utilizando el método GET / stream / 2bf65d9d-97c6-4199-86aa-0c808c25071b / metrics



Contenido de respuesta
, / , / / :



[
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.Rest Postgres stream_2bf65d9d-97c6-4199-86aa-0c808c25071b_jdbcReqChannel",
        "sendDuration": {
            "count": 1,
            "min": 153.414,
            "max": 153.414,
            "mean": 153.414,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 153.414,
        "minSendDuration": 153.414,
        "meanSendDuration": 153.414,
        "meanSendRate": 0.001195117818082359,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 1.1102230246251565E-16
    },
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.2bf65d9d-97c6-4199-86aa-0c808c25071b.channel#2",
        "sendDuration": {
            "count": 1,
            "min": 0.1431,
            "max": 0.1431,
            "mean": 0.1431,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 0.1431,
        "minSendDuration": 0.1431,
        "meanSendDuration": 0.1431,
        "meanSendRate": 0.005382436008121413,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 0.0
    },
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.Rest Postgres stream_2bf65d9d-97c6-4199-86aa-0c808c25071b_jdbcRepChannel",
        "sendDuration": {
            "count": 1,
            "min": 0.0668,
            "max": 0.0668,
            "mean": 0.0668,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 0.0668,
        "minSendDuration": 0.0668,
        "meanSendDuration": 0.0668,
        "meanSendRate": 0.001195118373693797,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 1.1102230246251565E-16
    }
]




Conclusión



Por lo tanto, se mostró cómo, después de dedicar un poco más de tiempo y esfuerzo, escribir una aplicación para la integración con varios sistemas que escribir manejadores manuales adicionales (tuberías) cada vez en su aplicación para la integración con otros sistemas, 200-500 líneas de código cada uno.



En el ejemplo actual, puede paralelizar el trabajo del mismo tipo de hilos para varias instancias por medio de identificadores únicos, evitando colisiones en el contexto global de la aplicación entre dependencias de hilos (contenedores, canales, etc.).



Además, puede desarrollar el proyecto:



  • guardar secuencias en la base de datos;
  • respaldar todos los componentes de integración que nos brinda la comunidad de primavera e integración de primavera;
  • hacer que los trabajadores que realicen el trabajo con hilos en un horario;
  • crear una interfaz de usuario sensata para configurar transmisiones con un "mouse condicional y cubos de componentes" (por cierto, el ejemplo se agudizó parcialmente para el proyecto github.com/spring-cloud/spring-cloud-dataflow-ui ).


Y una vez más duplicaré el enlace al repositorio .



All Articles