Escribir un servicio web en Python usando FastAPI

imagen



Lo sé, lo sé, probablemente estés pensando "¿qué, otra vez?"



Sí, en Habré ya han escrito sobre el marco FastAPI muchas veces . Pero propongo considerar esta herramienta con un poco más de detalle y escribir la API de tu propio mini Habr sin karma y ratings, pero con blackjack y con pruebas, autenticación, migraciones y trabajo asincrónico con la base de datos.

Esquema de base de datos y migraciones



En primer lugar, utilizando SQLAlchemy Expression Language , describiremos el esquema de la base de datos. Creemos un archivo models / users.py :



import sqlalchemy
from sqlalchemy.dialects.postgresql import UUID

metadata = sqlalchemy.MetaData()


users_table = sqlalchemy.Table(
    "users",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("email", sqlalchemy.String(40), unique=True, index=True),
    sqlalchemy.Column("name", sqlalchemy.String(100)),
    sqlalchemy.Column("hashed_password", sqlalchemy.String()),
    sqlalchemy.Column(
        "is_active",
        sqlalchemy.Boolean(),
        server_default=sqlalchemy.sql.expression.true(),
        nullable=False,
    ),
)


tokens_table = sqlalchemy.Table(
    "tokens",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column(
        "token",
        UUID(as_uuid=False),
        server_default=sqlalchemy.text("uuid_generate_v4()"),
        unique=True,
        nullable=False,
        index=True,
    ),
    sqlalchemy.Column("expires", sqlalchemy.DateTime()),
    sqlalchemy.Column("user_id", sqlalchemy.ForeignKey("users.id")),
)


Y el archivo models / posts.py :



import sqlalchemy

from .users import users_table

metadata = sqlalchemy.MetaData()


posts_table = sqlalchemy.Table(
    "posts",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("user_id", sqlalchemy.ForeignKey(users_table.c.id)),
    sqlalchemy.Column("created_at", sqlalchemy.DateTime()),
    sqlalchemy.Column("title", sqlalchemy.String(100)),
    sqlalchemy.Column("content", sqlalchemy.Text()),
)


Para automatizar las migraciones de la base de datos, instale alambic :



$ pip install alembic


Para inicializar Alembic, ejecute:



$ alembic init migrations


Este comando creará en el directorio actual el archivo alembic.ini y un directorio de migraciones que contiene:



  • directorio de versiones donde se almacenarán los archivos de migración
  • script env.py que se ejecuta cuando se llama alambic
  • un archivo script.py.mako que contiene la plantilla para nuevas migraciones.


Indicaremos la url de nuestra base de datos, para ello, en el archivo alembic.ini, agregue la línea:



sqlalchemy.url = postgresql://%(DB_USER)s:%(DB_PASS)s@%(DB_HOST)s:5432/%(DB_NAME)s


El formato % (variable_name) s nos permite establecer diferentes valores de variables según el entorno, anulándolos en el archivo env.py de esta manera:



from os import environ
from alembic import context
from app.models import posts, users

# Alembic Config   
#     alembic.ini
config = context.config

section = config.config_ini_section
config.set_section_option(section, "DB_USER", environ.get("DB_USER"))
config.set_section_option(section, "DB_PASS", environ.get("DB_PASS"))
config.set_section_option(section, "DB_NAME", environ.get("DB_NAME"))
config.set_section_option(section, "DB_HOST", environ.get("DB_HOST"))

fileConfig(config.config_file_name)

target_metadata = [users.metadata, posts.metadata]


Aquí tomamos los valores de DB_USER, DB_PASS, DB_NAME y DB_HOST de las variables de entorno. Además, el archivo env.py especifica los metadatos de nuestra base de datos en el atributo target_metadata , sin el cual Alembic no podrá determinar qué cambios deben realizarse en la base de datos.



Todo está listo y podemos generar migraciones y actualizar la base de datos:




$ alembic revision --autogenerate -m "Added required tables"
$ alembic upgrade head


Lanzamos la aplicación y conectamos la base de datos



Creemos un archivo main.py :



from fastapi import FastAPI

app = FastAPI()

@app.get("/")
def read_root():
    return {"Hello": "World"}


