Un ejemplo de una aplicación basada en webhook basada en eventos en el almacenamiento de objetos S3 Mail.ru Cloud Solutions



Cafetera Rube Goldberg



La arquitectura basada en eventos aumenta la rentabilidad de los recursos utilizados porque se utilizan solo cuando son necesarios. Hay muchas opciones sobre cómo implementar esto y no crear entidades de nube adicionales como aplicaciones de trabajo. Y hoy no hablaré sobre FaaS, sino sobre webhooks. Le mostraré un ejemplo tutorial de manejo de eventos con webhooks de Object Storage.



Algunas palabras sobre almacenamiento de objetos y webhooks. Los almacenamientos de objetos le permiten almacenar cualquier información en la nube como objetos accesibles a través de S3 u otra API (dependiendo de la implementación) a través de HTTP / HTTPS. Los webhooks son generalmente devoluciones de llamada HTTP personalizadas. Por lo general, se desencadenan por un evento, como el envío de un código a un repositorio o un comentario publicado en un blog. Cuando se produce un evento, el sitio de origen envía una solicitud HTTP a la URL especificada para el webhook. Como resultado, puede hacer que los eventos en un sitio activen acciones en otro ( wiki ). Cuando el sitio de origen es Object Storage, los eventos son cambios en su contenido.



Ejemplos de casos simples en los que se puede utilizar dicha automatización:



  1. . « », .
  2. , , .
  3. ( , , , ).
  4. , , Kubernetes, , .


Como ejemplo, haremos una variante de la tarea 1, cuando los cambios en el depósito de almacenamiento de objetos Mail.ru Cloud Solutions (MCS) se sincronizan utilizando webhooks en el almacenamiento de objetos de AWS. En un caso real cargado, debe proporcionar trabajo asincrónico registrando webhooks en la cola, pero para la tarea educativa haremos la implementación sin esto.



Esquema de trabajo



El protocolo de comunicación se describe en detalle en la guía de webhooks S3 en MCS . El esquema de trabajo tiene los siguientes elementos:



  • Un servicio de publicación que se ubica en el lado S3 y publica solicitudes HTTP cuando se dispara un webnhook.
  • Un servidor receptor de webhook que escucha las solicitudes del servicio de publicación HTTP y toma las medidas adecuadas. El servidor se puede escribir en cualquier idioma, en nuestro ejemplo escribiremos el servidor en Go.


La peculiaridad de la implementación de webhook en la API S3 es el registro del servidor de recepción de webhook en el servicio de publicación. En particular, el servidor receptor de webhook debe confirmar la suscripción a los mensajes del servicio de publicación (en otras implementaciones de webhook, generalmente no es necesario confirmar la suscripción).



En consecuencia, el servidor receptor de webhook debe admitir dos operaciones principales:



  • responder a una solicitud del servicio de publicación para confirmar el registro,
  • procesar eventos entrantes.


Instalar el servidor para recibir webhooks



Se requiere un servidor Linux para ejecutar el servidor receptor de webhook. En este artículo, como ejemplo, usamos una instancia virtual que implementamos en MCS.



Instale el software requerido e inicie el servidor webhook.



ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install git
Reading package lists... Done
Building dependency tree
Reading state information... Done
The following packages were automatically installed and are no longer required:
  bc dns-root-data dnsmasq-base ebtables landscape-common liblxc-common 
liblxc1 libuv1 lxcfs lxd lxd-client python3-attr python3-automat 
python3-click python3-constantly python3-hyperlink
  python3-incremental python3-pam python3-pyasn1-modules 
python3-service-identity python3-twisted python3-twisted-bin 
python3-zope.interface uidmap xdelta3
Use 'sudo apt autoremove' to remove them.
Suggested packages:
  git-daemon-run | git-daemon-sysvinit git-doc git-el git-email git-gui 
gitk gitweb git-cvs git-mediawiki git-svn
The following NEW packages will be installed:
  git
0 upgraded, 1 newly installed, 0 to remove and 46 not upgraded.
Need to get 3915 kB of archives.
After this operation, 32.3 MB of additional disk space will be used.
Get:1 http://MS1.clouds.archive.ubuntu.com/ubuntu bionic-updates/main 
amd64 git amd64 1:2.17.1-1ubuntu0.7 [3915 kB]
Fetched 3915 kB in 1s (5639 kB/s)
Selecting previously unselected package git.
(Reading database ... 53932 files and directories currently installed.)
Preparing to unpack .../git_1%3a2.17.1-1ubuntu0.7_amd64.deb ...
Unpacking git (1:2.17.1-1ubuntu0.7) ...
Setting up git (1:2.17.1-1ubuntu0.7) ...


