Escribir un ORM simple con la capacidad de cambiar bases de datos sobre la marcha

imagen



¡Hola, Habr! El karma se agotó debido a un comentario descuidado en el artículo de holivar, lo que significa que debes escribir una publicación interesante (espero) y rehabilitarte.



He estado usando un cliente de servidor de telegramas en php durante varios años. Y como muchos usuarios, cansado del constante crecimiento en el consumo de memoria. ¡Algunas sesiones pueden tomar de 1 a 8 gigabytes de RAM! Se ha prometido soporte de base de datos durante mucho tiempo, pero no ha habido avances en esta dirección. Tuve que resolver el problema yo mismo :) La popularidad del proyecto de código abierto impuso requisitos interesantes en la solicitud de extracción:



  1. Compatibilidad con versiones anteriores . Todas las sesiones existentes deberían seguir funcionando en la nueva versión (la sesión es una instancia serializada de la aplicación en un archivo);
  2. Libertad de elección de la base de datos . La capacidad de cambiar el tipo de almacenamiento sin perder datos y en cualquier momento, ya que los usuarios tienen diferentes configuraciones del entorno;
  3. Extensibilidad . Facilidad para agregar nuevos tipos de bases de datos;
  4. Guardar interfaz . El código de la aplicación que manipula datos no debe cambiar;
  5. Asincronía . El proyecto usa amphp, por lo que todas las operaciones de la base de datos deben ser sin bloqueo;


Para más detalles, invito a todos debajo del gato.



Que transferiremos



La mayor parte de la memoria de MadelineProto está ocupada por chats, usuarios y archivos. Por ejemplo, en la caché de pares, tengo más de 20 mil entradas. Estos son todos los usuarios que la cuenta ha visto (incluidos los miembros de todos los grupos), así como los canales, bots y grupos. Cuanto más antigua y activa sea la cuenta, más datos habrá en la memoria. Son decenas y cientos de megabytes, y la mayoría de ellos no se utilizan. Pero no puede borrar todo el caché, porque los telegramas restringirán severamente la cuenta de inmediato cuando intente recibir los mismos datos varias veces. Por ejemplo, después de volver a crear la sesión en mi servidor de demostración público, los telegramas en una semana respondieron a la mayoría de las solicitudes con el error FLOOD_WAIT y nada realmente funcionó. Después de que el caché se calentó, todo volvió a la normalidad.



Desde el punto de vista del código, estos datos se almacenan como matrices en las propiedades de un par de clases.



Arquitectura



Basado en los requisitos, nació un esquema:



  • Todas las matrices "pesadas" se reemplazan con objetos que implementan ArrayAccess;
  • Para cada tipo de base de datos, creamos nuestras propias clases que heredan la base;
  • Los objetos se crean y escriben en propiedades durante __consrtuct y __awake;
  • La fábrica abstracta selecciona la clase deseada para el objeto, dependiendo de la base de datos seleccionada en la configuración de la aplicación;
  • Si la aplicación ya tiene otro tipo de almacenamiento, entonces leemos todos los datos desde allí y escribimos la matriz en el nuevo almacenamiento.


Problemas del mundo asincrónico



Lo primero que hice fue crear interfaces y una clase para almacenar matrices en memoria. Este era el predeterminado, idéntico en comportamiento a la versión anterior del programa. La primera noche, estaba muy emocionado por el éxito del prototipo. El código era agradable y sencillo. Hasta ahora no se ha descubierto que sea imposible utilizar generadores dentro de métodos de la interfaz Iterator y métodos internos responsables de desarmar e isar.



Debe aclararse aquí que amphp usa la sintaxis del generador para implementar la asincronía en php. El rendimiento se vuelve análogo a async ... await from js. Si un método usa asincronía, entonces para obtener un resultado de él, debe esperar este resultado en el código usando yield. Por ejemplo:



<?php

include 'vendor/autoload.php';

$MadelineProto = new \danog\MadelineProto\API('session.madeline');
$MadelineProto->async(true);