E inicie la aplicación ejecutando el comando:



$ uvicorn main:app --reload


Asegurémonos de que todo funcione como debería. Abra http://127.0.0.1:8000/ en el navegador y vea
{"Hello": "World"}


Para conectarnos a la base de datos utilizaremos el módulo de bases de datos , que nos permite ejecutar consultas de forma asincrónica.



Vamos a configurar el inicio y shutdhown acontecimientos de nuestro servicio, en la que se producirá la conexión y desconexión de la base de datos. Editemos el archivo main.py :



from os import environ

import databases

#      
DB_USER = environ.get("DB_USER", "user")
DB_PASSWORD = environ.get("DB_PASSWORD", "password")
DB_HOST = environ.get("DB_HOST", "localhost")
DB_NAME = "async-blogs"
SQLALCHEMY_DATABASE_URL = (
    f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}"
)
#   database,      
database = databases.Database(SQLALCHEMY_DATABASE_URL)


app = FastAPI()


@app.on_event("startup")
async def startup():
    #       
    await database.connect()


@app.on_event("shutdown")
async def shutdown():
    #       
    await database.disconnect()


@app.get("/")
async def read_root():
    #    ,      
    query = (
        select(
            [
                posts_table.c.id,
                posts_table.c.created_at,
                posts_table.c.title,
                posts_table.c.content,
                posts_table.c.user_id,
                users_table.c.name.label("user_name"),
            ]
        )
        .select_from(posts_table.join(users_table))
        .order_by(desc(posts_table.c.created_at))
    )
    return await database.fetch_all(query)


Abrimos http://127.0.0.1:8000/ y si vemos una lista vacía [] en la respuesta , entonces todo salió bien y podemos seguir adelante.



Validación de solicitudes y respuestas



Implementaremos la posibilidad de registro de usuario. Para hacer esto, necesitamos validar las solicitudes y respuestas HTTP. Para resolver este problema, usaremos la biblioteca pydantic :



pip install pydantic


Cree un archivo schemas / users.py y agregue un modelo que sea responsable de validar el cuerpo de la solicitud:



from pydantic import BaseModel, EmailStr

class UserCreate(BaseModel):
    """  sign-up  """
    email: EmailStr
    name: str
    password: str


Tenga en cuenta que los tipos de campo se definen mediante la anotación de tipo. Además de los tipos de datos integrados como int y str , pydantic ofrece una gran cantidad de tipos que proporcionan validación adicional. Por ejemplo, el tipo EmailStr verifica que el valor recibido sea un correo electrónico válido. Para utilizar el tipo EmailStr , debe instalar el módulo validador de correo electrónico :



pip install email-validator


El cuerpo de la respuesta debe contener sus propios campos específicos, por ejemplo, id y access_token , así que agreguemos modelos responsables de generar la respuesta al archivo schemas / users.py :



from typing import Optional
from pydantic import UUID4, BaseModel, EmailStr, Field, validator


class UserCreate(BaseModel):
    """  sign-up  """
    email: EmailStr
    name: str
    password: str


class UserBase(BaseModel):
    """       """
    id: int
    email: EmailStr
    name: str


class TokenBase(BaseModel):
    token: UUID4 = Field(..., alias="access_token")
    expires: datetime
    token_type: Optional[str] = "bearer"

    class Config:
        allow_population_by_field_name = True

    @validator("token")
    def hexlify_token(cls, value):
        """  UUID  hex  """
        return value.hex


class User(UserBase):
    """         """
    token: TokenBase = {}


Para cada campo del modelo, puede escribir un validador personalizado . Por ejemplo, hexlify_token convierte el valor UUID en una cadena hexadecimal. Vale la pena señalar que puede usar la clase Field cuando necesite anular el comportamiento predeterminado de un campo de modelo. Por ejemplo, token: UUID4 = Field (..., alias = "access_token") establece el alias de access_token para el campo del token . Para indicar que el campo es obligatorio, se pasa un valor especial - ... ( puntos suspensivos ) como primer parámetro .



Agregue el archivo utils / users.py , en el que crearemos los métodos necesarios para escribir un usuario en la base de datos:



import hashlib
import random
import string
from datetime import datetime, timedelta
from sqlalchemy import and_

