Enseñar a la IA a distribuir pasteles en las tiendas mediante el aprendizaje por refuerzo

Introducción



Una vez, mientras leía el libro "Aprendizaje por refuerzo: una introducción", pensé en complementar mis conocimientos teóricos con conocimientos prácticos, pero no tenía ganas de resolver el siguiente problema de equilibrar una barra, enseñar a un agente a jugar al ajedrez o inventar otra bicicleta.



Al mismo tiempo, el libro contenía un ejemplo interesante de optimización de la cola de clientes que, por un lado, no es demasiado complicado en términos de implementación / comprensión del proceso, y por otro lado, es bastante interesante y se puede implementar con cierto éxito en la vida real.



Habiendo cambiado ligeramente este ejemplo, llegué a la idea, que se discutirá más adelante.



Formulación del problema



Entonces, imagina la siguiente imagen:







Tenemos a nuestra disposición una panadería que produce 6 (condicionalmente) toneladas de tartas de frambuesa todos los días y distribuye estos productos a tres tiendas todos los días.



Sin embargo, cuál es la mejor manera de hacer esto para que haya la menor cantidad posible de productos caducados (siempre que la vida útil de los pasteles sea de tres días), si solo tenemos tres camiones con una capacidad de 1, 2 y 3 toneladas, respectivamente, en cada punto de venta es más rentable enviar solo un camión (porque están lo suficientemente alejados entre sí) y, además, solo una vez al día después de hornear las tartas, y además, no conocemos el poder adquisitivo en nuestras tiendas (ya que el negocio acaba de comenzar)?



Aceptemos que la estrategia de diseño FIFO funciona perfectamente en las tiendas, en las que los clientes toman solo los productos que se produjeron más tarde que los demás, pero si el pastel de frambuesa no se compró dentro de los tres días, el personal de la tienda se deshace de él.



No sabemos (condicionalmente) cuál será la demanda de pasteles en un día en particular en una tienda en particular, sin embargo, en nuestra simulación la configuramos de la siguiente manera para cada una de las tres tiendas: 3 ± 0.1, 1 ± 0.1, 2 ± 0.1.



Obviamente, la opción más rentable para nosotros es enviar tres toneladas a la primera tienda, una a la segunda y dos toneladas de tartas a la tercera, respectivamente.



Para resolver este problema, utilizamos un entorno de gimnasio personalizado, así como Deep Q Learning (implementación de Keras).



Entorno personalizado



Describiremos el estado del medio ambiente con tres números reales positivos: el resto de productos del día actual en cada una de las tres tiendas. Las acciones del agente son números del 0 al 5 inclusive, denotando los índices de permutación de los enteros 1, 2 y 3. Es claro que la acción más beneficiosa estará por debajo del 4º índice (3, 1, 2). Consideramos el problema como episódico, en un episodio de 30 días.



import gym
from gym import error, spaces, utils
from gym.utils import seeding

import itertools
import random
import time

class ShopsEnv(gym.Env):
  metadata = {'render.modes': ['human']}
  
  #  ,   
  #  
  def __init__(self):
    self.state = [0, 0, 0]  #  
    self.next_state = [0, 0, 0]  #  
    self.done = False  #   
    self.actions = list(itertools.permutations([1, 2, 3]))  #    
    self.reward = 0  #    
    self.time_tracker = 0  #   
    
    self.remembered_states = []  #     
    
    #   
    t = int( time.time() * 1000.0 )
    random.seed( ((t & 0xff000000) >> 24) +
                 ((t & 0x00ff0000) >>  8) +
                 ((t & 0x0000ff00) <<  8) +
                 ((t & 0x000000ff) << 24)   )
  
  #       ()  
  def step(self, action_num):
    #      
    if self.done:
        return [self.state, self.reward, self.done, self.next_state]
    else:
        #    
        self.state = self.next_state
        
        #  
        self.remembered_states.append(self.state) 
    
        #  
        self.time_tracker += 1
        
        #       
        action = self.actions[action_num]
        
        #  ,    ( )
        self.next_state = [x + y for x, y in zip(action, self.state)]
        
        #    
        self.next_state[0] -= (3 + random.uniform(-0.1, 0.1))
        self.next_state[1] -= (1 + random.uniform(-0.1, 0.1))
        self.next_state[2] -= (2 + random.uniform(-0.1, 0.1))
        
        #    
        if any([x < 0 for x in self.next_state]):
            self.reward = sum([x for x in self.next_state if x < 0])
        else:
            self.reward = 1
            
        #       
        #     
        #       (    ),
        #      
        if self.time_tracker >= 3:
            remembered_state = self.remembered_states.pop(0)
            self.next_state = [max(x - y, 0) for x, y in zip(self.next_state, remembered_state)]
        else:
            self.next_state = [max(x, 0) for x in self.next_state]
        
        
        #     30 
        self.done = self.time_tracker == 30
        
        #      
        return [self.state, self.reward, self.done, self.next_state]
  
  #   
  def reset(self):
    #      
    self.state = [0, 0, 0]
    self.next_state = [0, 0, 0]
    self.done = False
    self.reward = 0
    self.time_tracker = 0
    
    self.remembered_states = []
    
    t = int( time.time() * 1000.0 )
    random.seed( ((t & 0xff000000) >> 24) +
                 ((t & 0x00ff0000) >>  8) +
                 ((t & 0x0000ff00) <<  8) +
                 ((t & 0x000000ff) << 24)   )
    
    #   
    return self.state
  
  #     :
  #      
  def render(self, mode='human', close=False):
    print('-'*20)
    print('First shop')
    print('Pies:', self.state[0])

    print('Second shop')
    print('Pies:', self.state[1])

    print('Third shop')
    print('Pies:', self.state[2])
    print('-'*20)
    print('')