Clone la carpeta con el servidor receptor de webhook:



ubuntu@ubuntu-basic-1-2-10gb:~$ git clone
https://github.com/RomanenkoDenys/s3-webhook.git
Cloning into 's3-webhook'...
remote: Enumerating objects: 48, done.
remote: Counting objects: 100% (48/48), done.
remote: Compressing objects: 100% (27/27), done.
remote: Total 114 (delta 20), reused 45 (delta 18), pack-reused 66
Receiving objects: 100% (114/114), 23.77 MiB | 20.25 MiB/s, done.
Resolving deltas: 100% (49/49), done.


Comencemos el servidor:



ubuntu@ubuntu-basic-1-2-10gb:~$ cd s3-webhook/
ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80


Suscribirse al servicio de publicación



Puede registrar su servidor para recibir webhooks a través de API o interfaz web. Por simplicidad, nos registraremos a través de la interfaz web:



  1. Vaya a la sección de cubos en la sala de control.
  2. Vamos al cubo, para el cual configuraremos webhooks, y hacemos clic en el engranaje:






Vaya a la pestaña Webhooks y haga clic en Agregar:





Complete los campos:







ID: el nombre del webhook.



Evento: qué eventos enviar. Hemos establecido la transferencia de todos los eventos que ocurren cuando se trabaja con archivos (agregar y eliminar).



URL: dirección del servidor receptor del webhook.



El prefijo / sufijo de filtro es un filtro que permite generar webhooks solo para objetos cuyos nombres coinciden con ciertas reglas. Por ejemplo, para que el webhook funcione solo con archivos con la extensión .png, escriba "png" en el sufijo Filter .



Actualmente, solo se admiten los puertos 80 y 443 para acceder al servidor de recepción del webhook.



Haga clic en Agregar enlace y vea lo siguiente: Se





agregó el enlace.



El servidor para recibir webhooks en los registros muestra el progreso del proceso de registro de ganchos:



ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80
2020/06/15 12:01:14 [POST] incoming HTTP request from 
95.163.216.92:42530
2020/06/15 12:01:14 Got timestamp: 2020-06-15T15:01:13+03:00 TopicArn: 
mcs5259999770|myfiles-ash|s3:ObjectCreated:*,s3:ObjectRemoved:* Token: 
E2itMqAMUVVZc51pUhFWSp13DoxezvRxkUh5P7LEuk1dEe9y URL: 
http://89.208.199.220/webhook
2020/06/15 12:01:14 Generate responce signature: 
3754ce36636f80dfd606c5254d64ecb2fd8d555c27962b70b4f759f32c76b66d


El registro ha terminado. En la siguiente sección, veremos más de cerca el algoritmo para el servidor que recibe webhooks.



Descripción del servidor para recibir webhooks



En nuestro ejemplo, el servidor está escrito en Go. Analicemos los principios básicos de su trabajo.



package main

// Generate hmac_sha256_hex
func HmacSha256hex(message string, secret string) string {
}

// Generate hmac_sha256
func HmacSha256(message string, secret string) string {
}