from app.models.database import database
from app.models.users import tokens_table, users_table
from app.schemas import users as user_schema

def get_random_string(length=12):
    """   ,    """
    return "".join(random.choice(string.ascii_letters) for _ in range(length))


def hash_password(password: str, salt: str = None):
    """     """
    if salt is None:
        salt = get_random_string()
    enc = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 100_000)
    return enc.hex()


def validate_password(password: str, hashed_password: str):
    """ ,         """
    salt, hashed = hashed_password.split("$")
    return hash_password(password, salt) == hashed


async def get_user_by_email(email: str):
    """     """
    query = users_table.select().where(users_table.c.email == email)
    return await database.fetch_one(query)


async def get_user_by_token(token: str):
    """       """
    query = tokens_table.join(users_table).select().where(
        and_(
            tokens_table.c.token == token,
            tokens_table.c.expires > datetime.now()
        )
    )
    return await database.fetch_one(query)


async def create_user_token(user_id: int):
    """       user_id """
    query = (
        tokens_table.insert()
        .values(expires=datetime.now() + timedelta(weeks=2), user_id=user_id)
        .returning(tokens_table.c.token, tokens_table.c.expires)
    )

    return await database.fetch_one(query)


async def create_user(user: user_schema.UserCreate):
    """      """
    salt = get_random_string()
    hashed_password = hash_password(user.password, salt)
    query = users_table.insert().values(
        email=user.email, name=user.name, hashed_password=f"{salt}${hashed_password}"
    )
    user_id = await database.execute(query)
    token = await create_user_token(user_id)
    token_dict = {"token": token["token"], "expires": token["expires"]}

    return {**user.dict(), "id": user_id, "is_active": True, "token": token_dict}




Crear un archivo de routers / users.py y añadir una inscripción ruta, que especifica que en la solicitud que espera que la CreateUser modelo y devuelve el usuario modelo :

from fastapi import APIRouter
from app.schemas import users
from app.utils import users as users_utils


router = APIRouter()


@router.post("/sign-up", response_model=users.User)
async def create_user(user: users.UserCreate):
    db_user = await users_utils.get_user_by_email(email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return await users_utils.create_user(user=user)


Solo queda conectar las rutas desde el archivo routers / users.py . Para hacer esto, agregue las siguientes líneas a main.py :



from app.routers import users
app.include_router(users.router)


Autenticación y control de acceso



Ahora que tenemos usuarios en nuestra base de datos, estamos listos para configurar la autenticación de la aplicación. Agreguemos un punto final que toma un nombre de usuario y una contraseña y devuelve un token. Actualicemos el archivo routers / users.py agregando:



from fastapi import Depends
from fastapi.security import OAuth2PasswordRequestForm


@router.post("/auth", response_model=users.TokenBase)
async def auth(form_data: OAuth2PasswordRequestForm = Depends()):
    user = await users_utils.get_user_by_email(email=form_data.username)

    if not user:
        raise HTTPException(status_code=400, detail="Incorrect email or password")

    if not users_utils.validate_password(
        password=form_data.password, hashed_password=user["hashed_password"]
    ):
        raise HTTPException(status_code=400, detail="Incorrect email or password")

    return await users_utils.create_user_token(user_id=user["id"])


Al mismo tiempo, no necesitamos describir el modelo de solicitud nosotros mismos, Fastapi proporciona una clase de dependencia especial OAuth2PasswordRequestForm , que hace que la ruta espere dos campos de nombre de usuario y contraseña.



Para restringir el acceso a ciertas rutas para usuarios no autenticados, escribiremos un método de dependencia. Verificará que el token proporcionado pertenece al usuario activo y devolverá los detalles del usuario. Esto nos permitirá utilizar la información del usuario en todas las rutas que requieran autenticación. Creemos un archivo utils / dependecies.py :



from app.utils import users as users_utils
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer


oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth")


async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = await users_utils.get_user_by_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    if not user["is_active"]:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"
        )
    return user


Tenga en cuenta que una dependencia puede depender a su vez de otra dependencia. Por ejemplo, OAuth2PasswordBearer es una dependencia que deja claro a FastAPI que la ruta actual requiere autenticación.