Importaciones importantes



import numpy as np #  
import pandas as pd #  
import gym #  
import gym_shops #    
from tqdm import tqdm #   

#  
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import clear_output
sns.set_color_codes()

#  
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
import random #   


Definiendo un agente



class DQLAgent(): 
    
    def __init__(self, env):
        #    
       
        self.state_size = 3 #    
        self.action_size = 6 #    
        
        #    replay()
        self.gamma = 0.99
        self.learning_rate = 0.01
        
        #    adaptiveEGreedy()
        self.epsilon = 0.99
        self.epsilon_decay = 0.99
        self.epsilon_min = 0.0001
        
        self.memory = deque(maxlen = 5000) #   5000  ,    -   
        
        #   (NN)
        self.model = self.build_model()
    
    #      Deep Q Learning
    def build_model(self):
        model = Sequential()
        model.add(Dense(10, input_dim = self.state_size, activation = 'sigmoid')) #   
        model.add(Dense(50, activation = 'sigmoid')) #  
        model.add(Dense(10, activation = 'sigmoid')) #  
        model.add(Dense(self.action_size, activation = 'sigmoid')) #  
        model.compile(loss = 'mse', optimizer = Adam(lr = self.learning_rate))
        return model
    
    #    
    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))
    
    #   
    def act(self, state):
        #     0  1  epsilon
        #     (exploration)
        if random.uniform(0,1) <= self.epsilon:
            return random.choice(range(6))
        else:
            #          
            act_values = self.model.predict(state)
            return np.argmax(act_values[0])
            
    
    #     
    def replay(self, batch_size):
        
        #   ,        
        if len(self.memory) < batch_size:
            return
        
        minibatch = random.sample(self.memory, batch_size) #  batch_size    
        #     
        for state, action, reward, next_state, done in minibatch:
            if done: #    -      
                target = reward
            else:
                #       
                target = reward + self.gamma * np.amax(self.model.predict(next_state)[0]) 
                # target = R(s,a) + gamma * max Q`(s`,a`)
                # target (max Q` value)     ,   s`  
            train_target = self.model.predict(state) # s --> NN --> Q(s,a) = train_target
            train_target[0][action] = target
            self.model.fit(state, train_target, verbose = 0)
    
    #    exploration rate,
    #   epsilon
    def adaptiveEGreedy(self):
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay




Entrena al agente



#  gym   
env = gym.make('shops-v0')
agent = DQLAgent(env)

#   
batch_size = 100
episodes = 1000

#  
progress_bar = tqdm(range(episodes), position=0, leave=True)
for e in progress_bar:
    #  
    state = env.reset()
    state = np.reshape(state, [1, 3])

    #    , id       
    time = 0
    taken_actions = []
    sum_rewards = 0


    #   
    while True:
        #  
        action = agent.act(state)

        #  
        taken_actions.append(action)

        #     
        next_state, reward, done, _ = env.step(action)
        next_state = np.reshape(next_state, [1, 3])

        #     
        sum_rewards += reward

        #   
        agent.remember(state, action, reward, next_state, done)

        #    
        state = next_state

        #  replay
        agent.replay(batch_size)

        #  epsilon
        agent.adaptiveEGreedy()

        #   
        time += 1

        #   
        progress_bar.set_postfix_str(s='mean reward: {}, time: {}, epsilon: {}'.format(round(sum_rewards/time, 3), time, round(agent.epsilon, 3)), refresh=True)

        #     
        if done:
            #       
            clear_output(wait=True)
            sns.distplot(taken_actions, color="y")
            plt.title('Episode: ' + str(e))
            plt.xlabel('Action number')
            plt.ylabel('Occurrence in %')
            plt.show()
            break






Probando el agente



import time
trained_model = agent  #     
state = env.reset()  #  
state = np.reshape(state, [1,3])

#        
time_t = 0
MAX_EPISOD_LENGTH = 1000  #   
taken_actions = []
mean_reward = 0

#   
progress_bar = tqdm(range(MAX_EPISOD_LENGTH), position=0, leave=True)
for time_t in progress_bar:
    #     
    action = trained_model.act(state)
    next_state, reward, done, _ = env.step(action)
    next_state = np.reshape(next_state, [1,3])
    state = next_state
    taken_actions.append(action)

    #   
    clear_output(wait=True)
    env.render()
    progress_bar.set_postfix_str(s='time: {}'.format(time_t), refresh=True)
    print('Reward:', round(env.reward, 3))
    time.sleep(0.5)

    mean_reward += env.reward

    if done:
        break

#    
sns.distplot(taken_actions, color='y')
plt.title('Test episode - mean reward: ' + str(round(mean_reward/(time_t+1), 3)))
plt.xlabel('Action number')
plt.ylabel('Occurrence in %')
plt.show()    






Total



Por lo tanto, el agente comprendió rápidamente cómo actuar de la manera más rentable.



En general, todavía hay mucho espacio para la experimentación: puede aumentar el número de tiendas, diversificar acciones o incluso cambiar los hiperparámetros del modelo de entrenamiento, y esto es solo el comienzo de la lista.



Fuentes






All Articles