// Send subscription confirmation
func SubscriptionConfirmation(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Send subscription confirmation
func GotRecords(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Liveness probe
func Ping(w http.ResponseWriter, req *http.Request) {
    // log request
    log.Printf("[%s] incoming HTTP Ping request from %s\n", req.Method, req.RemoteAddr)
    fmt.Fprintf(w, "Pong\n")
}

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {
}

func main() {

    // get command line args
    bindPort := flag.Int("port", 80, "number between 1-65535")
    bindAddr := flag.String("address", "", "ip address in dot format")
    flag.StringVar(&actionScript, "script", "", "external script to execute")
    flag.Parse()

    http.HandleFunc("/ping", Ping)
    http.HandleFunc("/webhook", Webhook)

log.Fatal(http.ListenAndServe(*bindAddr+":"+strconv.Itoa(*bindPort), nil))
}


Consideremos las funciones principales:



  • Ping () es una ruta que responde por URL / ping, la implementación más simple de una sonda de vida.
  • Webhook (): ruta principal, controlador de URL / webhook:

    • confirma el registro en el servicio de publicación (transición a la función SubscriptionConfirmation),
    • procesa webhooks entrantes (función Gotrecords).
  • Las funciones HmacSha256 y HmacSha256hex son implementaciones de los algoritmos de cifrado HMAC-SHA256 y HMAC-SHA256 con la salida como una cadena de números hexadecimales para la resta de firmas.
  • main es la función principal, procesa los parámetros de la línea de comandos y registra los controladores de URL.


Parámetros de línea de comando aceptados por el servidor:



  • -port es el puerto en el que escuchará el servidor.
  • -address es la dirección IP que escuchará el servidor.
  • -script es un programa externo que se llama en cada enlace que entra.


Echemos un vistazo más de cerca a algunas de las funciones:



//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {

    // Read body
    body, err := ioutil.ReadAll(req.Body)
    defer req.Body.Close()
    if err != nil {
        http.Error(w, err.Error(), 500)
        return
    }

    // log request
    log.Printf("[%s] incoming HTTP request from %s\n", req.Method, req.RemoteAddr)
    // check if we got subscription confirmation request
    if strings.Contains(string(body), 
"\"Type\":\"SubscriptionConfirmation\"") {
        SubscriptionConfirmation(w, req, body)
    } else {
        GotRecords(w, req, body)
    }

}


Esta función determina lo que ha llegado: una solicitud de confirmación de registro o un webhook. Como se desprende de la documentación , en caso de confirmación de registro, la siguiente estructura Json viene en la solicitud de publicación:



POST http://test.com HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation
content-type: application/json

{
    "Timestamp":"2019-12-26T19:29:12+03:00",
    "Type":"SubscriptionConfirmation",
    "Message":"You have chosen to subscribe to the topic $topic. To confirm the subscription you need to response with calculated signature",
    "TopicArn":"mcs2883541269|bucketA|s3:ObjectCreated:Put",
    "SignatureVersion":1,
    "Token":«RPE5UuG94rGgBH6kHXN9FUPugFxj1hs2aUQc99btJp3E49tA»
}


Esta solicitud debe ser respondida:



content-type: application/json

{"signature":«ea3fce4bb15c6de4fec365d36bcebbc34ccddf54616d5ca12e1972f82b6d37af»}


Donde la firma se calcula como:



signature = hmac_sha256(url, hmac_sha256(TopicArn, 
hmac_sha256(Timestamp, Token)))


Si llega un webhook, la estructura de la solicitud de publicación se ve así:



POST <url> HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation

{ "Records":
    [
        {
            "s3": {
                "object": {
                    "eTag":"aed563ecafb4bcc5654c597a421547b2",
                    "sequencer":1577453615,
                    "key":"some-file-to-bucket",
                    "size":100
                },
            "configurationId":"1",
            "bucket": {
                "name": "bucketA",
                "ownerIdentity": {
                    "principalId":"mcs2883541269"}
                },
                "s3SchemaVersion":"1.0"
            },
            "eventVersion":"1.0",
            "requestParameters":{
                "sourceIPAddress":"185.6.245.156"
            },
            "userIdentity": {
                "principalId":"2407013e-cbc1-415f-9102-16fb9bd6946b"
            },
            "eventName":"s3:ObjectCreated:Put",
            "awsRegion":"ru-msk",
            "eventSource":"aws:s3",
            "responseElements": {
                "x-amz-request-id":"VGJR5rtJ"
            }
        }
    ]
}


En consecuencia, dependiendo de la solicitud, debe comprender cómo procesar los datos. Elegí un registro como indicador "Type":"SubscriptionConfirmation", ya que está presente en la solicitud para confirmar la suscripción y no está presente en el webhook. En función de la presencia / ausencia de este registro en la solicitud POST, la ejecución adicional del programa va a una función SubscriptionConfirmationo a una función GotRecords.



No consideraremos la función SubscriptionConfirmation en detalle, se implementa de acuerdo con los principios establecidos en la documentación . Puede verificar el código fuente de esta función en el repositorio git del proyecto .



La función GotRecords analiza la solicitud entrante y, para cada objeto Record, llama a un script externo (cuyo nombre se pasó en el parámetro -script) con los parámetros:



  • nombre del cubo
  • clave de objeto
  • Actuar:

    • copia - si en la solicitud original EventName = ObjectCreated | PutObject | PutObjectCopy
    • eliminar - si en la solicitud original EventName = ObjectRemoved | DeleteObject


Por lo tanto, si llega un enlace con una solicitud Post, como se describió anteriormente , y el parámetro -script = script.sh, el script se llamará de la siguiente manera:



script.sh  bucketA some-file-to-bucket copy


Debe entenderse que este servidor receptor de webhook no es una solución de producción completa, sino un ejemplo simplificado de una posible implementación.



Ejemplo de trabajo



Sincronicemos los archivos del depósito principal en MCS con el depósito de respaldo en AWS. El depósito principal se llama myfiles-ash, la copia de seguridad es myfiles-backup (la configuración de un depósito en AWS está fuera del alcance de este artículo). En consecuencia, cuando un archivo se coloca en el depósito principal, su copia debe aparecer en la copia de seguridad, cuando se elimina del archivo principal, se debe eliminar en la copia de seguridad.



Trabajaremos con cubos utilizando la utilidad awscli, con la que tanto el almacenamiento en la nube MCS como el almacenamiento en la nube AWS son compatibles.



ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install awscli
Reading package lists... Done
Building dependency tree
Reading state information... Done
After this operation, 34.4 MB of additional disk space will be used.
Unpacking awscli (1.14.44-1ubuntu1) ...
Setting up awscli (1.14.44-1ubuntu1) ...


Configuremos el acceso a la API S3 MCS:



ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile mcs
AWS Access Key ID [None]: hdywEPtuuJTExxxxxxxxxxxxxx
AWS Secret Access Key [None]: hDz3SgxKwXoxxxxxxxxxxxxxxxxxx
Default region name [None]:
Default output format [None]:


Configuremos el acceso a la API de AWS S3:



ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile aws
AWS Access Key ID [None]: AKIAJXXXXXXXXXXXX
AWS Secret Access Key [None]: dfuerphOLQwu0CreP5Z8l5fuXXXXXXXXXXXXXXXX
Default region name [None]:
Default output format [None]:


Verifiquemos los accesos:



a AWS:



ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile aws
2020-07-06 08:44:11 myfiles-backup


Para MCS, cuando se ejecuta el comando, agregue --endpoint-url:



ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile mcs --endpoint-url 
https://hb.bizmrg.com
2020-02-04 06:38:05 databasebackups-0cdaaa6402d4424e9676c75a720afa85
2020-05-27 10:08:33 myfiles-ash


Accedido



Ahora escribamos un script para manejar el enlace entrante, llamémoslo s3_backup_mcs_aws.sh



#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"

SOURCE_BUCKET="${1}"
SOURCE_FILE="${2}"
ACTION="${3}"

SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"

case ${ACTION} in
    "copy")
    ${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
    ${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
    rm ${TEMP}
    ;;

    "delete")
    ${AWSCLI_AWS} rm ${TARGET}
    ;;

    *)
    echo "Usage: ${0} sourcebucket sourcefile copy/delete"
    exit 1
    ;;