Para comprobar que todo funciona como se esperaba, agregue la ruta / users / me , que devuelve los detalles del usuario actual. Agregue las líneas a routers / users.py :



from app.utils.dependencies import get_current_user


@router.get("/users/me", response_model=users.UserBase)
async def read_users_me(current_user: users.User = Depends(get_current_user)):
    return current_user


Ahora tenemos la ruta / users / me , a la que solo los usuarios autenticados tienen acceso.



Todo está listo para finalmente agregar la capacidad de que los usuarios creen y editen publicaciones:



utils / posts.py
from datetime import datetime

from app.models.database import database
from app.models.posts import posts_table
from app.models.users import users_table
from app.schemas import posts as post_schema
from sqlalchemy import desc, func, select


async def create_post(post: post_schema.PostModel, user):
    query = (
        posts_table.insert()
        .values(
            title=post.title,
            content=post.content,
            created_at=datetime.now(),
            user_id=user["id"],
        )
        .returning(
            posts_table.c.id,
            posts_table.c.title,
            posts_table.c.content,
            posts_table.c.created_at,
        )
    )
    post = await database.fetch_one(query)

    # Convert to dict and add user_name key to it
    post = dict(zip(post, post.values()))
    post["user_name"] = user["name"]
    return post


async def get_post(post_id: int):
    query = (
        select(
            [
                posts_table.c.id,
                posts_table.c.created_at,
                posts_table.c.title,
                posts_table.c.content,
                posts_table.c.user_id,
                users_table.c.name.label("user_name"),
            ]
        )
        .select_from(posts_table.join(users_table))
        .where(posts_table.c.id == post_id)
    )
    return await database.fetch_one(query)


async def get_posts(page: int):
    max_per_page = 10
    offset1 = (page - 1) * max_per_page
    query = (
        select(
            [
                posts_table.c.id,
                posts_table.c.created_at,
                posts_table.c.title,
                posts_table.c.content,
                posts_table.c.user_id,
                users_table.c.name.label("user_name"),
            ]
        )
        .select_from(posts_table.join(users_table))
        .order_by(desc(posts_table.c.created_at))
        .limit(max_per_page)
        .offset(offset1)
    )
    return await database.fetch_all(query)


async def get_posts_count():
    query = select([func.count()]).select_from(posts_table)
    return await database.fetch_val(query)


async def update_post(post_id: int, post: post_schema.PostModel):
    query = (
        posts_table.update()
        .where(posts_table.c.id == post_id)
        .values(title=post.title, content=post.content)
    )
    return await database.execute(query)





routers / posts.py
from app.schemas.posts import PostDetailsModel, PostModel
from app.schemas.users import User
from app.utils import posts as post_utils
from app.utils.dependencies import get_current_user
from fastapi import APIRouter, Depends, HTTPException, status

router = APIRouter()


@router.post("/posts", response_model=PostDetailsModel, status_code=201)
async def create_post(post: PostModel, current_user: User = Depends(get_current_user)):
    post = await post_utils.create_post(post, current_user)
    return post


@router.get("/posts")
async def get_posts(page: int = 1):
    total_cout = await post_utils.get_posts_count()
    posts = await post_utils.get_posts(page)
    return {"total_count": total_cout, "results": posts}


@router.get("/posts/{post_id}", response_model=PostDetailsModel)
async def get_post(post_id: int):
    return await post_utils.get_post(post_id)


@router.put("/posts/{post_id}", response_model=PostDetailsModel)
async def update_post(
    post_id: int, post_data: PostModel, current_user=Depends(get_current_user)
):
    post = await post_utils.get_post(post_id)
    if post["user_id"] != current_user["id"]:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have access to modify this post",
        )

    await post_utils.update_post(post_id=post_id, post=post_data)
    return await post_utils.get_post(post_id)





Conectemos nuevas rutas agregando a main.py

from app.routers import posts
app.include_router(posts.router)


Pruebas



Escribiremos pruebas en pytest :



$ pip install pytest


Para probar puntos finales, FastAPI proporciona una herramienta especial TestClient .



Escribamos una prueba de punto final que no requiera una conexión a la base de datos:



