Tareas en segundo plano con Faust, Parte II: Agentes y comandos

Tabla de contenido

  1. Parte I: Introducción

  2. Parte II: Agentes y comandos

¿Qué estamos haciendo aquí?

Bien, vamos con la segunda parte. Como se indicó anteriormente, en ella haremos lo siguiente:

  1. Escribiremos un pequeño cliente para AlphaVantage sobre aiohttp, con peticiones a los endpoints que necesitamos.

  2. Haremos un agente que recopilará datos sobre valores y metainformación sobre ellos.

, , , faust , , kafka, ( click), - , .

AlphaVantage

, aiohttp alphavantage.

alphavantage.py

Spoiler
import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    """Small asynchronous client for the AlphaVantage HTTP API.

    All requests are issued through the ``aiohttp`` session supplied by the
    caller and always carry ``datatype=json`` together with the API key.
    Every method except ``__init__`` is decorated with ``logger.catch``.
    """

    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        """Remember the session, the endpoint and the fixed query parameters."""
        self._session = session
        self._api_endpoint = api_endpoint
        self._query_params = {"datatype": "json", "apikey": api_key}

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Return a new dict holding the items of ``data`` under snake_case keys."""
        return {stringcase.snakecase(key): value for key, value in data.items()}

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        """Call the ``query/`` endpoint for ``function`` and return its payload.

        Extra keyword arguments are sent as query parameters; the client's
        fixed parameters are merged last. With ``to_json`` the decoded JSON is
        returned with snake_case keys, otherwise the raw response text.
        """
        url = urlparse.urljoin(self._api_endpoint, "query/")
        query = {"function": function, **kwargs, **self._query_params}

        async with self._session.get(url, params=query) as response:
            if not to_json:
                return await response.text()

            return self._format_fields(await response.json())

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        """Return the ``LISTING_STATUS`` rows for ``state`` as a list of dicts.

        The CSV response is parsed with pandas; each row gets snake_case keys
        and an additional ``_type`` entry set to ``"physical"``.
        """
        raw_csv = await self._construct_query(
            "LISTING_STATUS", state=state, to_json=False
        )
        frame = pd.read_csv(StringIO(raw_csv))

        securities = []
        for record in frame.to_dict("records"):
            formatted = self._format_fields(record)
            formatted["_type"] = "physical"
            securities.append(formatted)

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        """Return the ``OVERVIEW`` payload for ``symbol``."""
        overview = await self._construct_query("OVERVIEW", symbol=symbol)
        return overview

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        """Return the full ``TIME_SERIES_DAILY_ADJUSTED`` payload for ``symbol``."""
        history = await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )
        return history

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        """Return the ``GLOBAL_QUOTE`` payload for ``symbol``."""
        quote = await self._construct_query("GLOBAL_QUOTE", symbol=symbol)
        return quote

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        """Return the payload of the ``indicator`` function for ``symbol``.

        ``indicator_options`` are forwarded unchanged as query parameters.
        """
        indicator_data = await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )
        return indicator_data

:

  1. API AlphaVantage , construct_query http .

  2. snake_case .

  3. logger.catch .

P.S. alphavantage config.yml, HORTON_SERVICE_APIKEY. .

CRUD-

securities .

database/security.py

- , .

get_app()

app.py

Spoiler
import faust

from horton.config import KAFKA_BROKERS


def get_app():
    """Create the Faust application named ``horton`` on the configured brokers."""
    application = faust.App("horton", broker=KAFKA_BROKERS)
    return application

, , , , App-. settings , .

app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)


@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    """Agent bound to the ``collect_securities`` topic; the body is still empty."""

, faust- - . , ... , , internal -.

  1. kafka, , . , , :)

  2. internal, faust, , , , faust, : retention, retention policy (- delete, compact), - (partitions, , , faust).

  3. , , , . , (, - retention policy) .

    :

app = get_app()


@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    """Agent registered without an explicit topic; the body is still empty."""

, , :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)


@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    """Refresh the stored securities once for every event on the topic.

    The event payload is ignored; each event only triggers a download of the
    securities list, which is then upserted record by record, keyed by
    ``symbol`` and ``exchange``.

    Yields:
        ``True`` after the records of one event were stored, ``False`` when
        the client returned ``None`` instead of a list.
    """
    async with aiohttp.ClientSession() as session:
        # The client only keeps the session, endpoint and key, so a single
        # instance can serve every event instead of being rebuilt each time.
        client = AlphaVantageClient(session, API_KEY)

        async for _ in stream:
            logger.info("Start collect securities")

            securities = await client.get_securities()

            if securities is None:
                # get_securities is wrapped in logger.catch; with loguru's
                # default settings a failure is logged there and None is
                # returned. Iterating None would raise TypeError in the agent.
                logger.warning("No securities received, skipping update")
                yield False
                continue

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]},
                    security,
                    upsert=True,
                )

            yield True

, aiohttp . , , , - , ( , concurrency ).

, ( _, , , ) , (offset), , . , , (get_securities - active, . ) , , , , () .

!

> docker-compose up -d
...   ...
> faust -A horton.agents worker --without-web -l info

P.S. - faust , .

faust', ( ) info. :

Spoiler
┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... , ,  ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘

!!!

partition set. , , , - (8, topic_partitions - ), ( partitions) . 8 , , .

, :

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

P.S. @ , "collect_securities".

, 6 - , kafdrop localhost:9000

, , loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

, mongo ( Robo3T Studio3T) , :

, , .

- :)

, !

, , 1/3 , , , .

, , :

# Topic consumed by the collect_security_overview agent below.
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    # NOTE: `?` is a placeholder for the event type that is still to be
    # chosen at this point; it is not valid Python.
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    # One HTTP session is opened for the lifetime of the agent.
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            # Body intentionally left out in this draft.
            ...

security, (symbol) . faust Records - , .

records.py , :

import faust


class CollectSecurityOverview(faust.Record):
    """Message identifying one security by its symbol and exchange."""

    # Value of the security's "symbol" field.
    symbol: str
    # Value of the security's "exchange" field.
    exchange: str

, faust python, , - 3.6.

, :

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    """Fetch and store the overview of the security named in each event.

    For every ``CollectSecurityOverview`` event the overview is requested for
    ``event.symbol`` and written to the record matching the event's symbol
    and exchange.

    Yields:
        ``True`` after the overview was stored, ``False`` when the client
        returned ``None`` and nothing was written.
    """
    async with aiohttp.ClientSession() as session:
        # The client only keeps the session, endpoint and key, so a single
        # instance can serve every event instead of being rebuilt each time.
        client = AlphaVantageClient(session, API_KEY)

        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            security_overview = await client.get_security_overview(event.symbol)

            if security_overview is None:
                # get_security_overview is wrapped in logger.catch; with
                # loguru's default settings a failure is logged there and
                # None is returned, which must not be sent to the database.
                logger.warning(
                    "No overview received for security [{symbol}]",
                    symbol=event.symbol,
                )
                yield False
                continue

            await SecurityCRUD.update_one(
                {"symbol": event.symbol, "exchange": event.exchange},
                security_overview,
            )

            yield True

, - value_type. , , - .

, - collect_securitites :

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

. , .cast, , , :

  1. cast - , . .

  2. send - , . .

  3. ask - . .

, !

, - . , faust - click. faust -A

agents.py app.command, cast collect_securitites:

# Registered as a CLI command of the app; the docstring below is the
# description shown in the command list of `faust -A horton.agents --help`.
@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    # Trigger the collect_securities agent by casting an empty message to it.
    await collect_securities.cast()

, , :

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

, , faust :

> faust -A horton.agents start-collect-securities

?

, , , sink cron- .

! :)

El código de esta parte

PD: En la última parte me preguntaron sobre Faust y Confluent Kafka (qué características tiene Confluent). Confluent parece ser mucho más funcional, pero el punto es que Faust no tiene soporte completo del cliente para Confluent; esto se desprende de la descripción de las restricciones del cliente en la documentación.




All Articles