Zarządzanie tożsamością i dostępem użytkownika, czyli o co chodzi z IDP?
Czym jest tożsamość użytkownika? Z czego wynika potrzeba zarządzania dostępem w firmie? Jak działa tzw. IDP? Odpowiedź na te pytania znajdziesz w artykule.
Hej, dzisiaj jako Innokrea kontynuujemy temat tworzenia REST API z użyciem frameworka FastAPI, a także takich technologii jak MongoDB czy Docker. W ostatniej części udało nam się stworzyć podstawowy podział folderów, zaplanować architekturę, bazę danych i skonteneryzować niezbędne aplikacje. Dzisiaj spróbujemy wypełnić nasz projekt kodem i przetestować czy dodane funkcjonalności działają. Zachęcamy także do przeczytania poprzedniego artykułu tej serii.
Przypomnijmy sobie podział folderów odzwierciedlający częściowo naszą zaplanowaną warstwową architekturę. Każdy z folderów zawiera plik __init__.py, aby oznaczyć dany folder jako moduł Python. Nasza aplikacja będzie miała za zadanie obsługę podstawowych funkcji logowania, rejestracji użytkowników i przydzielania im tokenów z użyciem JWT.
Rysunek 1 – Struktura folderów
Zaczniemy od stworzenia klasy modelowej User odzwierciedlającej strukturę rekordu w bazie danych. W tym celu skorzystamy z biblioteki pydantic i klasy BaseModel. Utwórzmy w module models plik models.py:
from pydantic import BaseModel
from typing import Optional
from bson import ObjectId
class User(BaseModel):
_id: Optional[ObjectId] = None
email:str
role:str
password_hash:str
Powyższy model musi być zgodny z tym zainicjalizowanym w poprzednim artykule w pliku inicjalizacji bazy danych db-init.js. Spróbujmy teraz stworzyć połączenie do bazy danych. W module database stwórzmy plik connector.py i wypełnijmy go poniższą treścią.
import os
from pymongo import MongoClient
from pymongo.database import Database
from typing import Union
class Connector:
_client: Union[MongoClient, None] = None
hostname : Union[str, None] = os.getenv('DB_HOSTNAME')
username : Union[str, None] = os.getenv('DB_USERNAME')
password : Union[str, None] = os.getenv('DB_PASSWORD')
db_name: Union[str, None] = os.getenv('DB_NAME')
port: int = int(os.getenv('DB_PORT', '999999'))
@classmethod
def get_db_client(cls) -> MongoClient:
if not cls._client:
cls._client = MongoClient(host=cls.hostname,port=cls.port,username=cls.username,password=cls.password)
return cls._client
@classmethod
def get_db(cls) -> Database:
return cls.get_db_client().get_database(cls.db_name)
Należy pamiętać o ustawieniu wykorzystywanych tutaj zmiennych w pliku docker-compose-dev.yml.
Teraz w folderze repositories utwórzmy plik user_repository.py korzystający z klasy Database z biblioteki pymongo.
from pymongo.collection import Collection
from pymongo.database import Database
from pymongo.results import InsertOneResult
from app.models.models import User
from typing import Any, Dict, Union
class UserRepository:
def __init__(self, database: Database) -> None:
self.database: Database = database
self.users_collection: Collection = self.database['users']
def add_user(self, user: User) -> User:
user_dict: Dict[str, Any] = user.model_dump()
insertion_result: InsertOneResult = self.users_collection.insert_one(user_dict)
user._id = insertion_result.inserted_id
return user
def find_user_by_email(self, email: str) -> Union[User, None]:
user_data: Union[Dict[str, Any], None] = self.users_collection.find_one({"email": email})
if not user_data:
return None
return User(**user_data)
W klasie definiujemy konstruktor przyjmujący database oraz dwie metody – jedną do tworzenia użytkownika, a drugą do pozyskania obiektu użytkownika z bazy danych mognogdb.
Zbudujmy teraz warstwę logiki biznesowej dla użytkownika. W tym celu zaczniemy od mechanizmu wyjątków w FastAPI i w przypadku niepowodzenia, będziemy podnosić zdefiniowany przez nas wyjątek, następnie łapać go z użyciem handlera i zwracać odpowiedź HTTP o błędzie do użytkownika w postaci JSON. W module exceptions utwórzmy plik exceptions.py:
from fastapi import status, HTTPException
from fastapi.responses import JSONResponse
from fastapi import Request
# definitions
class DuplicateUserException(HTTPException):
def __init__(self, status_code: int = status.HTTP_400_BAD_REQUEST, detail: str = "This e-mail is not acceptable"):
super().__init__(status_code=status_code, detail=detail)
class InvalidEmailFormatException(HTTPException):
def __init__(self, status_code: int = status.HTTP_400_BAD_REQUEST, detail: str = "This e-mail is not acceptable"):
super().__init__(status_code=status_code, detail=detail)
class InvalidCredentialsException(HTTPException):
def __init__(self, status_code: int = status.HTTP_401_UNAUTHORIZED, detail: str = "Incorrect e-mail or password"):
super().__init__(status_code=status_code, detail=detail)
# handlers
async def handle_duplicate_user_exception(request: Request, exc: DuplicateUserException):
return JSONResponse(
status_code=exc.status_code,
content={'detail': exc.detail}
)
async def handle_invalid_email_format_exception(request: Request, exc: InvalidEmailFormatException):
return JSONResponse(
status_code=exc.status_code,
content={'detail': exc.detail}
)
async def handle_invalid_credentials_exception(request: Request, exc: InvalidCredentialsException):
return JSONResponse(
status_code=exc.status_code,
content={'detail': exc.detail}
)
Potrzebujemy także stworzyć tzw. schemas – czyli klasy modelowe, dla naszych żądań i odpowiedzi. W pliku schemas.py w module schemas definujemy następujące klasy z użyciem znanej nam biblioteki pydantic:
from pydantic import BaseModel
class NewUserRequest(BaseModel):
email: str
password: str
class UserLoginRequest(BaseModel):
email: str
password: str
class NewUserResponse(BaseModel):
email: str
role: str
class UserLoginResponse(BaseModel):
access_token: str
Teraz mamy już wszystko, aby odpowiednio zbudować naszą logikę biznesową. W tym celu tworzymy plik user_service.py w module services:
import re
from app.repositories.user_repository import UserRepository
from app.models.models import User
from hashlib import sha256
from app.schemas.schemas import *
from app.exceptions.exceptions import DuplicateUserException, InvalidEmailFormatException, InvalidCredentialsException
from typing import Union
email_regex = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,7}\b"
def is_email_valid(email: str) -> bool:
return bool(re.fullmatch(email_regex, email))
class UserService:
def __init__(self, repository:UserRepository) -> None:
self.repository: UserRepository = repository
def create_user(self, data:NewUserRequest, role:str) -> User:
email :str = data.email
password : str = data.password
if not is_email_valid(data.email):
raise InvalidEmailFormatException()
if self.repository.find_user_by_email(email):
raise DuplicateUserException()
password_hash: str = sha256(password.encode('utf-8')).hexdigest()
user:User = User(email=email,password_hash=password_hash, role=role)
user = self.repository.add_user(user)
return user
def check_user_credentials(self, email:str, password:str) -> User:
if not is_email_valid(email):
raise InvalidEmailFormatException()
user:Union[User,None] = self.repository.find_user_by_email(email)
if not user or sha256(password.encode('utf-8')).hexdigest() != user.password_hash:
raise InvalidCredentialsException()
return user
def check_user_exists(self,email) -> User:
if not is_email_valid(email):
raise InvalidEmailFormatException()
user:Union[User,None] = self.repository.find_user_by_email(email)
if not user:
raise InvalidCredentialsException()
return user
Widzimy zastosowanie naszych klas z modułu schema oraz raise w przypadku niezgodności warunków jak np. niezgodność formatu email czy istnienia już takiego użytkownika w bazie danych. Wyjątki nie są jednak jeszcze obsługiwane przez FastAPI, ale ich obsługę dodamy z użyciem odpowiednich metod na obiekcie FastAPI. Aby wygenerować odpowiedni token utworzymy jeszcze w module services plik token_service.py, który będzie odpowiadał za generowanie tokenu.
from typing import Union, Any
import os
from app.repositories.user_repository import UserRepository
from datetime import datetime, timedelta, timezone
from app.models.models import User
import jwt
class TokenService:
def __init__(self, user_repository: UserRepository) -> None:
self.user_repository: UserRepository = user_repository
self.jwt_access_token_secret_key: Union[str, None] = os.getenv('JWT_ACCESS_TOKEN_SECRET_KEY')
self.jwt_access_token_expire_minutes: int = int(os.getenv('JWT_ACCESS_TOKEN_EXPIRE_MINUTES', 7 * 24 * 60))
self.jwt_token_alg: Union[str, None] = os.getenv('JWT_TOKEN_ALG')
def generate_access_token(self, user_data: User) -> str:
expire: datetime = datetime.now(timezone.utc) + timedelta(minutes=self.jwt_access_token_expire_minutes)
data_to_encode: dict[str, Any] = {"email": user_data.email, "role": user_data.role, "exp": expire}
encoded_jwt: str = jwt.encode(data_to_encode, str(self.jwt_access_token_secret_key), str(self.jwt_token_alg))
return encoded_jwt
Przed pójściem dalej tj. do warstwy kontrolera, należy zdefiniować zależności w module dependencies w pliku dependencies.py, których będziemy używać w celu wstrzyknięcia zależności do endpointów w FastAPI.
from fastapi import Depends
from pymongo.database import Database
from app.services.user_service import UserService
from app.services.token_service import TokenService
from app.repositories.user_repository import UserRepository
from app.database.connector import Connector
def get_user_service(db: Database = Depends(Connector.get_db)) -> UserService:
return UserService(UserRepository(db))
def get_token_service(db: Database = Depends(Connector.get_db)) -> TokenService:
return TokenService(UserRepository(db))
Znajduje się tu inicjalizacja serwisów korzystających z połączenia do bazy danych poprzez repozytorium. Widać tutaj architekturę warstwową.
Teraz czas na zdefiniowanie kontrolera w module controllers, aby zdefiniować odpowiednie endpointy do logowania i rejestracji. Przeniesiemy logikę endpointów z pliku app.py do pliku user_controller.py.
from fastapi import APIRouter, Depends, status, Response
import os
from app.schemas.schemas import NewUserRequest, NewUserResponse, UserLoginRequest, UserLoginResponse
from app.services.user_service import UserService
from app.services.token_service import TokenService
from app.models.models import User
from app.dependencies.dependencies import get_user_service, get_token_service
router = APIRouter(
prefix=os.getenv('API_AUTHENTICATION_PREFIX','/api'),
tags=['user']
)
@router.post("/register", response_model=NewUserResponse, status_code=status.HTTP_201_CREATED)
def register(data: NewUserRequest, service: UserService = Depends(get_user_service)):
user: User = service.create_user(data, "user")
return NewUserResponse(email=user.email, role=user.role)
@router.post("/login",response_model=UserLoginResponse, status_code=status.HTTP_200_OK)
def login(data: UserLoginRequest, response: Response, user_service: UserService = Depends(get_user_service), token_service: TokenService = Depends(get_token_service)):
user: User = user_service.check_user_credentials(data.email, data.password)
access_token: str = token_service.generate_access_token(user)
response_model: UserLoginResponse = UserLoginResponse(access_token=access_token)
response.headers.append("Token-Type", "Bearer")
return response_model
W kontrolerze zdefiniowaliśmy router, żeby wyrzucić logikę tworzenia endpointów z pliku app.py. Używamy tutaj wstrzykniętego serwisu w celu obsługi logiki biznesowej oraz przyjmujemy żądania i wysyłamy odpowiedzi do klientów poprzez odpowiednio zdefiniowane przez modele zdefiniowane w schemas.
Ostatnim elementem naszej układanki jest zainicjalizowanie FastAPI i dodanie w obiekcie app reakcji na zdefiniowane w serwisach rzucanie wyjątku. To tutaj łączymy handler z definicją wyjątku.
from fastapi import FastAPI
from app.exceptions.exceptions import *
from app.controllers.user_controller import router as user_router
app = FastAPI()
app.add_exception_handler(DuplicateUserException, handle_duplicate_user_exception)
app.add_exception_handler(InvalidEmailFormatException, handle_invalid_email_format_exception)
app.add_exception_handler(InvalidCredentialsException, handle_invalid_credentials_exception)
app.include_router(user_router)
Aby uruchomić aplikację, wykonujemy komendę Docker’a:
docker-compose -f docker-compose-dev.yml up --build
Po uruchomieniu możemy przetestować endpoint’y z użyciem curl’a (localhost:8000/api):
curl -X POST "http://localhost:8000/api/register" -H "Content-Type: application/json" -d "{\"email\": \"example@example.com\",\"password\":\"securepassword\" }"
curl -X POST "http://localhost:8000/api/login" -H "Content-Type: application/json" -d "{\"email\": \"example@example.com\",\"password\":\"securepassword\" }"
Rysunek 2 – zapytania z użyciem curl’a
Dodany użytkownik powinien być widoczny poprzez panel administratora dostępny pod localhost:8083
Rysunek 3 – zawartość bazy danych po rejestracji użytkownika
Rysunek 4 – ostateczna struktura projektu
Udało nam się stworzyć działający projekt z warstwową architekturą, która może być rozszerzalna w łatwy sposób. Zaproponowaliśmy podział projektu na odpowiednie moduły. W następnym, ostatnim wpisie na temat FastAPI dowiemy się w jaki sposób dodać testy z biblioteką pytest i uruchomić je w Dockerze. Jeśli jesteś ciekawy to zachęcamy do lektury!
Zarządzanie tożsamością i dostępem użytkownika, czyli o co chodzi z IDP?
Czym jest tożsamość użytkownika? Z czego wynika potrzeba zarządzania dostępem w firmie? Jak działa tzw. IDP? Odpowiedź na te pytania znajdziesz w artykule.
Bezpieczeństwo
Hej, hej... Programisto, to kolejny artykuł dla Ciebie! Druga część artykułu na temat wzorców projektowych. Poznaj Adapter oraz Memento.
Programowanie
Programisto, to artykuł dla Ciebie! Łap garść przydatnych informacji na temat wzorców projektowych.
Programowanie