Orquestador de infinitas tareas

En este artĂ­culo hablaremos sobre cĂłmo implementar un orquestador de tareas infinitas usando colas. Como objetivo final, necesitamos implementar un sistema que pueda administrar tareas con una larga vida Ăştil, un sistema distribuido, donde un grupo de tareas se alojen en un servidor especĂ­fico, y en caso de falla de este servidor, las tareas se redistribuyen automáticamente a las libres. 





En la mayorĂ­a de los casos, todo desarrollo empresarial se reduce a cumplir con los mismos requisitos: se crea una aplicaciĂłn, segĂşn el tipo de aplicaciĂłn, tiene algĂşn tipo de ciclo de vida, al final de la vida de la aplicaciĂłn recibimos (... o no recibimos) lo que queremos. Por aplicaciĂłn, podemos referirnos a cualquier cosa, desde la compra en lĂ­nea de un producto, un giro postal o el cálculo de la trayectoria de un misil balĂ­stico. Cada aplicaciĂłn tiene su propia forma de vida y lo que es importante tener en cuenta es  la vida Ăştil , y cuanto más corto este tiempo, mejor. En otras palabras, cuanto antes se complete mi transferencia bancaria, mejor. Los requisitos tambiĂ©n son similares, más   operaciones RPC por segundo, menos  latencia , el sistema debe ser tolerante a fallas, escalable y estar listo  ayer... Hay un millĂłn de herramientas, cientos de bases de datos, diferentes enfoques y patrones. Y todo ha sido escrito durante mucho tiempo, solo tenemos que usar las tecnologĂ­as listas para usar correctamente en nuestros proyectos. 





El tema de la orquestaciĂłn de tareas no es nuevo, pero para mi sorpresa, simplemente no hay soluciones listas para administrar tareas infinitas (cuya vida Ăştil es ilimitada), con la posibilidad de redistribuir tareas entre servidores activos. Por lo tanto, implementaremos nuestra propia soluciĂłn. Pero lo primero es lo primero…. 






, . — (Job), , . . , “”, . : , . , , , . “”-  WebSocket ,  connected. , , , , . , “”  Observer  , , . 





, , .     : 





  • , , . 





  • , , . 





  • , , . , , , , . 





  • /, , ( , RAM ..), . 





: N , .  , , . 





3 . #, . , C# .Net. 





  1.  Task. .  Task  “”.  





  2. Schedulers. , . , , . 





  3. , , . ,    .  RabbitMq,  Framework - MassTransit, . . 





 Task 

 Task. , ( , ). 





. ,   “Hello Word”   : 





public async Task SendEmailAsync(Email email, CancellationToken token) 
{ 
	   //   
}
      
      



, ,  await   SendEmailAsync. 





foreach (var email in emails 
{ 
   if(token.IsCancellationRequested) 
        break; 
    _emailSender.SendEmailAsync(email, token); // await 
} 
      
      



:









  • FireAndForget   Exception  . 









  • , ,    .  





 await- ,    async/await  .





 email, ,  CancellationToken. , , , , .  RetryPolicy  , ?! , .





Schedulers

.NET , . 





  • HangFire 





  • Quartz.net 





  .  , ( , — , ,  instance )  /Tasks.  Hangfire, - UI, , . .





,  Hangfire.  BackgroundJob.Enqueue(Expression<Action> methodCall). 





var jobIds = new List<string>(); 
foreach (var email in emails) 
{ 
   if(token.IsCancellationRequested) 
      break; 
   jobIds.Add(BackgroundJob.Enqueue( 
      async () => await _emailSender.SendEmailAsync(email, token))); 
} 
      
      



, , .  RetryPolicy , . , , . 





, . , “” :





_observer.DoWork(observerArg, new CancellationToken())







- , .   BackgroundJobClient





var client = new BackgroundJobClient(JobStorage.Current);

//  ,    .
var state = new EnqueuedState(“unique-queue-name”); 
client.Create(() =>_observer.DoWork(observerArg, new CancellationToken()), state);
      
      



, . - unique-queue-name





//  instance hangfire . 
_server = new BackgroundJobServer(new BackgroundJobServerOptions() 
{   
    WorkerCount = 10, 
    Queues = new[] { “unique-queue-name” }, 
    ServerName = _serverOptions.ServerName 
}); 
      
      



WorkerCount - , . , . 





, , . : . , .  Hangfire  , , . 





_monitoringApi = JobStorage.Current.GetMonitoringApi(); 
      
      







Observer-service  - , , ( HangFilre  WorkerCount ). 





Observer-manager - , ... .  , , . . 





Scheduler common db â€“ -  , Hangfire   MsSql,  PostgreSql   Redis.





—    . “”.  





, , , , , , . 





, , . , .    Hangfire. :





1)   . , , . 





