Conociendo el problema
En el desarrollo comercial, muchos casos de uso de aprendizaje automático implican una arquitectura de múltiples inquilinos y requieren entrenar un modelo separado para cada cliente y / o usuario.
Como ejemplo, considere pronosticar las compras y la demanda de ciertos productos mediante el aprendizaje automático. Si tiene una cadena de tiendas minoristas, puede utilizar los datos del historial de compras de los clientes y la demanda total de estos productos para predecir los costos y los volúmenes de compra de cada tienda de forma individual.
La mayoría de las veces, en tales casos, para implementar modelos, escribe un servicio Flask y lo coloca en un contenedor Docker. Hay muchos ejemplos de servidores de aprendizaje automático de un solo modelo, pero cuando se trata de implementar varios modelos, el desarrollador tiene pocas opciones disponibles para resolver el problema.
En las aplicaciones de múltiples inquilinos, el número de inquilinos no se conoce de antemano y puede ser prácticamente ilimitado: en algún momento puede tener solo un cliente y en otro momento puede servir modelos separados para cada usuario a miles de usuarios. Aquí es donde comienzan a surgir las limitaciones del enfoque de implementación estándar:
Si implementamos un contenedor Docker para cada cliente, terminamos con una aplicación muy grande y costosa que será bastante difícil de administrar.
Un solo contenedor, en cuya imagen están todos los modelos, tampoco nos sirve, ya que miles de modelos pueden funcionar en el servidor, y se agregan nuevos modelos en tiempo de ejecución.
Decisión
, . , Airflow S3, ML — .
ML — , : -> .
, :
Model — , ; SklearnModel, TensorFlowModel, MyCustomModel . .
ModelInfoRepository — , userid -> modelid. , SQAlchemyModelInfoRepository.
ModelRepository — , ID. FileSystemRepository, S3Repository .
from abc import ABC
class Model(ABC):
@abstractmethod
def predict(self, data: pd.DataFrame) -> np.ndarray:
raise NotImplementedError
class ModelInfoRepository(ABC):
@abstractmethod
def get_model_id_by_user_id(self, user_id: str) -> str:
raise NotImplementedError
class ModelRepository(ABC):
@abstractmethod
def get_model(self, model_id: str) -> Model:
raise NotImplementedError
, sklearn, Amazon S3 userid -> modelid, .
class SklearnModel(Model):
def __init__(self, model):
self.model = model
def predict(self, data: pd.DataFrame):
return self.model.predict(data)
class SQAlchemyModelInfoRepository(ModelInfoRepository):
def __init__(self, sqalchemy_session: Session):
self.session = sqalchemy_session
def get_model_id_by_user_id(user_id: str) -> str:
# implementation goes here, query a table in any Database
class S3ModelRepository(ModelRepository):
def __init__(self, s3_client):
self.s3_client = s3_client
def get_model(self, model_id: str) -> Model:
# load and deserialize pickle from S3, implementation goes here
:
def make_app(model_info_repository: ModelInfoRepository,
model_repsitory: ModelRepository) -> Flask:
app = Flask("multi-model-server")
@app.predict("/predict/<user_id>")
def predict(user_id):
model_id = model_info_repository.get_model_id_by_user_id(user_id)
model = model_repsitory.get_model(model_id)
data = pd.DataFrame(request.json())
predictions = model.predict(data)
return jsonify(predictions.tolist())
return app
, Flask ; sklearn TensorFlow S3 , Flask .
, , . , . cachetools:
from cachetools import Cache
class CachedModelRepository(ModelRepository):
def __init__(self, model_repository: ModelRepository, cache: Cache):
self.model_repository = model_repository
self.cache = cache
@abstractmethod
def get_model(self, model_id: str) -> Model:
if model_id not in self.cache:
self.cache[model_id] = self.model_repository.get_model(model_id)
return self.cache[model_id]
:
from cachetools import LRUCache
model_repository = CachedModelRepository(
S3ModelRepository(s3_client),
LRUCache(max_size=10)
)
- , . , , MLOps . . , . №4 Google: , - .