aio api crawler

Hola. Comencé a trabajar en una biblioteca para extraer datos de diferentes api json. También se puede utilizar para probar api.



Las API se describen como clases, por ejemplo



class Categories(JsonEndpoint):
    url = "http://127.0.0.1:8888/categories"
    params = {"page": range(100), "language": "en"}
    headers = {"User-Agent": get_user_agent}
    results_key = "*.slug"

categories = Categories()


class Posts(JsonEndpoint):
    url = "http://127.0.0.1:8888/categories/{category}/posts"
    params = {"page": range(100), "language": "en"}
    url_params = {"category": categories.iter_results()}
    results_key = "posts"

    async def comments(self, post):
        comments = Comments(
            self.session,
            url_params={"category": post.url.params["category"], "id": post["id"]},
        )
        return [comment async for comment in comments]

posts = Posts()


Params y url_params pueden contener funciones (como aquí get_user_agent - devuelve un useragent aleatorio), rango, iteradores, iteradores en espera y asincrónicos (para que pueda vincularlos).



Los encabezados de los parámetros y las cookies también pueden contener funciones y esperables.



La api de categoría en el ejemplo anterior devuelve una matriz de objetos que tienen un slug, el iterador devolverá exactamente esos. Al deslizar este iterador en los url_params de las publicaciones, el iterador recorrerá de forma recursiva todas las categorías y todas las páginas de cada una. Se abortará cuando encuentre un error 404 o algún otro error y pase a la siguiente categoría.



Y los repositorios tienen un ejemplo de servidor aiohttp para estas clases para que se pueda probar todo.



Además de los parámetros de obtención, puede pasarlos como datos o json y establecer otro método.



results_key tiene puntos e intentará extraer claves de los resultados. Por ejemplo, "comentarios. *. Texto" devolverá el texto de cada comentario de la matriz dentro de los comentarios.



Los resultados se envuelven en un contenedor que tiene propiedades de URL y parámetros. url se deriva de una cadena que también tiene params. De esta forma, puede averiguar qué parámetros se utilizaron para obtener este resultado, como se demuestra en el método de comentarios.



También hay una clase Sink base para manejar los resultados. Por ejemplo, doblándolos en mq o en una base de datos. Funciona en tareas independientes y recibe datos a través de asyncio.Queue.



class LoggingSink(Sink):
    def transform(self, obj):
        return repr(obj)

    async def init(self):
        from loguru import logger

        self.logger = logger

    async def process(self, obj):
        self.logger.info(obj)
        return True

sink = LoggingSink(num_tasks=1)


Un ejemplo del fregadero más simple. El método de transformación nos permite realizar algunas manipulaciones con el objeto y devolver None si no nos conviene. aquellos. en temas también puedes hacer validación.



Sink es un administrador de contexto asincrónico que, cuando se sale, en teoría esperará hasta que se procesen todos los objetos de la cola y luego cancelará sus tareas.



Y finalmente, para unirlo todo, hice una clase de trabajador. Acepta un punto final y varios sumideros. Por ejemplo,



worker = Worker(endpoint=posts, sinks=[loggingsink, mongosink])
worker.run()


run ejecutará asyncio.run_until_complete para la canalización del trabajador. También tiene un método de transformación.



También hay una clase WorkerGroup que le permite crear varios trabajadores a la vez y hacer un asyncio.gather para ellos.



El código contiene un ejemplo de un servidor que genera datos a través de falsificadores y controladores para sus puntos finales. Creo que esto es lo más obvio.



Todo esto se encuentra en una etapa temprana de desarrollo y hasta ahora he cambiado a menudo api. Pero ahora parece haber llegado a cómo debería verse. Solo fusionaré solicitudes y comentarios en mi código.



All Articles