esac


Iniciamos el servidor:



ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80 -
script scripts/s3_backup_mcs_aws.sh


Mirando cómo funciona. A través de la interfaz web MCS, agregue el archivo test.txt en el cubo myfiles-ash. En los registros de la consola, puede ver que se realizó una solicitud al servidor webhook:



2020/07/06 09:43:08 [POST] incoming HTTP request from 
95.163.216.92:56612
download: s3://myfiles-ash/test.txt to ../../../tmp/myfiles-ash/test.txt
upload: ../../../tmp/myfiles-ash/test.txt to 
s3://myfiles-backup/test.txt


Verifiquemos el contenido del depósito de copia de seguridad de mis archivos en AWS:



ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
2020-07-06 09:43:10       1104 test.txt


Ahora, a través de la interfaz web, elimine el archivo del cubo myfiles-ash.



Registros del servidor:



2020/07/06 09:44:46 [POST] incoming HTTP request from 
95.163.216.92:58224
delete: s3://myfiles-backup/test.txt


Contenido del cubo:



ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
ubuntu@ubuntu-basic-1-2-10gb:~$


Archivo eliminado, problema resuelto.



Conclusión y Tareas



Todo el código utilizado en este artículo está en mi repositorio . También hay ejemplos de scripts y ejemplos de firmas de conteo para registrar webhooks.



Este código no es más que un ejemplo de cómo puede usar webhooks S3 en sus actividades. Como dije al principio, si planea usar un servidor de este tipo en producción, al menos debe reescribir el servidor para el trabajo asincrónico: registrar los webhooks entrantes en una cola (RabbitMQ o NATS), y desde allí desarmarlos y procesarlos mediante aplicaciones de trabajo. De lo contrario, con la llegada masiva de webhooks, puede encontrar una falta de recursos del servidor para realizar tareas. La presencia de colas le permite distribuir el servidor y los trabajadores, así como resolver problemas con la repetición de tareas en caso de fallas. También es deseable cambiar el registro a uno más detallado y más estandarizado.



¡Buena suerte!



Lea más sobre el tema:






All Articles