del aged things like fast_api server and models2

This commit is contained in:
Jasen Qin 2024-07-17 06:47:51 +10:00
parent e21e01e009
commit bfb4e2436e
4 changed files with 11 additions and 455 deletions

View File

@ -1,342 +0,0 @@
import logging
from fastapi import FastAPI, HTTPException, Depends, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pymongo import MongoClient
from bson import ObjectId
from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi.encoders import jsonable_encoder
# JWT Configuration
SECRET_KEY = "YOUR_SECRET_KEY" # Replace with a secure secret key
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Configure logging
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
app = FastAPI()
logger.debug("FastAPI application initialized")
# MongoDB connection
client = MongoClient("mongodb://localhost:27017")
db = client["pos_system"]
logger.debug("MongoDB connection established")
# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2 scheme
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Models
class Item(BaseModel):
name: str
price: float
quantity: int
unit: str
related_items: List[str] = []
class Deal(BaseModel):
name: str
items: List[str]
discount: float
class OrderItem(BaseModel):
item_id: str
quantity: int
class Order(BaseModel):
customer_name: str
items: List[OrderItem]
total_amount: float
payment_method: str
date: str
voucher: Optional[str] = None
class User(BaseModel):
username: str
email: str
full_name: str
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
logger.debug("Data models defined")
# Helper functions
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(username: str):
user_dict = db.users.find_one({"username": username})
if user_dict:
return UserInDB(**user_dict)
def authenticate_user(username: str, password: str):
user = get_user(username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(datetime.UTC) + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
# API Routes
# User Management
@app.post("/users/", response_model=User)
async def create_user(user: UserInDB):
hashed_password = get_password_hash(user.hashed_password)
user_dict = user.dict()
user_dict["hashed_password"] = hashed_password
result = db.users.insert_one(user_dict)
user_dict["_id"] = str(result.inserted_id)
return User(**user_dict)
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
# Order Management
@app.post("/orders/")
async def create_order(order: Order, current_user: User = Depends(get_current_active_user)):
logger.debug(f"Received request to create order: {order}")
order_dict = order.dict()
order_dict["user"] = current_user.username
result = db.orders.insert_one(order_dict)
logger.debug(f"Order created with ID: {result.inserted_id}")
return {"id": str(result.inserted_id)}
@app.get("/orders/")
async def read_orders(current_user: User = Depends(get_current_active_user)):
logger.debug("Received request to read all orders")
orders = list(db.orders.find({"user": current_user.username}))
logger.debug(f"Retrieved {len(orders)} orders")
return orders
@app.get("/orders/{order_id}")
async def read_order(order_id: str, current_user: User = Depends(get_current_active_user)):
logger.debug(f"Received request to read order with ID: {order_id}")
order = db.orders.find_one(
{"_id": ObjectId(order_id), "user": current_user.username})
if order:
logger.debug(f"Order found: {order}")
return order
logger.warning(f"Order with ID {order_id} not found")
raise HTTPException(status_code=404, detail="Order not found")
@app.put("/orders/{order_id}")
async def update_order(order_id: str, order: Order, current_user: User = Depends(get_current_active_user)):
logger.debug(f"Received request to update order with ID: {order_id}")
result = db.orders.update_one(
{"_id": ObjectId(order_id), "user": current_user.username},
{"$set": order.dict()}
)
if result.modified_count:
logger.debug(f"Order with ID {order_id} updated successfully")
return {"message": "Order updated successfully"}
logger.warning(f"Order with ID {order_id} not found for update")
raise HTTPException(status_code=404, detail="Order not found")
@app.delete("/orders/{order_id}")
async def delete_order(order_id: str, current_user: User = Depends(get_current_active_user)):
logger.debug(f"Received request to delete order with ID: {order_id}")
result = db.orders.delete_one(
{"_id": ObjectId(order_id), "user": current_user.username})
if result.deleted_count:
logger.debug(f"Order with ID {order_id} deleted successfully")
return {"message": "Order deleted successfully"}
logger.warning(f"Order with ID {order_id} not found for deletion")
raise HTTPException(status_code=404, detail="Order not found")
@app.post("/items/")
async def create_item(item: Item):
logger.debug(f"Received request to create item: {item}")
result = db.items.insert_one(item.dict())
logger.debug(f"Item created with ID: {result.inserted_id}")
return {"id": str(result.inserted_id)}
class PyObjectId(ObjectId):
@classmethod
def __get_validators__(cls):
yield cls.validate
@classmethod
def validate(cls, v):
if not ObjectId.is_valid(v):
raise ValueError("Invalid objectid")
return ObjectId(v)
@classmethod
def __modify_schema__(cls, field_schema):
field_schema.update(type="string")
def serialize_item(item):
return {
"id": str(item["_id"]),
"name": item["name"],
"price": item["price"],
"quantity": item["quantity"],
"unit": item["unit"],
"related_items": item.get("related_items", [])
}
@app.get("/items/")
async def read_items():
logger.debug("Received request to read all items")
items = list(db.items.find())
logger.debug(f"Retrieved {len(items)} items")
serialized_items = [serialize_item(item) for item in items]
return serialized_items
@app.get("/items/{item_id}")
async def read_item(item_id: str):
logger.debug(f"Received request to read item with ID: {item_id}")
item = db.items.find_one({"_id": ObjectId(item_id)})
if item:
logger.debug(f"Item found: {item}")
return item
logger.warning(f"Item with ID {item_id} not found")
raise HTTPException(status_code=404, detail="Item not found")
@app.put("/items/{item_id}")
async def update_item(item_id: str, item: Item):
logger.debug(f"Received request to update item with ID: {item_id}")
result = db.items.update_one(
{"_id": ObjectId(item_id)}, {"$set": item.dict()})
if result.modified_count:
logger.debug(f"Item with ID {item_id} updated successfully")
return {"message": "Item updated successfully"}
logger.warning(f"Item with ID {item_id} not found for update")
raise HTTPException(status_code=404, detail="Item not found")
@app.delete("/items/{item_id}")
async def delete_item(item_id: str):
logger.debug(f"Received request to delete item with ID: {item_id}")
result = db.items.delete_one({"_id": ObjectId(item_id)})
if result.deleted_count:
logger.debug(f"Item with ID {item_id} deleted successfully")
return {"message": "Item deleted successfully"}
logger.warning(f"Item with ID {item_id} not found for deletion")
raise HTTPException(status_code=404, detail="Item not found")
@app.get("/inventory/")
async def get_inventory():
logger.debug("Received request to get inventory")
inventory = list(db.items.find({}, {"name": 1, "quantity": 1}))
logger.debug(f"Retrieved inventory with {len(inventory)} items")
return inventory
@app.get("/sales/")
async def get_sales_data():
# Implement aggregation for sales data
# This could include total sales, popular items, etc.
pass
if __name__ == "__main__":
logger.info("Starting the FastAPI application")
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@ -1,102 +0,0 @@
from pydantic import BaseModel, Field, EmailStr, field_validator
from typing import List, Optional
from datetime import datetime
from bson import ObjectId
from pydantic import ConfigDict
from typing import Any, ClassVar
class PyObjectId(ObjectId):
@classmethod
def __get_pydantic_core_schema__(
cls, _source_type: Any, _handler: Any
) -> Any:
from pydantic_core import core_schema
return core_schema.json_or_python_schema(
python_schema=core_schema.is_instance_schema(ObjectId),
json_schema=core_schema.StringSchema(),
serialization=core_schema.to_string_ser_schema(),
)
class MongoBaseModel(BaseModel):
id: PyObjectId = Field(default_factory=PyObjectId, alias="_id")
model_config = ConfigDict(
populate_by_name=True,
arbitrary_types_allowed=True,
json_encoders={ObjectId: str}
)
class Item(MongoBaseModel):
name: str
price: float
quantity: int
unit: str
related_items: List[str] = []
class OrderItem(BaseModel):
item_id: PyObjectId
quantity: int
price_at_order: float
class Order(MongoBaseModel):
user_id: PyObjectId
items: List[OrderItem]
total_amount: float
payment_method: Optional[str] = None
payment_status: str = "pending"
order_status: str = "created"
created_at: datetime = Field(default_factory=datetime.now(datetime.UTC))
updated_at: Optional[datetime] = None
discount_applied: Optional[float] = None
notes: Optional[str] = None
@field_validator('order_status')
def valid_order_status(cls, v):
allowed_statuses = ["created", "processing",
"shipped", "delivered", "cancelled"]
if v not in allowed_statuses:
raise ValueError(f"Invalid order status. Must be one of: {
', '.join(allowed_statuses)}")
return v
@field_validator('payment_status')
def valid_payment_status(cls, v):
allowed_statuses = ["pending", "paid", "refunded", "failed"]
if v not in allowed_statuses:
raise ValueError(f"Invalid payment status. Must be one of: {
', '.join(allowed_statuses)}")
return v
class UserBase(BaseModel):
username: str
email: EmailStr
full_name: str
is_active: bool = True
is_superuser: bool = False
class UserCreate(UserBase):
password: str
class UserInDB(UserBase, MongoBaseModel):
hashed_password: str
class User(UserBase, MongoBaseModel):
pass
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None

View File

@ -2,10 +2,7 @@ from fastapi import FastAPI
from .routers import items, orders, users
from .database import init_db
def main():
app = FastAPI()
# Initialize database
init_db()
@ -14,5 +11,7 @@ def main():
app.include_router(orders.router)
app.include_router(users.router)
def main():
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@ -2,7 +2,8 @@ import pytest
from fastapi.testclient import TestClient
from hypothesis import given, strategies as st
from bson import ObjectId
from fastapi_server import app, db
from pos_system.server import app
from pos_system.database import db
client = TestClient(app)