$MadelineProto->loop(function() use($MadelineProto) {
    $myAsyncFunction = function() use($MadelineProto): \Generator {
        $me = yield $MadelineProto->start();
        yield $MadelineProto->echo(json_encode($me, JSON_PRETTY_PRINT|JSON_UNESCAPED_UNICODE));
    };

    yield $myAsyncFunction();
});


Si de cadena
yield $myAsyncFunction();
remove yield, la aplicación finalizará antes de que se ejecute este código. No obtendremos el resultado.



Agregar rendimiento antes de llamar a métodos y funciones no es muy difícil. Pero como se usa la interfaz ArrayAccess, los métodos no se llaman directamente. Por ejemplo, unset () llama a offsetUnset () e isset () llama a offsetIsset (). La situación es similar con los iteradores foreach cuando se usa la interfaz Iterator.



Agregar rendimiento frente a los métodos integrados genera un error, ya que estos métodos no están diseñados para funcionar con generadores. Un poco más en los comentarios: aquí y aquí .



Tuve que comprometerme y reescribir el código para usar mis propios métodos. Afortunadamente, había muy pocos de esos lugares. En la mayoría de los casos, las matrices se utilizaron para leer o escribir por clave. Esta funcionalidad se ha hecho muy amiga de los generadores.



La interfaz resultante es:



<?php

use Amp\Producer;
use Amp\Promise;

interface DbArray extends DbType, \ArrayAccess, \Countable
{
    public function getArrayCopy(): Promise;
    public function isset($key): Promise;
    public function offsetGet($offset): Promise;
    public function offsetSet($offset, $value);
    public function offsetUnset($offset): Promise;
    public function count(): Promise;
    public function getIterator(): Producer;

    /**
     * @deprecated
     * @internal
     * @see DbArray::isset();
     *
     * @param mixed $offset
     *
     * @return bool
     */
    public function offsetExists($offset);
}


Ejemplos de trabajo con datos



<?php
...
//
$existingChat = yield $this->chats[$user['id']];

//. 
yield $this->chats[$user['id']] = $user;
//   yield,           .
$this->chats[$user['id']] = $user;


//unset
yield $this->chats->offsetUnset($id);

//foreach
$iterator = $this->chats->getIterator();
while (yield $iterator->advance()) {
    [$key, $value] = $iterator->getCurrent();
    //  
}


Almacenamiento de datos



La forma más sencilla de almacenar datos es serializar. Tuve que abandonar el uso de json para admitir objetos. La tabla tiene dos columnas principales: clave y valor.