from app.main import app
from fastapi.testclient import TestClient

client = TestClient(app)


def test_health_check():
    response = client.get("/")
    assert response.status_code == 200
    assert response.json() == {"Hello": "World"}


Como ves, todo es bastante sencillo. Debe inicializar TestClient y usarlo para probar las solicitudes HTTP.



Para probar el resto de los puntos finales, debe crear una base de datos de prueba. Editemos el archivo main.py , agregando la configuración base de prueba:



from os import environ

import databases

DB_USER = environ.get("DB_USER", "user")
DB_PASSWORD = environ.get("DB_PASSWORD", "password")
DB_HOST = environ.get("DB_HOST", "localhost")

TESTING = environ.get("TESTING")

if TESTING:
    #      
    DB_NAME = "async-blogs-temp-for-test"
    TEST_SQLALCHEMY_DATABASE_URL = (
        f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}"
    )
    database = databases.Database(TEST_SQLALCHEMY_DATABASE_URL)
else:
    DB_NAME = "async-blogs"
    SQLALCHEMY_DATABASE_URL = (
        f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}"
    )
    database = databases.Database(SQLALCHEMY_DATABASE_URL)


Todavía estamos usando la base de datos "async-blogs" para nuestra aplicación. Pero si se establece la variable de entorno TESTING, se utiliza la base de datos "async-blogs-temp-for-test" .



Para hacer que la base de datos "async-blogs-temp-for-test" se cree automáticamente cuando las pruebas se ejecutan y se eliminan después de su ejecución, cree un accesorio en el archivo tests / conftest.py :



import os

import pytest

#  `os.environ`,    
os.environ['TESTING'] = 'True'

from alembic import command
from alembic.config import Config
from app.models import database
from sqlalchemy_utils import create_database, drop_database


@pytest.fixture(scope="module")
def temp_db():
    create_database(database.TEST_SQLALCHEMY_DATABASE_URL) #  
    base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
    alembic_cfg = Config(os.path.join(base_dir, "alembic.ini")) #   alembic 
    command.upgrade(alembic_cfg, "head") #  

    try:
        yield database.TEST_SQLALCHEMY_DATABASE_URL
    finally:
        drop_database(database.TEST_SQLALCHEMY_DATABASE_URL) #  


Para crear y eliminar la base de datos, usaremos la biblioteca sqlalchemy_utils .



Al usar el accesorio temp_db en las pruebas, podemos probar todos los puntos finales de nuestra aplicación:



def test_sign_up(temp_db):
    request_data = {
        "email": "vader@deathstar.com",
        "name": "Darth Vader",
        "password": "rainbow"
    }
    with TestClient(app) as client:
        response = client.post("/sign-up", json=request_data)
    assert response.status_code == 200
    assert response.json()["id"] == 1
    assert response.json()["email"] == "vader@deathstar.com"
    assert response.json()["name"] == "Darth"
    assert response.json()["token"]["expires"] is not None
    assert response.json()["token"]["access_token"] is not None


tests / test_posts.py
import asyncio

from app.main import app
from app.schemas.users import UserCreate
from app.utils.users import create_user, create_user_token
from fastapi.testclient import TestClient


def test_create_post(temp_db):
    user = UserCreate(
        email="vader@deathstar.com",
        name="Darth",
        password="rainbow"
    )
    request_data = {
      "title": "42",
      "content": "Don't panic!"
    }
    with TestClient(app) as client:
        # Create user and use his token to add new post
        loop = asyncio.get_event_loop()
        user_db = loop.run_until_complete(create_user(user))
        response = client.post(
            "/posts",
            json=request_data,
            headers={"Authorization": f"Bearer {user_db['token']['token']}"}
        )
    assert response.status_code == 201
    assert response.json()["id"] == 1
    assert response.json()["title"] == "42"
    assert response.json()["content"] == "Don't panic!"


def test_create_post_forbidden_without_token(temp_db):
    request_data = {
      "title": "42",
      "content": "Don't panic!"
    }
    with TestClient(app) as client:
        response = client.post("/posts", json=request_data)
    assert response.status_code == 401


