En este artĂculo hablaremos sobre cĂłmo implementar un orquestador de tareas infinitas usando colas. Como objetivo final, necesitamos implementar un sistema que pueda administrar tareas con una larga vida Ăştil, un sistema distribuido, donde un grupo de tareas se alojen en un servidor especĂfico, y en caso de falla de este servidor, las tareas se redistribuyen automáticamente a las libres.
En la mayorĂa de los casos, todo desarrollo empresarial se reduce a cumplir con los mismos requisitos: se crea una aplicaciĂłn, segĂşn el tipo de aplicaciĂłn, tiene algĂşn tipo de ciclo de vida, al final de la vida de la aplicaciĂłn recibimos (... o no recibimos) lo que queremos. Por aplicaciĂłn, podemos referirnos a cualquier cosa, desde la compra en lĂnea de un producto, un giro postal o el cálculo de la trayectoria de un misil balĂstico. Cada aplicaciĂłn tiene su propia forma de vida y lo que es importante tener en cuenta es la vida Ăştil , y cuanto más corto este tiempo, mejor. En otras palabras, cuanto antes se complete mi transferencia bancaria, mejor. Los requisitos tambiĂ©n son similares, más operaciones RPC por segundo, menos latencia , el sistema debe ser tolerante a fallas, escalable y estar listo ayer... Hay un millĂłn de herramientas, cientos de bases de datos, diferentes enfoques y patrones. Y todo ha sido escrito durante mucho tiempo, solo tenemos que usar las tecnologĂas listas para usar correctamente en nuestros proyectos.
El tema de la orquestación de tareas no es nuevo, pero para mi sorpresa, simplemente no hay soluciones listas para administrar tareas infinitas (cuya vida útil es ilimitada), con la posibilidad de redistribuir tareas entre servidores activos. Por lo tanto, implementaremos nuestra propia solución. Pero lo primero es lo primero….
, . — (Job), , . . , “”, . : , . , , , . “”- WebSocket , connected. , , , , . , “” Observer , , .
, , . :
, , .
, , .
, , . , , , , .
/, , ( , RAM ..), .
: N , . , , .
3 . #, . , C# .Net.
Task. . Task “”.
Schedulers. , . , , .
, , . , . RabbitMq, Framework - MassTransit, . .
Task
Task. , ( , ).
. , “Hello Word” :
public async Task SendEmailAsync(Email email, CancellationToken token)
{
//
}
, , await SendEmailAsync.
foreach (var email in emails
{
if(token.IsCancellationRequested)
break;
_emailSender.SendEmailAsync(email, token); // await
}
:
.
FireAndForget Exception .
.
, , .
await- , async/await .
email, , CancellationToken. , , , , . RetryPolicy , ?! , .
Schedulers
.NET , .
. , ( , — , , instance ) /Tasks. Hangfire, - UI, , . .
, Hangfire. BackgroundJob.Enqueue(Expression<Action> methodCall).
var jobIds = new List<string>();
foreach (var email in emails)
{
if(token.IsCancellationRequested)
break;
jobIds.Add(BackgroundJob.Enqueue(
async () => await _emailSender.SendEmailAsync(email, token)));
}
, , . RetryPolicy , . , , .
, . , “” :
_observer.DoWork(observerArg, new CancellationToken())
- , . BackgroundJobClient.
var client = new BackgroundJobClient(JobStorage.Current);
// , .
var state = new EnqueuedState(“unique-queue-name”);
client.Create(() =>_observer.DoWork(observerArg, new CancellationToken()), state);
, . - unique-queue-name.
// instance hangfire .
_server = new BackgroundJobServer(new BackgroundJobServerOptions()
{
WorkerCount = 10,
Queues = new[] { “unique-queue-name” },
ServerName = _serverOptions.ServerName
});
WorkerCount - , . , .
, , . : . , . Hangfire , , .
_monitoringApi = JobStorage.Current.GetMonitoringApi();
:
Observer-service - , , ( HangFilre WorkerCount ).
Observer-manager - , ... . , , . .
Scheduler common db – - , Hangfire MsSql, PostgreSql Redis.
— . “”.
, , , , , , .
, , . , . Hangfire. :
1) . , , .
2) . . , , , .
3) . custom-id, . - .
4) , “default” . , , . job-filters . , .
5) , , , . , , , framework .
, . , , , , .
, ,
, , , . . ? , — , , . , ? , . , , . , .
? “”. - PrefetchCount .
Ready.
Conumer , Unacked. Consumer .
, _Error .
acknowledged, Consumer.
- PrefetchCount , ( ), WorkerCount, Hangfire.
:
Observer-services, . PrefetchCount 1
. , . , , Unacked.
"”, :
Observer-services , , Round-robin.
msg1 . , “Observer 1”. Unacked , .
msg2 . “Observer 1” , , “Observer 2”.
, “Observer-service 1” , ( - “ ... ?”).
, , acknowledgement Unacked Ready. . , , .
- , . _Error, RetryPolicy. , .
RetryPolicy :
1000 .
5 1,4,10... .
int.MaxValue .
? “”, /. PrefetchCount, 10, 10 , . - , , . , 10 , 5 “”, , , 11- , .
? ? , , ... ?! , , "" , CancellationToken.
Manager. . , , . , . , , :
Id () - Guid .
Name (), , , .
CreatedAt/ModifyAt ( / ).
WorkersCount, PrefetchCount - , .
Manager .
Id |
Name |
WorkerCount |
CreatedAt |
ModifyAt |
IsDeleted |
{Unique id} |
Observer service 1 |
10 |
{some date} |
null |
false |
{Unique id} |
Observer service 2 |
10 |
{some date} |
null |
false |
{Unique id} |
Observer service 3 |
10 |
{some date} |
null |
false |
. , , 3 - .
, , , N . IsDeleted=true.
, (Kill –9, ). , Docker. , , . “”, , , . , - ….
“” API. ( , “State queue” ). “” , , , , - .
, , “”. , , , , .
, , “” Created.
Id |
Name |
CreatedAt |
ModifyAt |
ServiceId |
Status |
{Observer id} |
My_new_observer |
{created date} |
null |
null |
Created |
, , , Processing .
Id |
Name |
CreatedAt |
ModifyAt |
ServiceId |
Status |
{Observer id} |
My_new_observer |
{created date} |
{modify date} |
{Observer service 1 id} |
Processing |
“” .
:
Created
Processing
OnDeleting
Deleted
"", :
1) , CancellationToken.
2) , FanOut. , “” , .
, — , ... “ ”.
Observer-service , . , “” CancellationToken. “” .
“” . , id . , .
Created, “” . - , “”.
OnDeleting Deleted, - “” , .
Processing, “” OnDeleting . . , “”, CancellationToken “state queue”. , OnDeleting Deleted.
Id |
Name |
CreatedAt |
ModifyAt |
ServiceId |
Status |
{Observer id} |
My_new_observer |
{created date} |
{modify date} |
{Observer service 1 id} |
Deleted |
:
1) .
, . , - MsSql, RabbitMq, Kafka, Kubernetes , , SLA . , . - , .
2) blackout, .
, - , , , , , “”, . “”, . ( , .)
3) .
, "”, . "”, , .
4) . , "”.
. - , , .
5) “”, , .
, , “” . . . , , , , , .
, . , , - Unacked, - Ready. , , polling , . - "”, . , , , PrefetchCount. , , .