2)  . . , , , . 





3)  .  custom-id, . - . 





4)  , “default” . , , .  job-filters   . , . 





5)  , , , .  , , ,  framework  .  





6)  ,  Hangfire   MsSql,  Redis, . 





, .  , , , , . 





, ,

, , , . . ? , — , , . , ? , . , , . , . 





? “”.  - PrefetchCount  .  





  •  Ready. 





  •  Conumer  ,  Unacked.  Consumer  .  





  • , _Error .  





  •  acknowledged,  Consumer.  





- PrefetchCount  , ( ),  WorkerCount,  Hangfire. 









 Observer-services, . PrefetchCount  1



. , . , ,  Unacked. 





"”, : 





 Observer-services  , ,  Round-robin





  • msg1  .  , “Observer 1”.  Unacked  ,  . 





  • msg2  . “Observer 1”  ,  ,  â€śObserver 2”. 





, “Observer-service 1”  , ( - “ ... ?”). 





 , ,  acknowledgement   Unacked  Ready.  . , , . 





-  , . _Error,  RetryPolicy. ,    .  

 RetryPolicy  : 





  • 1000 . 





  • 5 1,4,10...  . 





  •  int.MaxValue . 





? “”, /.  PrefetchCount, 10, 10 , .    - , ,  . , 10 , 5 “”, , ,    11- , .





 ? ? , , ... ?! , , "" ,  CancellationToken.  





 Manager. . , , . , . , , : 





  • Id () - Guid  . 





  • Name (), , , . 





  • CreatedAt/ModifyAt ( / ). 





  • WorkersCount,  PrefetchCount - , . 





Manager  . 





Id





Name





WorkerCount





CreatedAt





ModifyAt





IsDeleted





{Unique id} 





Observer service 1





10





{some date}





null





false





{Unique id} 





Observer service 2





10





{some date}





null





false





{Unique id} 





Observer service 3





10





{some date}





null





false





 . , , 3 - . 





,  ,   , N .  IsDeleted=true





, (Kill â€“9, ). ,  Docker. , , . “”, , , . , - …. 





“” API. ( , “State queue” ). “” , , , , - .





, , “”. , , , , . 





,  ,  “”  Created.





Id





Name





CreatedAt





ModifyAt





ServiceId





Status





{Observer id} 





My_new_observer





{created date}





null





null





Created





, , ,  Processing  .  





Id





Name





CreatedAt





ModifyAt





ServiceId





Status





{Observer id} 





My_new_observer





{created date}





{modify date} 





{Observer service 1 id} 





Processing 





“” . 









  • Created  





  • Processing 





  • OnDeleting





  • Deleted 





"", : 





1) ,  CancellationToken. 





2) ,  FanOut. ,  “” , . 





, — , ... “ ”.  





 Observer-service  , . , “”  CancellationToken. “” . 





“” . ,  id . , .  





  •  Created, “” . - , “”. 





  •  OnDeleting  Deleted, - “” , . 





  •  Processing, “”  OnDeleting  .    . , “”,  CancellationToken  “state queue”. ,  OnDeleting  Deleted





Id





Name





CreatedAt





ModifyAt





ServiceId





Status





{Observer id} 





My_new_observer 





{created date}





{modify date} 





{Observer service 1 id} 





Deleted 





: 





1)





  ,  . , - MsSql, RabbitMq, Kafka,  Kubernetes  , , SLA . , . - , .  





2)  blackout, . 





, - , , , , , “”, . “”, .  ( , .) 





3)





, "”, . "”, , . 





4) . , "”. 





. - , , . 





5) “”, , . 





, , “” . . . , , , , , . 





, . , , - Unacked, - Ready. , ,  polling , . - "”, . , , ,  PrefetchCount. , , .








All Articles