Atomics.wait()está bloqueando, no se puede llamar en el hilo principal (si intenta hacer esto, se producirá un error TypeError).
El motor V8 desde la versión 8.7 admite una opción sin bloqueo
Atomics.wait()llamada Atomics.waitAsync () . Este nuevo método se puede utilizar en el hilo principal.
Hoy le mostraremos cómo usar estas API de bajo nivel para crear un mutex que se pueda ejecutar de forma sincrónica (en subprocesos de trabajo) y asincrónica (en subprocesos de trabajo o en el subproceso principal).
Atomics.wait () y Atomics.waitAsync ()
Métodos
Atomics.wait()y Atomics.waitAsync()tomar los siguientes parámetros:
buffer: una matriz de tipoInt32ArrayoBigInt64Array, que se basa enSharedArrayBuffer.index: el índice real del elemento en la matriz.expectedValue: el valor que esperamos que se represente en la memoria en la ubicación descrita conbufferyindex.timeout: tiempo de espera en milisegundos (opcional, predeterminado enInfinity).
Atomics.wait()devuelve una cadena. Si el valor esperado no se encuentra en la ubicación de memoria especificada, Atomics.wait()sale inmediatamente y devuelve una cadena not-equal. De lo contrario, el hilo se bloquea. Debe ocurrir uno de los siguientes eventos para que se libere el bloqueo. La primera es una llamada desde otro hilo de un método Atomics.notify()con una indicación del lugar en la memoria en el que está interesado el método Atomics.wait(). El segundo es la expiración del tiempo de espera. En el primer caso, Atomics.wait()devolverá una cadena ok, en el segundo, un valor de cadena timed-out.
El método
Atomics.notify()toma los siguientes parámetros:
typedArray: una matriz de tipoInt32ArrayoBigInt64Array, que se basa enSharedArrayBuffer.index: el índice real del elemento en la matriz.count: número de agentes en espera de notificación (parámetro opcional, establecido por defectoInfinity).
El método
Atomics.notify()notifica el número especificado de agentes que esperan notificación en la dirección descrita typedArrayy los indexomite en el orden FIFO. Si se han realizado varias llamadas Atomics.wait()o Atomics.waitAsync()están viendo el mismo lugar en la memoria, todas terminan en la misma cola.
A diferencia de un método
Atomics.wait(), un método Atomics.waitAsync()devuelve inmediatamente un valor en la ubicación de la llamada. Puede ser uno de los siguientes valores:
{ async: false, value: 'not-equal' }- si la ubicación de memoria especificada no contiene el valor esperado.{ async: false, value: 'timed-out' }- solo cuando el tiempo de espera se establece en 0.{ async: true, value: promise }- en otros casos.
Una promesa, después de un tiempo, puede resolverse con éxito mediante un valor de cadena
ok(si se llamó a un método Atomics.notify(), a qué información sobre el lugar en la memoria que se pasó se pasó Atomics.waitAsync()). Se puede resolver con un valor timed-out. Esta promesa nunca se rechaza.
El siguiente ejemplo demuestra los conceptos básicos de uso
Atomics.waitAsync():
const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
// | | ^ - ()
// | ^
// ^
if (result.value === 'not-equal') {
// SharedArrayBuffer .
} else {
result.value instanceof Promise; // true
result.value.then(
(value) => {
if (value == 'ok') { /* */ }
else { /* - */ }
});
}
// :
Atomics.notify(i32a, 0);
Ahora hablemos sobre cómo crear un mutex que se pueda usar tanto en modo síncrono como asíncrono. Cabe señalar que la implementación de la versión síncrona del mutex se ha discutido anteriormente. Por ejemplo, en este material.
En este ejemplo, no usaremos el parámetro
timeoutal llamar Atomics.wait()y Atomics.waitAsync(). Este parámetro se puede utilizar para implementar condicionales relacionados con el tiempo de espera.
Nuestra clase que
AsyncLockrepresenta un mutex funciona con un búfer SharedArrayBuffere implementa los siguientes métodos:
lock(): bloquea el hilo hasta que tengamos la oportunidad de capturar el mutex (aplicable solo en el hilo de trabajo).unlock(): libera el mutex (este es el opuestolock()).executeLocked(callback): intenta adquirir el bloqueo sin bloquear el hilo. Este método se puede utilizar en el hilo principal. Planea ejecutar la devolución de llamada en el momento en que podamos adquirir el bloqueo.
Echemos un vistazo a cómo se pueden implementar estos métodos. La declaración de clase incluye constantes y un constructor que toma un búfer
SharedArrayBuffer.
class AsyncLock {
static INDEX = 0;
static UNLOCKED = 0;
static LOCKED = 1;
constructor(sab) {
this.sab = sab;
this.i32a = new Int32Array(sab);
}
lock() {
/* … */
}
unlock() {
/* … */
}
executeLocked(f) {
/* … */
}
}
Aquí el elemento
i32a[0]contiene el valor LOCKEDo UNLOCKED. Él, además, representa el lugar en la memoria que interesa Atomics.wait()y Atomics.waitAsync(). La clase AsyncLockproporciona las siguientes capacidades básicas:
i32a[0] == LOCKEDy el hilo está en un estado de espera (después de una llamadaAtomics.wait()oAtomics.waitAsync()), mirandoi32a[0], eventualmente será notificado.- Después de que se notifique al hilo, intentará adquirir el bloqueo. Si tiene éxito, cuando suelte el bloqueo, llamará
Atomics.notify().
Captura y liberación de bloqueo sincrónico
Considere el código de un método
lock()que solo se puede llamar desde un hilo de trabajo.
lock() {
while (true) {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* >>> */ AsyncLock.UNLOCKED,
/* >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
return;
}
Atomics.wait(this.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED); // <<< ,
}
}
Cuando se llama a un método desde un hilo
lock(), primero intenta adquirir el bloqueo, usándolo Atomics.compareExchange()para cambiar el estado del bloqueo de UNLOCKEDa LOCKED. El método Atomics.compareExchange()intenta realizar una operación atómica de cambiar el estado de bloqueo, devuelve el valor original ubicado en el área de memoria especificada. Si el valor original era UNLOCKED, esto nos dirá que el cambio de estado fue exitoso y que el hilo ha adquirido el bloqueo. No necesitas hacer nada más.
Si no
Atomics.compareExchange()pudo cambiar el estado del candado, significa que otro hilo está reteniendo el candado. Como resultado, el hilo desde el que se llama al método lock()intenta utilizar el métodoAtomics.wait()para esperar hasta que otro hilo libere el bloqueo. Si el valor esperado todavía se almacena en el área de memoria de interés (en nuestro caso - AsyncLock.LOCKED), la llamada Atomics.wait()bloqueará el hilo. El retorno de Atomics.wait()solo ocurrirá cuando otro hilo llame Atomics.notify().
El método
unlock()libera el bloqueo al establecerlo en el estado UNLOCKEDy lo llama Atomics.notify()para notificar a los agentes que están esperando que se libere el bloqueo. Se supone que una operación de cambio de estado de bloqueo siempre tiene éxito. Esto se debe a que el hilo que realiza esta operación está bloqueado. Por lo tanto, nada más debería llamar al método en este momento unlock().
unlock() {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* >>> */ AsyncLock.LOCKED,
/* >>> */ AsyncLock.UNLOCKED);
if (oldValue != AsyncLock.LOCKED) {
throw new Error('Tried to unlock while not holding the mutex');
}
Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
}
En un caso típico, todo sucede así: el candado queda libre y el hilo T1 lo captura, cambiando su estado usando
Atomics.compareExchange(). El subproceso T2 intenta adquirir el bloqueo llamándolo Atomics.compareExchange(), pero no puede cambiar su estado. Luego T2 llama Atomics.wait(), esta llamada bloqueará el hilo. Después de un tiempo, el hilo T1 libera el bloqueo y llama Atomics.notify(). Esto hace que la llamada Atomics.wait()a T2 regrese oky el hilo T2 salga del bloqueo. Luego, T2 intenta adquirir el bloqueo nuevamente. Esta vez lo consigue.
Aquí hay dos casos especiales. Su análisis tiene como objetivo demostrar las razones
Atomics.wait()y Atomics.waitAsync()comprobar un valor específico en el índice especificado del elemento de la matriz. Estos son los casos:
- T1 , T2 . T2 ,
Atomics.compareExchange(), . T1 , T2Atomics.wait(). T2Atomics.wait(),not-equal. T2 . - T1 , T2
Atomics.wait(). T1 , T2 (Atomics.wait())Atomics.compareExchange(). , T3, . .Atomics.compareExchange()T2 . T2Atomics.wait(), T3 .
El último caso especial demuestra el hecho de que nuestro mutex no funciona correctamente. Puede suceder que el hilo T2 estuviera esperando a que se liberara el bloqueo, pero T3 logró adquirirlo inmediatamente después de su liberación. Una implementación de bloqueo que sea más adecuada para el uso en el mundo real puede utilizar varios estados de bloqueo que existen para distinguir entre situaciones en las que el bloqueo simplemente se "adquirió" y en las que "hubo un conflicto durante la adquisición".
Captura de bloqueo asincrónico
Un método sin bloqueo
executeLocked()puede, a diferencia de un método lock(), llamarse desde el hilo principal. Recibe, como único parámetro, una devolución de llamada y programa la devolución de llamada después de que se adquiere un bloqueo exitoso.
executeLocked(f) {
const self = this;
async function tryGetLock() {
while (true) {
const oldValue = Atomics.compareExchange(self.i32a, AsyncLock.INDEX,
/* >>> */ AsyncLock.UNLOCKED,
/* >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
f();
self.unlock();
return;
}
const result = Atomics.waitAsync(self.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED);
// ^ ,
await result.value;
}
}
tryGetLock();
}
La función interna
tryGetLock()primero intenta adquirir el bloqueo con Atomics.compareExchange(). Si llamar a este método da como resultado un cambio de estado de bloqueo exitoso, la función puede llamar a una devolución de llamada y luego liberar el bloqueo y salir.
Si la llamada
Atomics.compareExchange()no permitió adquirir el bloqueo, debemos intentar hacerlo nuevamente, en el momento en que el bloqueo probablemente esté libre. Pero no podemos bloquear el hilo y esperar a que se libere el bloqueo. En cambio, estamos Atomics.waitAsync()programando un nuevo intento de adquirir el bloqueo utilizando el método y la promesa que devuelve.
Si logramos ejecutar el método
Atomics.waitAsync(), entonces la promesa devuelta por este método se resuelve cuando el hilo que contenía el bloqueo llamaAtomics.notify()... Después de eso, el hilo que quería adquirir el bloqueo, como antes, intenta hacerlo nuevamente.
Aquí, son posibles aquellos casos especiales que son típicos de la versión síncrona (el bloqueo se libera entre llamadas
Atomics.compareExchange()y Atomics.waitAsync(); el bloqueo es capturado por otro hilo, haciéndolo entre los momentos de resolución de la promesa y la llamada Atomics.compareExchange()). Por tanto, en un código similar aplicable en proyectos reales, esto debe tenerse en cuenta.
Salir
En este artículo, hablamos de las primitivas de sincronización de bajo nivel
Atomics.wait(), Atomics.waitAsync()y Atomics.notify(). Hemos analizado un ejemplo de creación de un mutex basado en ellos, que se puede utilizar tanto en el hilo principal como en los hilos de trabajo.
¿Atomics.wait (), Atomics.waitAsync () y Atomics.notify () serán útiles en sus proyectos?