From 2709da559432350bea4ee4125cb058c5157c5d5f Mon Sep 17 00:00:00 2001 From: Jasen Qin Date: Sat, 13 Jul 2024 10:22:12 +1000 Subject: [PATCH] autocommit 13-07-2024-10-22 --- py-kivy/pos_system/auth.py | 31 ++++++++ py-kivy/pos_system/database.py | 14 ++++ py-kivy/pos_system/models.py | 102 ++++++++++++++++++++++++ py-kivy/pos_system/routers/__init__.py | 0 py-kivy/pos_system/routers/items.py | 25 ++++++ py-kivy/pos_system/routers/orders.py | 106 +++++++++++++++++++++++++ py-kivy/pos_system/routers/users.py | 101 +++++++++++++++++++++++ py-kivy/pos_system/server.py | 18 +++++ 8 files changed, 397 insertions(+) create mode 100644 py-kivy/pos_system/auth.py create mode 100644 py-kivy/pos_system/database.py create mode 100644 py-kivy/pos_system/models.py create mode 100644 py-kivy/pos_system/routers/__init__.py create mode 100644 py-kivy/pos_system/routers/items.py create mode 100644 py-kivy/pos_system/routers/orders.py create mode 100644 py-kivy/pos_system/routers/users.py create mode 100644 py-kivy/pos_system/server.py diff --git a/py-kivy/pos_system/auth.py b/py-kivy/pos_system/auth.py new file mode 100644 index 0000000..cb221ae --- /dev/null +++ b/py-kivy/pos_system/auth.py @@ -0,0 +1,31 @@ +from fastapi.security import OAuth2PasswordBearer +from passlib.context import CryptContext +from datetime import datetime, timedelta +from jose import JWTError, jwt +from .database import get_db + +# JWT Configuration +SECRET_KEY = "YOUR_SECRET_KEY" +ALGORITHM = "HS256" +ACCESS_TOKEN_EXPIRE_MINUTES = 30 + +pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") + + +def verify_password(plain_password, hashed_password): + return pwd_context.verify(plain_password, hashed_password) + + +def get_password_hash(password): + return pwd_context.hash(password) + + +def create_access_token(data: dict): + to_encode = data.copy() + expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) + to_encode.update({"exp": expire}) + encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) + return encoded_jwt + +# Add other authentication-related functions here diff --git a/py-kivy/pos_system/database.py b/py-kivy/pos_system/database.py new file mode 100644 index 0000000..4f04378 --- /dev/null +++ b/py-kivy/pos_system/database.py @@ -0,0 +1,14 @@ +from pymongo import MongoClient + +client = None +db = None + + +def init_db(): + global client, db + client = MongoClient("mongodb://localhost:27017") + db = client["pos_system"] + + +def get_db(): + return db diff --git a/py-kivy/pos_system/models.py b/py-kivy/pos_system/models.py new file mode 100644 index 0000000..85a79a0 --- /dev/null +++ b/py-kivy/pos_system/models.py @@ -0,0 +1,102 @@ +from pydantic import BaseModel, Field, EmailStr, validator +from typing import List, Optional +from datetime import datetime +from bson import ObjectId + + +class PyObjectId(ObjectId): + @classmethod + def __get_validators__(cls): + yield cls.validate + + @classmethod + def validate(cls, v): + if not ObjectId.is_valid(v): + raise ValueError("Invalid objectid") + return ObjectId(v) + + @classmethod + def __modify_schema__(cls, field_schema): + field_schema.update(type="string") + + +class MongoBaseModel(BaseModel): + id: Optional[PyObjectId] = Field(default_factory=PyObjectId, alias="_id") + + class Config: + allow_population_by_field_name = True + arbitrary_types_allowed = True + json_encoders = {ObjectId: str} + + +class Item(MongoBaseModel): + name: str + price: float + quantity: int + unit: str + related_items: List[str] = [] + + +class OrderItem(BaseModel): + item_id: PyObjectId + quantity: int + price_at_order: float + + +class Order(MongoBaseModel): + user_id: PyObjectId + items: List[OrderItem] + total_amount: float + payment_method: Optional[str] = None + payment_status: str = "pending" + order_status: str = "created" + created_at: datetime = Field(default_factory=datetime.utcnow) + updated_at: Optional[datetime] = None + discount_applied: Optional[float] = None + notes: Optional[str] = None + + @validator('order_status') + def valid_order_status(cls, v): + allowed_statuses = ["created", "processing", + "shipped", "delivered", "cancelled"] + if v not in allowed_statuses: + raise ValueError(f"Invalid order status. Must be one of: { + ', '.join(allowed_statuses)}") + return v + + @validator('payment_status') + def valid_payment_status(cls, v): + allowed_statuses = ["pending", "paid", "refunded", "failed"] + if v not in allowed_statuses: + raise ValueError(f"Invalid payment status. Must be one of: { + ', '.join(allowed_statuses)}") + return v + + +class UserBase(BaseModel): + username: str + email: EmailStr + full_name: str + is_active: bool = True + is_superuser: bool = False + + +class UserCreate(UserBase): + password: str + + +class UserInDB(UserBase, MongoBaseModel): + hashed_password: str + + +class User(UserBase, MongoBaseModel): + pass + + +class Token(BaseModel): + access_token: str + token_type: str + + +class TokenData(BaseModel): + username: Optional[str] = None diff --git a/py-kivy/pos_system/routers/__init__.py b/py-kivy/pos_system/routers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/py-kivy/pos_system/routers/items.py b/py-kivy/pos_system/routers/items.py new file mode 100644 index 0000000..a27a065 --- /dev/null +++ b/py-kivy/pos_system/routers/items.py @@ -0,0 +1,25 @@ +from fastapi import APIRouter, HTTPException, Depends +from ..models import Item +from ..database import get_db +from bson import ObjectId + +router = APIRouter( + prefix="/items", + tags=["items"] +) + + +@router.post("/") +async def create_item(item: Item): + db = get_db() + result = db.items.insert_one(item.dict(by_alias=True)) + return {"id": str(result.inserted_id)} + + +@router.get("/") +async def read_items(): + db = get_db() + items = list(db.items.find()) + return [Item(**item).dict(by_alias=True) for item in items] + +# Add other item-related routes here diff --git a/py-kivy/pos_system/routers/orders.py b/py-kivy/pos_system/routers/orders.py new file mode 100644 index 0000000..f46fac0 --- /dev/null +++ b/py-kivy/pos_system/routers/orders.py @@ -0,0 +1,106 @@ +from fastapi import APIRouter, HTTPException, Depends, status +from ..models import Order, User +from ..database import get_db +from ..auth import get_current_user +from bson import ObjectId +from typing import List + +router = APIRouter( + prefix="/orders", + tags=["orders"] +) + + +@router.post("/", response_model=Order) +async def create_order(order: Order, current_user: User = Depends(get_current_user)): + db = get_db() + order_dict = order.dict() + order_dict["user_id"] = str(current_user.id) + result = db.orders.insert_one(order_dict) + created_order = db.orders.find_one({"_id": result.inserted_id}) + return Order(**created_order) + + +@router.get("/", response_model=List[Order]) +async def read_orders(skip: int = 0, limit: int = 10, current_user: User = Depends(get_current_user)): + db = get_db() + orders = list(db.orders.find( + {"user_id": str(current_user.id)}).skip(skip).limit(limit)) + return [Order(**order) for order in orders] + + +@router.get("/{order_id}", response_model=Order) +async def read_order(order_id: str, current_user: User = Depends(get_current_user)): + db = get_db() + order = db.orders.find_one( + {"_id": ObjectId(order_id), "user_id": str(current_user.id)}) + if order is None: + raise HTTPException(status_code=404, detail="Order not found") + return Order(**order) + + +@router.put("/{order_id}", response_model=Order) +async def update_order(order_id: str, order: Order, current_user: User = Depends(get_current_user)): + db = get_db() + existing_order = db.orders.find_one( + {"_id": ObjectId(order_id), "user_id": str(current_user.id)}) + if not existing_order: + raise HTTPException(status_code=404, detail="Order not found") + update_data = order.dict(exclude_unset=True) + result = db.orders.update_one( + {"_id": ObjectId(order_id)}, {"$set": update_data}) + if result.modified_count == 1: + updated_order = db.orders.find_one({"_id": ObjectId(order_id)}) + return Order(**updated_order) + else: + raise HTTPException(status_code=400, detail="Order update failed") + + +@router.delete("/{order_id}", status_code=status.HTTP_204_NO_CONTENT) +async def delete_order(order_id: str, current_user: User = Depends(get_current_user)): + db = get_db() + result = db.orders.delete_one( + {"_id": ObjectId(order_id), "user_id": str(current_user.id)}) + if result.deleted_count != 1: + raise HTTPException(status_code=404, detail="Order not found") + + +@router.post("/{order_id}/process_payment") +async def process_payment(order_id: str, payment_method: str, current_user: User = Depends(get_current_user)): + db = get_db() + order = db.orders.find_one( + {"_id": ObjectId(order_id), "user_id": str(current_user.id)}) + if not order: + raise HTTPException(status_code=404, detail="Order not found") + # Implement payment processing logic here + # For this example, we'll just update the order status + result = db.orders.update_one( + {"_id": ObjectId(order_id)}, + {"$set": {"payment_status": "paid", "payment_method": payment_method}} + ) + if result.modified_count == 1: + return {"message": "Payment processed successfully"} + else: + raise HTTPException(status_code=400, detail="Payment processing failed") + + +@router.post("/{order_id}/apply_discount") +async def apply_discount(order_id: str, discount_percentage: float, current_user: User = Depends(get_current_user)): + db = get_db() + order = db.orders.find_one( + {"_id": ObjectId(order_id), "user_id": str(current_user.id)}) + if not order: + raise HTTPException(status_code=404, detail="Order not found") + if discount_percentage < 0 or discount_percentage > 100: + raise HTTPException(status_code=400, detail="Invalid discount percentage") + + new_total = order["total_amount"] * (1 - discount_percentage / 100) + result = db.orders.update_one( + {"_id": ObjectId(order_id)}, + {"$set": {"total_amount": new_total, "discount_applied": discount_percentage}} + ) + if result.modified_count == 1: + updated_order = db.orders.find_one({"_id": ObjectId(order_id)}) + return Order(**updated_order) + else: + raise HTTPException(status_code=400, detail="Discount application failed") diff --git a/py-kivy/pos_system/routers/users.py b/py-kivy/pos_system/routers/users.py new file mode 100644 index 0000000..d5b2123 --- /dev/null +++ b/py-kivy/pos_system/routers/users.py @@ -0,0 +1,101 @@ +from fastapi import APIRouter, HTTPException, Depends, status +from fastapi.security import OAuth2PasswordRequestForm +from ..models import User, UserInDB +from ..database import get_db +from ..auth import get_password_hash, verify_password, create_access_token, oauth2_scheme +from bson import ObjectId +from typing import List + +router = APIRouter( + prefix="/users", + tags=["users"] +) + + +def get_user(username: str): + db = get_db() + user_dict = db.users.find_one({"username": username}) + if user_dict: + return UserInDB(**user_dict) + + +async def get_current_user(token: str = Depends(oauth2_scheme)): + credentials_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Could not validate credentials", + headers={"WWW-Authenticate": "Bearer"}, + ) + # Implement JWT token validation here + # For brevity, we're skipping the actual implementation + username = "test_user" # This should come from the validated token + user = get_user(username) + if user is None: + raise credentials_exception + return user + + +@router.post("/register", response_model=User) +async def create_user(user: UserInDB): + db = get_db() + existing_user = db.users.find_one({"username": user.username}) + if existing_user: + raise HTTPException(status_code=400, detail="Username already registered") + hashed_password = get_password_hash(user.password) + user_dict = user.dict() + user_dict["hashed_password"] = hashed_password + del user_dict["password"] + result = db.users.insert_one(user_dict) + created_user = db.users.find_one({"_id": result.inserted_id}) + return User(**created_user) + + +@router.post("/token") +async def login(form_data: OAuth2PasswordRequestForm = Depends()): + user = get_user(form_data.username) + if not user or not verify_password(form_data.password, user.hashed_password): + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Incorrect username or password", + headers={"WWW-Authenticate": "Bearer"}, + ) + access_token = create_access_token(data={"sub": user.username}) + return {"access_token": access_token, "token_type": "bearer"} + + +@router.get("/me", response_model=User) +async def read_users_me(current_user: User = Depends(get_current_user)): + return current_user + + +@router.get("/", response_model=List[User]) +async def read_users(skip: int = 0, limit: int = 10, current_user: User = Depends(get_current_user)): + db = get_db() + users = list(db.users.find().skip(skip).limit(limit)) + return [User(**user) for user in users] + + +@router.put("/{user_id}", response_model=User) +async def update_user(user_id: str, user: UserInDB, current_user: User = Depends(get_current_user)): + db = get_db() + existing_user = db.users.find_one({"_id": ObjectId(user_id)}) + if not existing_user: + raise HTTPException(status_code=404, detail="User not found") + update_data = user.dict(exclude_unset=True) + if "password" in update_data: + update_data["hashed_password"] = get_password_hash(update_data["password"]) + del update_data["password"] + result = db.users.update_one( + {"_id": ObjectId(user_id)}, {"$set": update_data}) + if result.modified_count == 1: + updated_user = db.users.find_one({"_id": ObjectId(user_id)}) + return User(**updated_user) + else: + raise HTTPException(status_code=400, detail="User update failed") + + +@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT) +async def delete_user(user_id: str, current_user: User = Depends(get_current_user)): + db = get_db() + result = db.users.delete_one({"_id": ObjectId(user_id)}) + if result.deleted_count != 1: + raise HTTPException(status_code=404, detail="User not found") diff --git a/py-kivy/pos_system/server.py b/py-kivy/pos_system/server.py new file mode 100644 index 0000000..96c2e01 --- /dev/null +++ b/py-kivy/pos_system/server.py @@ -0,0 +1,18 @@ +from fastapi import FastAPI +from .routers import items, orders, users +from .database import init_db + + +def main(): + app = FastAPI() + + # Initialize database + init_db() + + # Include routers + app.include_router(items.router) + app.include_router(orders.router) + app.include_router(users.router) + + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8000)