¡Un taxi no tripulado conduce patos de goma amarillos por la ciudad! Módulo de verificación de problemas para la plataforma Gym-Duckietown

Para 2040, la mayoría de las ciudades importantes del mundo conducirán automóviles sin conductor, dicen los analistas . Pero para relajarnos en la carretera dentro de 20 años, debemos hacer un buen trabajo en los algoritmos de conducción autónoma. Para hacer esto, MIT desarrolló la plataforma Duckietown , que le permite hacer esto con un costo mínimo. En Duckietown, robots móviles de bajo costo transportan patos de goma amarillos en un modelo reducido de la ciudad. Sobre la base de esta plataforma, se realizan las Olimpiadas de conducción de IA y se lanzan cursos en universidades sobre la aplicación de tecnologías de inteligencia artificial en la gestión de vehículos no tripulados.



En este artículo hablaré sobre el proyecto de mi curso, en el que trabajé junto con el Laboratorio de Algoritmos de Robots Móviles Investigación de JetBrains : Acerca del Verificador de problemas que escribí para el emulador Gym-Duckietown . Hablaremos sobre el sistema de prueba y la integración de este sistema con plataformas educativas en línea que utilizan la tecnología External Grader, por ejemplo, con la plataforma Stepik.org .









Sobre el Autor



Mi nombre es Daniil Plushenko y soy estudiante de primer (segundo) año del programa de maestría " Programación y análisis de datos " en el HSE de San Petersburgo. En 2019, completé mi licenciatura en Matemáticas Aplicadas e Informática en la misma universidad.



Plataforma Duckietown



Duckietown es un proyecto de investigación de vehículos autónomos . Los organizadores del proyecto han creado una plataforma que ayuda a implementar un nuevo enfoque de aprendizaje en el campo de la inteligencia artificial y la robótica. Todo comenzó como un curso en el MIT en 2016, pero gradualmente se extendió por todo el mundo y en diferentes niveles de educación: desde la escuela secundaria hasta los programas de maestría.



La plataforma Duckietown tiene dos partes. Primero, es un modelo reducido de un entorno de transporte urbano con carreteras, edificios, señales de tráfico y obstáculos. En segundo lugar, es el transporte. Pequeños robots móviles (Duckiebots) que ejecutan una Raspberry Pi reciben información sobre el mundo que los rodea a través de una cámara y transportan a los residentes de la ciudad, patos de goma amarillos, por las carreteras.







He estado trabajando con el emulador Duckietown. Se llamaGym-Duckietown , y es un proyecto de código abierto escrito en Python. El emulador coloca tu bot dentro de la ciudad, cambia su posición según el algoritmo que estés usando (o en el botón que presionaste), vuelve a dibujar la imagen y escribe la posición actual del bot en los registros.  



Si está interesado en intentarlo, le recomiendo clonar el repositorio usted mismo y ejecutar manual_control.py : de esta manera, el bot puede controlarse mediante las flechas del teclado.



Captura de pantalla del emulador



El emulador se puede utilizar como entorno para ejecutar tareas. Establezcamos el siguiente problema: en un mapa determinado, que consta de un carril, debe conducir un metro en línea recta.







Para resolver el problema, puede utilizar el siguiente algoritmo:



for _ in range(25):
    env.step([1, 0])
    env.render()


Esta variable envalmacena el estado del medio ambiente.

El método de pasos toma como entrada action: una lista de dos elementos que describen la acción del bot. El primer elemento establece la velocidad, el segundo, el ángulo de rotación. El método rendervuelve a dibujar la imagen, teniendo en cuenta la nueva posición del bot. El número de pasos y la velocidad se eligieron empíricamente: estos son los valores necesarios para que el bot viaje exactamente un metro en línea recta.



Si desea utilizar este fragmento en manual_control.py, péguelo aquí . El código hasta este punto está ocupado cargando el entorno. Para simplificar, puede reutilizarlo y luego agregar la solución anterior al problema.



Sistema de prueba



Me gustaría poder verificar tales tareas automáticamente: tomar la implementación del algoritmo de control del bot, simular el viaje e informar si el algoritmo propuesto realiza correctamente la tarea. Un sistema de pruebas de este tipo permitiría utilizar el entorno durante la preparación de las competiciones de control autónomo del transporte, así como con fines educativos: emitir un conjunto de problemas a los estudiantes y comprobar sus soluciones de forma automática. Estuve involucrado en su desarrollo mientras trabajaba en un proyecto de curso y a continuación les contaré lo que obtuve.