def test_posts_list(temp_db):
    with TestClient(app) as client:
        response = client.get("/posts")
    assert response.status_code == 200
    assert response.json()["total_count"] == 1
    assert response.json()["results"][0]["id"] == 1
    assert response.json()["results"][0]["title"] == "42"
    assert response.json()["results"][0]["content"] == "Don't panic!"


def test_post_detail(temp_db):
    post_id = 1
    with TestClient(app) as client:
        response = client.get(f"/posts/{post_id}")
    assert response.status_code == 200
    assert response.json()["id"] == 1
    assert response.json()["title"] == "42"
    assert response.json()["content"] == "Don't panic!"


def test_update_post(temp_db):
    post_id = 1
    request_data = {
      "title": "42",
      "content": "Life? Don't talk to me about life."
    }
    with TestClient(app) as client:
        # Create user token to add new post
        loop = asyncio.get_event_loop()
        token = loop.run_until_complete(create_user_token(user_id=1))
        response = client.put(
            f"/posts/{post_id}",
            json=request_data,
            headers={"Authorization": f"Bearer {token['token']}"}
        )
    assert response.status_code == 200
    assert response.json()["id"] == 1
    assert response.json()["title"] == "42"
    assert response.json()["content"] == "Life? Don't talk to me about life."


def test_update_post_forbidden_without_token(temp_db):
    post_id = 1
    request_data = {
      "title": "42",
      "content": "Life? Don't talk to me about life."
    }
    with TestClient(app) as client:
        response = client.put(f"/posts/{post_id}", json=request_data)
    assert response.status_code == 401




tests / test_users.py
import asyncio
import pytest

from app.main import app
from app.schemas.users import UserCreate
from app.utils.users import create_user, create_user_token
from fastapi.testclient import TestClient


def test_sign_up(temp_db):
    request_data = {
        "email": "vader@deathstar.com",
        "name": "Darth",
        "password": "rainbow"
    }
    with TestClient(app) as client:
        response = client.post("/sign-up", json=request_data)
    assert response.status_code == 200
    assert response.json()["id"] == 1
    assert response.json()["email"] == "vader@deathstar.com"
    assert response.json()["name"] == "Darth"
    assert response.json()["token"]["expires"] is not None
    assert response.json()["token"]["token"] is not None


def test_login(temp_db):
    request_data = {"username": "vader@deathstar.com", "password": "rainbow"}
    with TestClient(app) as client:
        response = client.post("/auth", data=request_data)
    assert response.status_code == 200
    assert response.json()["token_type"] == "bearer"
    assert response.json()["expires"] is not None
    assert response.json()["access_token"] is not None


def test_login_with_invalid_password(temp_db):
    request_data = {"username": "vader@deathstar.com", "password": "unicorn"}
    with TestClient(app) as client:
        response = client.post("/auth", data=request_data)
    assert response.status_code == 400
    assert response.json()["detail"] == "Incorrect email or password"


def test_user_detail(temp_db):
    with TestClient(app) as client:
        # Create user token to see user info
        loop = asyncio.get_event_loop()
        token = loop.run_until_complete(create_user_token(user_id=1))
        response = client.get(
            "/users/me",
            headers={"Authorization": f"Bearer {token['token']}"}
        )
    assert response.status_code == 200
    assert response.json()["id"] == 1
    assert response.json()["email"] == "vader@deathstar.com"
    assert response.json()["name"] == "Darth"


def test_user_detail_forbidden_without_token(temp_db):
    with TestClient(app) as client:
        response = client.get("/users/me")
    assert response.status_code == 401


@pytest.mark.freeze_time("2015-10-21")
def test_user_detail_forbidden_with_expired_token(temp_db, freezer):
    user = UserCreate(
        email="sidious@deathstar.com",
        name="Palpatine",
        password="unicorn"
    )
    with TestClient(app) as client:
        # Create user and use expired token
        loop = asyncio.get_event_loop()
        user_db = loop.run_until_complete(create_user(user))
        freezer.move_to("'2015-11-10'")
        response = client.get(
            "/users/me",
            headers={"Authorization": f"Bearer {user_db['token']['token']}"}
        )
    assert response.status_code == 401




Fuentes de PS



Eso es todo, el repositorio de origen de la publicación se puede ver en GitHub .



All Articles