Un ejemplo de una consulta SQL para crear una tabla:



            CREATE TABLE IF NOT EXISTS `{$this->table}`
            (
                `key` VARCHAR(255) NOT NULL,
                `value` MEDIUMBLOB NULL,
                `ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                PRIMARY KEY (`key`)
            )
            ENGINE = InnoDB
            CHARACTER SET 'utf8mb4' 
            COLLATE 'utf8mb4_general_ci'


Cada vez que se inicia la aplicación, intentamos crear una tabla para cada propiedad. No se recomienda que los clientes de Telegram reinicien más de una vez cada pocas horas, por lo que no tendremos varias solicitudes para crear tablas por segundo :)



Dado que la clave principal no se incrementa automáticamente, entonces la inserción y actualización de datos se puede hacer con una solicitud, como en una matriz normal:



INSERT INTO `{$this->table}` 
            SET `key` = :index, `value` = :value 
            ON DUPLICATE KEY UPDATE `value` = :value


Se crea una tabla con un nombre en el formato% account_id% _% class% _% variable_name% para cada variable. Pero cuando inicia la aplicación por primera vez, todavía no hay una cuenta. En este caso, debe generar una identificación temporal aleatoria con el prefijo tmp. En cada lanzamiento, la clase de cada variable comprueba si ha aparecido la identificación de la cuenta. Si id está presente, se cambiará el nombre de las tablas.



Índices



La estructura de la base de datos es lo más simple posible, de modo que en el futuro se agregarán automáticamente nuevas propiedades. No hay conexiones. Solo se utilizan índices de clave PRIMARY. Pero hay situaciones en las que necesita buscar en otros campos.



Por ejemplo, hay una matriz / chats de mesa. La clave es la identificación del chat. Pero a menudo tienes que buscar por nombre de usuario. Cuando la aplicación almacenaba datos en matrices, la búsqueda por nombre de usuario se realizaba como de costumbre, iterando sobre la matriz en foreach. Esta búsqueda funcionó a una velocidad aceptable en la memoria, pero no en la base de datos. Por lo tanto, se creó otra tabla / matriz y la propiedad correspondiente en la clase. La clave es el nombre de usuario, el valor es la identificación del chat. El único inconveniente de este enfoque es que debe escribir código adicional para sincronizar las dos tablas.



Almacenamiento en caché



MySQL local es rápido, pero un poco de almacenamiento en caché nunca está de más. Especialmente si el mismo valor se usa varias veces seguidas. Por ejemplo, primero verificamos la presencia de un chat en la base de datos y luego obtenemos algunos datos de él.



Se escribió un rasgo de bicicleta simple .



<?php

namespace danog\MadelineProto\Db;

use Amp\Loop;
use danog\MadelineProto\Logger;

trait ArrayCacheTrait
{
    /**
     * Values stored in this format:
     * [
     *      [
     *          'value' => mixed,
     *          'ttl' => int
     *      ],
     *      ...
     * ].
     * @var array
     */
    protected array $cache = [];
    protected string $ttl = '+5 minutes';
    private string $ttlCheckInterval = '+1 minute';

    protected function getCache(string $key, $default = null)
    {
        $cacheItem = $this->cache[$key] ?? null;
        $result = $default;

        if (\is_array($cacheItem)) {
            $result = $cacheItem['value'];
            $this->cache[$key]['ttl'] = \strtotime($this->ttl);
        }

        return $result;
    }

    /**
     * Save item in cache.
     *
     * @param string $key
     * @param $value
     */
    protected function setCache(string $key, $value): void
    {
        $this->cache[$key] = [
            'value' => $value,
            'ttl' => \strtotime($this->ttl),
        ];
    }

    /**
     * Remove key from cache.
     *
     * @param string $key
     */
    protected function unsetCache(string $key): void
    {
        unset($this->cache[$key]);
    }

    protected function startCacheCleanupLoop(): void
    {
        Loop::repeat(\strtotime($this->ttlCheckInterval, 0) * 1000, fn () => $this->cleanupCache());
    }

    /**
     * Remove all keys from cache.
     */
    protected function cleanupCache(): void
    {
        $now = \time();
        $oldKeys = [];
        foreach ($this->cache as $cacheKey => $cacheValue) {
            if ($cacheValue['ttl'] < $now) {
                $oldKeys[] = $cacheKey;
            }
        }
        foreach ($oldKeys as $oldKey) {
            $this->unsetCache($oldKey);
        }

        Logger::log(
            \sprintf(
                "cache for table:%s; keys left: %s; keys removed: %s",
                $this->table,
                \count($this->cache),
                \count($oldKeys)
            ),
            Logger::VERBOSE
        );
    }
}


Me gustaría prestar especial atención a startCacheCleanupLoop. Gracias a la magia de amphp, invalidar la caché es lo más simple posible. La devolución de llamada comienza en el intervalo especificado, recorre todos los valores y examina el campo ts, que almacena la marca de tiempo de la última llamada a este elemento. Si la llamada fue hace más de 5 minutos (configurable en la configuración), entonces el elemento se elimina. Es muy fácil implementar un análogo ttl desde redis o memcache usando amphp. Todo esto sucede en segundo plano y no bloquea el hilo principal.



Con la ayuda de la caché y la asincronía, no solo se aceleran las lecturas, sino también las escrituras.



Aquí está el código fuente del método que escribe datos en la base de datos.



/**
     * Set value for an offset.
     *
     * @link https://php.net/manual/en/arrayiterator.offsetset.php
     *
     * @param string $index <p>
     * The index to set for.
     * </p>
     * @param $value
     *
     * @throws \Throwable
     */

    public function offsetSet($index, $value): Promise
    {
        if ($this->getCache($index) === $value) {
            return call(fn () =>null);
        }

        $this->setCache($index, $value);

        $request = $this->request(
            "
            INSERT INTO `{$this->table}` 
            SET `key` = :index, `value` = :value 
            ON DUPLICATE KEY UPDATE `value` = :value
        ",
            [
                'index' => $index,
                'value' => \serialize($value),
            ]
        );

        //Ensure that cache is synced with latest insert in case of concurrent requests.
        $request->onResolve(fn () => $this->setCache($index, $value));

        return $request;
    }


$ this-> request crea una Promesa que escribe datos de forma asincrónica. Y las operaciones con la caché ocurren de forma sincrónica. Es decir, no tiene que esperar una escritura en la base de datos y, al mismo tiempo, asegurarse de que las operaciones de lectura comenzarán a devolver nuevos datos inmediatamente.



El método onResolve de amphp resultó ser muy útil. Una vez completada la inserción, los datos se volverán a escribir en la caché. Si alguna operación de escritura se retrasa y la caché y la base comienzan a diferir, la caché se actualizará con el valor escrito en la base al final. Aquellos. nuestro caché volverá a ser coherente con la base.



Fuente



Enlace a la solicitud de extracción



Y así, otro usuario agregó soporte para postgre. Solo tomó 5 minutos escribir las instrucciones .



La cantidad de código podría reducirse moviendo los métodos duplicados a la clase abstracta general SqlArray.



Una cosa más



Se notó que al descargar archivos multimedia desde Telegram, el recolector de basura estándar php no hace frente al trabajo y partes del archivo permanecen en la memoria. Normalmente, las fugas eran del mismo tamaño que el archivo. Causa posible: el recolector de basura se activa automáticamente cuando se acumulan 10,000 enlaces. En nuestro caso, había pocos enlaces (decenas), pero cada uno podía referirse a megabytes de datos en la memoria. Fue muy perezoso estudiar miles de líneas de código con la implementación mtproto. ¿Por qué no probar la elegante muleta con \ gc_collect_cycles (); primero?



Sorprendentemente, resolvió el problema. Esto quiere decir que basta con configurar el inicio periódico de la limpieza. Afortunadamente, amphp proporciona herramientas sencillas para la ejecución en segundo plano a intervalos específicos.



Limpiar la memoria cada segundo parecía demasiado fácil y no muy efectivo. Me decidí por un algoritmo que verifica la ganancia de memoria desde la última limpieza. El borrado ocurre si la ganancia es mayor que el umbral.



<?php

namespace danog\MadelineProto\MTProtoTools;

use Amp\Loop;
use danog\MadelineProto\Logger;

class GarbageCollector
{
    /**
     * Ensure only one instance of GarbageCollector
     * 		when multiple instances of MadelineProto running.
     * @var bool
     */
    public static bool $lock = false;

    /**
     * How often will check memory.
     * @var int
     */
    public static int $checkIntervalMs = 1000;

    /**
     * Next cleanup will be triggered when memory consumption will increase by this amount.
     * @var int
     */
    public static int $memoryDiffMb = 1;

    /**
     * Memory consumption after last cleanup.
     * @var int
     */
    private static int $memoryConsumption = 0;

    public static function start(): void
    {
        if (static::$lock) {
            return;
        }
        static::$lock = true;

        Loop::repeat(static::$checkIntervalMs, static function () {
            $currentMemory = static::getMemoryConsumption();
            if ($currentMemory > static::$memoryConsumption + static::$memoryDiffMb) {
                \gc_collect_cycles();
                static::$memoryConsumption = static::getMemoryConsumption();
                $cleanedMemory = $currentMemory - static::$memoryConsumption;
                Logger::log("gc_collect_cycles done. Cleaned memory: $cleanedMemory Mb", Logger::VERBOSE);
            }
        });
    }

    private static function getMemoryConsumption(): int
    {
        $memory = \round(\memory_get_usage()/1024/1024, 1);
        Logger::log("Memory consumption: $memory Mb", Logger::ULTRA_VERBOSE);
        return (int) $memory;
    }
}



All Articles