La secuencia de pasos al verificar la solución se ve así:







Puede agregar tareas al sistema de prueba usted mismo o consultar las que hice yo. Cada tarea tiene un generador de condiciones, una clase que genera un estado de entorno. Contiene el nombre del mapa y la posición inicial del bot dentro de la celda inicial. También se establecen las coordenadas del objetivo: en este caso, es un punto a un metro de la posición inicial.



class Ride1MTaskGenerator(TaskGenerator):
    def __init__(self, args):
        super().__init__(args)

    def generate_task(self):
        env_loader = CVTaskEnv if self.args.is_cv_task else TrackingDuckietownEnv
        env = env_loader(
            map_name="straight_road",
            position_on_initial_road_tile=PositionOnInitialRoadTile(
                x_coefficient=0.5,
                z_coefficient=0.5,
                angle=0,
            ),
        )
        self.generated_task['target_coordinates'] = [[env.road_tile_size * 0.5 + 1, 0, env.road_tile_size * 0.5]]
        self.generated_task['env'] = env
        env.render()
        return self.generated_task


Aquí TrackingDuckietownEnvy CVTaskEnvhay clases contenedoras que se utilizan para almacenar información sobre el viaje para un análisis más detallado. 



class TrackingDuckietownEnv:
    def __init__(self, **kwargs):
        self.__wrapped = DuckietownEnv(**kwargs)
    def step(self, action):
        obs, reward, done, misc = self.__wrapped.step(action)
        message = misc['Simulator']['msg']
        if 'invalid pose' in message.lower():
            raise InvalidPoseException(message)
        for t in self.trackers:
            t.track(self)
        return obs, reward, done, misc


Los rastreadores recopilan información sobre el estado actual, como la posición del bot.



CVTaskEnvse usa si se requiere una solución usando solo información de la cámara ("visión por computadora"), y no las funciones del emulador: por ejemplo, si necesita saber qué tan lejos está el bot del centro de la tira o dónde está el objeto visible más cercano. CVTaskEnvLlamar a las funciones del emulador puede simplificar la solución del problema y la clase limita la invocación de los métodos del emulador. Se utiliza cuando se muestra la bandera is_cv_task



class CVTaskEnv:
    def __init__(self, **kwargs):
        self.__wrapped = TrackingDuckietownEnv(**kwargs)

    def __getattr__(self, item):
        if item in self.__wrapped.overriden_methods:
            return self.__wrapped.__getattribute__(item)
        ALLOWED_FOR_CV_TASKS = [
            'render', '_CVTaskEnv__wrapped', '_TrackingDuckietownEnv__wrapped',
            'road_tile_size', 'trip_statistics'
        ]
        if item in ALLOWED_FOR_CV_TASKS:
            return self.__wrapped.__getattr__(item)
        else:
            raise AttributeError(item + " call is not allowed in CV tasks")


Una vez que se completa la ejecución de la decisión, siempre que no haya sido interrumpida por un tiempo de espera, la información del viaje se pasa a través de una secuencia de comprobadores. Si todas las fichas han funcionado correctamente, el problema se considera resuelto correctamente. De lo contrario, se muestra un veredicto explicativo, por ejemplo, un robot se estrelló, se salió de la carretera, etc.



Aquí está una de las fichas estándar. Comprueba que el bot ha vuelto al punto de partida al final del viaje. Esto puede ser útil si, por ejemplo, en una tarea necesita llegar a cierto punto y luego regresar.



class SameInitialAndFinalCoordinatesChecker(Checker):
    def __init__(self, maximum_deviation=0.1, **kwargs):
        super().__init__(**kwargs)
        self.maximum_deviation = maximum_deviation

    def check(self, generated_task, trackers, **kwargs):
        trip_statistics = next(x for x in trackers if isinstance(x, TripStatistics))
        trip_data = trip_statistics.trip_data
        if len(trip_data) == 0:
            return True
        initial_coordinates = trip_data[0].position.coordinates
        final_coordinates = trip_data[-1].position.coordinates
        return np.linalg.norm(initial_coordinates - final_coordinates) < self.maximum_deviation


