autocommit 13-07-2024-10-05

This commit is contained in:
Jasen Qin 2024-07-13 10:05:16 +10:00
parent f819c3b75f
commit 6fa5a9bb41
4 changed files with 312 additions and 10 deletions

View File

@ -1,9 +1,19 @@
import logging
from fastapi import FastAPI, HTTPException
from fastapi import FastAPI, HTTPException, Depends, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pymongo import MongoClient
from bson import ObjectId
from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi.encoders import jsonable_encoder
# JWT Configuration
SECRET_KEY = "YOUR_SECRET_KEY" # Replace with a secure secret key
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Configure logging
logging.basicConfig(level=logging.DEBUG,
@ -18,9 +28,13 @@ client = MongoClient("mongodb://localhost:27017")
db = client["pos_system"]
logger.debug("MongoDB connection established")
# -----=========
# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2 scheme
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Models
# ---------======
class Item(BaseModel):
@ -37,20 +51,196 @@ class Deal(BaseModel):
discount: float
class OrderItem(BaseModel):
item_id: str
quantity: int
class Order(BaseModel):
customer_name: str
items: List[str]
items: List[OrderItem]
total_amount: float
payment_method: str
date: str
voucher: Optional[str] = None
class User(BaseModel):
username: str
email: str
full_name: str
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
logger.debug("Data models defined")
# -----=========
# API ROUTES
# ---------======
# Helper functions
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(username: str):
user_dict = db.users.find_one({"username": username})
if user_dict:
return UserInDB(**user_dict)
def authenticate_user(username: str, password: str):
user = get_user(username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(datetime.UTC) + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
# API Routes
# User Management
@app.post("/users/", response_model=User)
async def create_user(user: UserInDB):
hashed_password = get_password_hash(user.hashed_password)
user_dict = user.dict()
user_dict["hashed_password"] = hashed_password
result = db.users.insert_one(user_dict)
user_dict["_id"] = str(result.inserted_id)
return User(**user_dict)
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
# Order Management
@app.post("/orders/")
async def create_order(order: Order, current_user: User = Depends(get_current_active_user)):
logger.debug(f"Received request to create order: {order}")
order_dict = order.dict()
order_dict["user"] = current_user.username
result = db.orders.insert_one(order_dict)
logger.debug(f"Order created with ID: {result.inserted_id}")
return {"id": str(result.inserted_id)}
@app.get("/orders/")
async def read_orders(current_user: User = Depends(get_current_active_user)):
logger.debug("Received request to read all orders")
orders = list(db.orders.find({"user": current_user.username}))
logger.debug(f"Retrieved {len(orders)} orders")
return orders
@app.get("/orders/{order_id}")
async def read_order(order_id: str, current_user: User = Depends(get_current_active_user)):
logger.debug(f"Received request to read order with ID: {order_id}")
order = db.orders.find_one(
{"_id": ObjectId(order_id), "user": current_user.username})
if order:
logger.debug(f"Order found: {order}")
return order
logger.warning(f"Order with ID {order_id} not found")
raise HTTPException(status_code=404, detail="Order not found")
@app.put("/orders/{order_id}")
async def update_order(order_id: str, order: Order, current_user: User = Depends(get_current_active_user)):
logger.debug(f"Received request to update order with ID: {order_id}")
result = db.orders.update_one(
{"_id": ObjectId(order_id), "user": current_user.username},
{"$set": order.dict()}
)
if result.modified_count:
logger.debug(f"Order with ID {order_id} updated successfully")
return {"message": "Order updated successfully"}
logger.warning(f"Order with ID {order_id} not found for update")
raise HTTPException(status_code=404, detail="Order not found")
@app.delete("/orders/{order_id}")
async def delete_order(order_id: str, current_user: User = Depends(get_current_active_user)):
logger.debug(f"Received request to delete order with ID: {order_id}")
result = db.orders.delete_one(
{"_id": ObjectId(order_id), "user": current_user.username})
if result.deleted_count:
logger.debug(f"Order with ID {order_id} deleted successfully")
return {"message": "Order deleted successfully"}
logger.warning(f"Order with ID {order_id} not found for deletion")
raise HTTPException(status_code=404, detail="Order not found")
@app.post("/items/")
@ -61,12 +251,40 @@ async def create_item(item: Item):
return {"id": str(result.inserted_id)}
class PyObjectId(ObjectId):
@classmethod
def __get_validators__(cls):
yield cls.validate
@classmethod
def validate(cls, v):
if not ObjectId.is_valid(v):
raise ValueError("Invalid objectid")
return ObjectId(v)
@classmethod
def __modify_schema__(cls, field_schema):
field_schema.update(type="string")
def serialize_item(item):
return {
"id": str(item["_id"]),
"name": item["name"],
"price": item["price"],
"quantity": item["quantity"],
"unit": item["unit"],
"related_items": item.get("related_items", [])
}
@app.get("/items/")
async def read_items():
logger.debug("Received request to read all items")
items = list(db.items.find())
logger.debug(f"Retrieved {len(items)} items")
return items
serialized_items = [serialize_item(item) for item in items]
return serialized_items
@app.get("/items/{item_id}")
@ -102,6 +320,7 @@ async def delete_item(item_id: str):
logger.warning(f"Item with ID {item_id} not found for deletion")
raise HTTPException(status_code=404, detail="Item not found")
@app.get("/inventory/")
async def get_inventory():
logger.debug("Received request to get inventory")

83
py-kivy/poetry.lock generated
View File

@ -368,6 +368,24 @@ files = [
{file = "docutils-0.21.2.tar.gz", hash = "sha256:3a6b18732edf182daa3cd12775bbb338cf5691468f91eeeb109deff6ebfa986f"},
]
[[package]]
name = "ecdsa"
version = "0.19.0"
description = "ECDSA cryptographic signature library (pure python)"
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,>=2.6"
files = [
{file = "ecdsa-0.19.0-py2.py3-none-any.whl", hash = "sha256:2cea9b88407fdac7bbeca0833b189e4c9c53f2ef1e1eaa29f6224dbc809b707a"},
{file = "ecdsa-0.19.0.tar.gz", hash = "sha256:60eaad1199659900dd0af521ed462b793bbdf867432b3948e87416ae4caf6bf8"},
]
[package.dependencies]
six = ">=1.9.0"
[package.extras]
gmpy = ["gmpy"]
gmpy2 = ["gmpy2"]
[[package]]
name = "email-validator"
version = "2.2.0"
@ -1098,6 +1116,23 @@ files = [
qa = ["flake8 (==5.0.4)", "mypy (==0.971)", "types-setuptools (==67.2.0.1)"]
testing = ["docopt", "pytest"]
[[package]]
name = "passlib"
version = "1.7.4"
description = "comprehensive password hashing framework supporting over 30 schemes"
optional = false
python-versions = "*"
files = [
{file = "passlib-1.7.4-py2.py3-none-any.whl", hash = "sha256:aa6bca462b8d8bda89c70b382f0c298a20b5560af6cbfa2dce410c0a2fb669f1"},
{file = "passlib-1.7.4.tar.gz", hash = "sha256:defd50f72b65c5402ab2c573830a6978e5f202ad0d984793c8dde2c4152ebe04"},
]
[package.extras]
argon2 = ["argon2-cffi (>=18.2.0)"]
bcrypt = ["bcrypt (>=3.1.0)"]
build-docs = ["cloud-sptheme (>=1.10.1)", "sphinx (>=1.6)", "sphinxcontrib-fulltoc (>=1.2.0)"]
totp = ["cryptography"]
[[package]]
name = "pexpect"
version = "4.9.0"
@ -1211,6 +1246,17 @@ files = [
[package.extras]
tests = ["pytest"]
[[package]]
name = "pyasn1"
version = "0.6.0"
description = "Pure-Python implementation of ASN.1 types and DER/BER/CER codecs (X.208)"
optional = false
python-versions = ">=3.8"
files = [
{file = "pyasn1-0.6.0-py2.py3-none-any.whl", hash = "sha256:cca4bb0f2df5504f02f6f8a775b6e416ff9b0b3b16f7ee80b5a3153d9b804473"},
{file = "pyasn1-0.6.0.tar.gz", hash = "sha256:3a35ab2c4b5ef98e17dfdec8ab074046fbda76e281c5a706ccd82328cfc8f64c"},
]
[[package]]
name = "pycparser"
version = "2.22"
@ -1493,6 +1539,27 @@ files = [
[package.extras]
cli = ["click (>=5.0)"]
[[package]]
name = "python-jose"
version = "3.3.0"
description = "JOSE implementation in Python"
optional = false
python-versions = "*"
files = [
{file = "python-jose-3.3.0.tar.gz", hash = "sha256:55779b5e6ad599c6336191246e95eb2293a9ddebd555f796a65f838f07e5d78a"},
{file = "python_jose-3.3.0-py2.py3-none-any.whl", hash = "sha256:9b1376b023f8b298536eedd47ae1089bcdb848f1535ab30555cd92002d78923a"},
]
[package.dependencies]
ecdsa = "!=0.15"
pyasn1 = "*"
rsa = "*"
[package.extras]
cryptography = ["cryptography (>=3.4.0)"]
pycrypto = ["pyasn1", "pycrypto (>=2.6.0,<2.7.0)"]
pycryptodome = ["pyasn1", "pycryptodome (>=3.3.1,<4.0.0)"]
[[package]]
name = "python-multipart"
version = "0.0.9"
@ -1729,6 +1796,20 @@ pygments = ">=2.13.0,<3.0.0"
[package.extras]
jupyter = ["ipywidgets (>=7.5.1,<9)"]
[[package]]
name = "rsa"
version = "4.9"
description = "Pure-Python RSA implementation"
optional = false
python-versions = ">=3.6,<4"
files = [
{file = "rsa-4.9-py3-none-any.whl", hash = "sha256:90260d9058e514786967344d0ef75fa8727eed8a7d2e43ce9f4bcf1b536174f7"},
{file = "rsa-4.9.tar.gz", hash = "sha256:e38464a49c6c85d7f1351b0126661487a7e0a14a50f1675ec50eb34d4f20ef21"},
]
[package.dependencies]
pyasn1 = ">=0.1.3"
[[package]]
name = "shellingham"
version = "1.5.4"
@ -2227,4 +2308,4 @@ files = [
[metadata]
lock-version = "2.0"
python-versions = "^3.12"
content-hash = "34ea3c9340ec7547a3ae1d23ce1c6aab2dd62366985cdfcb58020ec8be0b42ee"
content-hash = "210871253d8b131828bd0a0fe119fcb8486fd5ffd0b10caafbdab7a8eb6533f3"

View File

@ -13,6 +13,8 @@ fastapi = "^0.111.0"
hypothesis = "^6.105.1"
pymongo = "^4.8.0"
pytest = "^8.2.2"
python-jose = "^3.3.0"
passlib = "^1.7.4"
[tool.poetry.group.dev.dependencies]
ipykernel = "^6.29.5"

View File

@ -10,7 +10,7 @@ logger = logging.getLogger(__name__)
def setup_test_db():
client = MongoClient("mongodb://localhost:27017")
db = client["test_pos_system"]
db = client["pos_system"]
# Clear existing data
db.items.drop()