Dicho sistema de prueba se puede usar en modo manual, es decir, iniciar manualmente la prueba y luego estudiar visualmente el veredicto. Si quisiéramos, por ejemplo, lanzar un curso en línea sobre transporte autónomo en Stepik.org , necesitaríamos la integración con la plataforma. Esto se discutirá en la siguiente parte del artículo.



Integración con plataformas online



Para las tareas de prueba, a menudo se utiliza la tecnología External Grader, que fue desarrollada por la plataforma edX .



Cuando se usa el Calificador externo, la plataforma educativa no verifica las tareas por sí sola, sino que genera una cola de paquetes que se envían a otro dispositivo. La funcionalidad de conexión de cola se implementa en el proyecto xqueue-watcher . El observador de Xqueue recupera los paquetes y luego el validador los prueba (que generalmente hace más cosas no triviales que las comparaciones de texto / número). Después de eso, el veredicto de verificación se envía de vuelta al lado de la plataforma educativa.



Consideremos con más detalle el momento de conectarse a la cola. Después de que la plataforma educativa proporcione los datos de conexión, deberán agregarse aarchivos de configuración y, en el método de calificación , implementar el lanzamiento de verificación directamente. Puede encontrar instrucciones más detalladas aquí y aquí .



El observador de Xqueue llama al punto final get_submission , que recuperará el paquete de la cola si es posible. Después de eso, se va a hacer la prueba. El xqueue-watcher luego llama a put_result para devolver el veredicto.



Puede iniciar xqueue-watcher así:



make requirements && python -m xqueue_watcher -d conf.d/


Digamos que queremos utilizar la tecnología External Grader, pero no queremos ejecutar el curso en una plataforma en línea. Xqueue-watcher se implementa asumiendo que hay algún almacenamiento de archivos donde se cargan archivos con soluciones (las plataformas tienen dicho almacenamiento). Podemos modificar Xqueue para que el almacenamiento de archivos ya no sea necesario, y este tipo de sistemas se puedan ejecutar, en general, incluso en nuestro portátil.



Primero debe aprender a mantener la cola de paquetes. La funcionalidad de cola la proporciona el proyecto xqueue .





Imagen tomada de documentación .



Puedes ejecutarlo así:



apt-get install libaio1 libaio-dev
apt-get install libmysqlclient-dev
pip3 install -r requirements.txt
python3 manage.py migrate
python3 manage.py runserver $xqueue_address


Es posible que deba crear un archivo ~ / edx / edx.log



De forma predeterminada, xqueue proporciona a xqueue-watcher no el contenido de los paquetes, es decir, archivos con la solución al problema, sino enlaces a estos archivos en el almacenamiento de archivos. Para ser independiente del almacenamiento de archivos, puede hacer que los archivos se envíen y almacenarlos en la misma máquina desde la que se ejecuta xqueue-watcher. 



Así es como se debe modificar el código fuente para lograr esto:



La implementación del método _upload en lms_interface.py se reemplaza con este:



def _upload(file_to_upload, path, name):
    '''
    Upload file using the provided keyname.
    Returns:
        URL to access uploaded file
    '''
    full_path = os.path.join(path, name)
    return default_storage.save(full_path, file_to_upload)


Si no se conectó ningún almacenamiento de archivos, este método guardará el archivo con la solución en la ruta $ queue_name / $ file_to_upload_hash.



En la implementación de get_sumbission en el archivo ext_interface.py, en lugar de esta línea, escriba:



xqueue_files = json.loads(submission.urls)
for xqueue_file in xqueue_files.keys():
    with open(xqueue_files[xqueue_file], 'r') as f:
        xqueue_files[xqueue_file] = f.readlines()


No transfiramos enlaces (rutas) a archivos, sino su contenido.



Cada solución se ejecuta en un contenedor docker "único" con recursos limitados, es decir, se crea un contenedor separado para la ejecución de cada solución, que se elimina una vez finalizada la prueba. Para crear dichos contenedores y ejecutar comandos en ellos, se usa portainer-api (de hecho, como un contenedor sobre la API de Docker).



Salir



En este artículo hablé sobre cómo se crearon el sistema de pruebas y las tareas para el transporte autónomo, así como la integración de este sistema con plataformas educativas online que utilizan la tecnología External Grader. Espero que pronto se lance un curso que utilice este sistema y que la parte sobre integración con plataformas en línea sea útil para quienes deseen crear su propio curso en línea o fuera de línea.



All Articles