Mini_Basic_Manufacturing_App/src/dao.py
2025-08-17 21:31:29 +07:00

901 lines
34 KiB
Python

import sqlite3
from datetime import datetime, date
from typing import List, Optional
from src.models import Product, Supplier, Customer, PurchaseOrder, PurchaseOrderItem, ManufacturingOrder, SalesOrder, SalesOrderItem, Inventory
class BaseDAO:
"""Base DAO class with common database operations"""
def __init__(self, db_manager):
self.db_manager = db_manager
class ProductDAO(BaseDAO):
"""Data access object for Product model"""
def create(self, product: Product) -> bool:
"""Create a new product"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
INSERT INTO products (name, description, unit_price, sku, min_stock)
VALUES (?, ?, ?, ?, ?)
''', (product.name, product.description, product.unit_price, product.sku, product.min_stock))
product.id = cursor.lastrowid
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error creating product: {e}")
return False
finally:
self.db_manager.disconnect()
def get_by_id(self, product_id: int) -> Optional[Product]:
"""Get product by ID"""
if not self.db_manager.connect():
return None
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM products WHERE id = ?', (product_id,))
row = cursor.fetchone()
if row:
return Product(
id=row['id'],
name=row['name'],
description=row['description'],
unit_price=row['unit_price'],
sku=row['sku'],
min_stock=row['min_stock'] if 'min_stock' in row.keys() else 0
)
return None
except Exception as e:
print(f"Error getting product: {e}")
return None
finally:
self.db_manager.disconnect()
def get_all(self) -> List[Product]:
"""Get all products"""
if not self.db_manager.connect():
return []
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM products ORDER BY name')
rows = cursor.fetchall()
products = []
for row in rows:
products.append(Product(
id=row['id'],
name=row['name'],
description=row['description'],
unit_price=row['unit_price'],
sku=row['sku'],
min_stock=row['min_stock'] if 'min_stock' in row.keys() else 0
))
return products
except Exception as e:
print(f"Error getting products: {e}")
return []
finally:
self.db_manager.disconnect()
def update(self, product: Product) -> bool:
"""Update a product"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
UPDATE products
SET name = ?, description = ?, unit_price = ?, sku = ?, min_stock = ?
WHERE id = ?
''', (product.name, product.description, product.unit_price, product.sku, product.min_stock, product.id))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error updating product: {e}")
return False
finally:
self.db_manager.disconnect()
def delete(self, product_id: int) -> bool:
"""Delete a product"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('DELETE FROM products WHERE id = ?', (product_id,))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error deleting product: {e}")
return False
finally:
self.db_manager.disconnect()
class SupplierDAO(BaseDAO):
"""Data access object for Supplier model"""
def create(self, supplier: Supplier) -> bool:
"""Create a new supplier"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
INSERT INTO suppliers (name, contact_person, phone, email, address)
VALUES (?, ?, ?, ?, ?)
''', (supplier.name, supplier.contact_person, supplier.phone, supplier.email, supplier.address))
supplier.id = cursor.lastrowid
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error creating supplier: {e}")
return False
finally:
self.db_manager.disconnect()
def get_by_id(self, supplier_id: int) -> Optional[Supplier]:
"""Get supplier by ID"""
if not self.db_manager.connect():
return None
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM suppliers WHERE id = ?', (supplier_id,))
row = cursor.fetchone()
if row:
return Supplier(
id=row['id'],
name=row['name'],
contact_person=row['contact_person'],
phone=row['phone'],
email=row['email'],
address=row['address']
)
return None
except Exception as e:
print(f"Error getting supplier: {e}")
return None
finally:
self.db_manager.disconnect()
def get_all(self) -> List[Supplier]:
"""Get all suppliers"""
if not self.db_manager.connect():
return []
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM suppliers ORDER BY name')
rows = cursor.fetchall()
suppliers = []
for row in rows:
suppliers.append(Supplier(
id=row['id'],
name=row['name'],
contact_person=row['contact_person'],
phone=row['phone'],
email=row['email'],
address=row['address']
))
return suppliers
except Exception as e:
print(f"Error getting suppliers: {e}")
return []
finally:
self.db_manager.disconnect()
def update(self, supplier: Supplier) -> bool:
"""Update a supplier"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
UPDATE suppliers
SET name = ?, contact_person = ?, phone = ?, email = ?, address = ?
WHERE id = ?
''', (supplier.name, supplier.contact_person, supplier.phone, supplier.email, supplier.address, supplier.id))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error updating supplier: {e}")
return False
finally:
self.db_manager.disconnect()
def delete(self, supplier_id: int) -> bool:
"""Delete a supplier"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('DELETE FROM suppliers WHERE id = ?', (supplier_id,))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error deleting supplier: {e}")
return False
finally:
self.db_manager.disconnect()
class CustomerDAO(BaseDAO):
"""Data access object for Customer model"""
def create(self, customer: Customer) -> bool:
"""Create a new customer"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
INSERT INTO customers (name, contact_person, phone, email, address)
VALUES (?, ?, ?, ?, ?)
''', (customer.name, customer.contact_person, customer.phone, customer.email, customer.address))
customer.id = cursor.lastrowid
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error creating customer: {e}")
return False
finally:
self.db_manager.disconnect()
def get_by_id(self, customer_id: int) -> Optional[Customer]:
"""Get customer by ID"""
if not self.db_manager.connect():
return None
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM customers WHERE id = ?', (customer_id,))
row = cursor.fetchone()
if row:
return Customer(
id=row['id'],
name=row['name'],
contact_person=row['contact_person'],
phone=row['phone'],
email=row['email'],
address=row['address']
)
return None
except Exception as e:
print(f"Error getting customer: {e}")
return None
finally:
self.db_manager.disconnect()
def get_all(self) -> List[Customer]:
"""Get all customers"""
if not self.db_manager.connect():
return []
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM customers ORDER BY name')
rows = cursor.fetchall()
customers = []
for row in rows:
customers.append(Customer(
id=row['id'],
name=row['name'],
contact_person=row['contact_person'],
phone=row['phone'],
email=row['email'],
address=row['address']
))
return customers
except Exception as e:
print(f"Error getting customers: {e}")
return []
finally:
self.db_manager.disconnect()
def update(self, customer: Customer) -> bool:
"""Update a customer"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
UPDATE customers
SET name = ?, contact_person = ?, phone = ?, email = ?, address = ?
WHERE id = ?
''', (customer.name, customer.contact_person, customer.phone, customer.email, customer.address, customer.id))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error updating customer: {e}")
return False
finally:
self.db_manager.disconnect()
def delete(self, customer_id: int) -> bool:
"""Delete a customer"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('DELETE FROM customers WHERE id = ?', (customer_id,))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error deleting customer: {e}")
return False
finally:
self.db_manager.disconnect()
class PurchaseOrderDAO(BaseDAO):
"""Data access object for PurchaseOrder model"""
def create(self, purchase_order: PurchaseOrder) -> bool:
"""Create a new purchase order"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
INSERT INTO purchase_orders (supplier_id, order_date, expected_delivery, status, total_amount)
VALUES (?, ?, ?, ?, ?)
''', (purchase_order.supplier_id,
purchase_order.order_date.isoformat() if purchase_order.order_date else None,
purchase_order.expected_delivery.isoformat() if purchase_order.expected_delivery else None,
purchase_order.status,
purchase_order.total_amount))
purchase_order.id = cursor.lastrowid
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error creating purchase order: {e}")
return False
finally:
self.db_manager.disconnect()
def get_by_id(self, po_id: int) -> Optional[PurchaseOrder]:
"""Get purchase order by ID"""
if not self.db_manager.connect():
return None
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM purchase_orders WHERE id = ?', (po_id,))
row = cursor.fetchone()
if row:
order_date = datetime.fromisoformat(row['order_date']).date() if row['order_date'] else None
expected_delivery = datetime.fromisoformat(row['expected_delivery']).date() if row['expected_delivery'] else None
return PurchaseOrder(
id=row['id'],
supplier_id=row['supplier_id'],
order_date=order_date,
expected_delivery=expected_delivery,
status=row['status'],
total_amount=row['total_amount']
)
return None
except Exception as e:
print(f"Error getting purchase order: {e}")
return None
finally:
self.db_manager.disconnect()
def get_all(self) -> List[PurchaseOrder]:
"""Get all purchase orders"""
if not self.db_manager.connect():
return []
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM purchase_orders ORDER BY order_date DESC')
rows = cursor.fetchall()
purchase_orders = []
for row in rows:
order_date = datetime.fromisoformat(row['order_date']).date() if row['order_date'] else None
expected_delivery = datetime.fromisoformat(row['expected_delivery']).date() if row['expected_delivery'] else None
purchase_orders.append(PurchaseOrder(
id=row['id'],
supplier_id=row['supplier_id'],
order_date=order_date,
expected_delivery=expected_delivery,
status=row['status'],
total_amount=row['total_amount']
))
return purchase_orders
except Exception as e:
print(f"Error getting purchase orders: {e}")
return []
finally:
self.db_manager.disconnect()
def update(self, purchase_order: PurchaseOrder) -> bool:
"""Update a purchase order"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
UPDATE purchase_orders
SET supplier_id = ?, order_date = ?, expected_delivery = ?, status = ?, total_amount = ?
WHERE id = ?
''', (purchase_order.supplier_id,
purchase_order.order_date.isoformat() if purchase_order.order_date else None,
purchase_order.expected_delivery.isoformat() if purchase_order.expected_delivery else None,
purchase_order.status,
purchase_order.total_amount,
purchase_order.id))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error updating purchase order: {e}")
return False
finally:
self.db_manager.disconnect()
def delete(self, po_id: int) -> bool:
"""Delete a purchase order"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
# Delete items first
cursor.execute('DELETE FROM purchase_order_items WHERE purchase_order_id = ?', (po_id,))
# Delete order
cursor.execute('DELETE FROM purchase_orders WHERE id = ?', (po_id,))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error deleting purchase order: {e}")
return False
finally:
self.db_manager.disconnect()
class ManufacturingOrderDAO(BaseDAO):
"""Data access object for ManufacturingOrder model"""
def create(self, manufacturing_order: ManufacturingOrder) -> bool:
"""Create a new manufacturing order"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
INSERT INTO manufacturing_orders (product_id, quantity, start_date, end_date, status)
VALUES (?, ?, ?, ?, ?)
''', (manufacturing_order.product_id,
manufacturing_order.quantity,
manufacturing_order.start_date.isoformat() if manufacturing_order.start_date else None,
manufacturing_order.end_date.isoformat() if manufacturing_order.end_date else None,
manufacturing_order.status))
manufacturing_order.id = cursor.lastrowid
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error creating manufacturing order: {e}")
return False
finally:
self.db_manager.disconnect()
def get_by_id(self, mo_id: int) -> Optional[ManufacturingOrder]:
"""Get manufacturing order by ID"""
if not self.db_manager.connect():
return None
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM manufacturing_orders WHERE id = ?', (mo_id,))
row = cursor.fetchone()
if row:
start_date = datetime.fromisoformat(row['start_date']).date() if row['start_date'] else None
end_date = datetime.fromisoformat(row['end_date']).date() if row['end_date'] else None
return ManufacturingOrder(
id=row['id'],
product_id=row['product_id'],
quantity=row['quantity'],
start_date=start_date,
end_date=end_date,
status=row['status']
)
return None
except Exception as e:
print(f"Error getting manufacturing order: {e}")
return None
finally:
self.db_manager.disconnect()
def get_all(self) -> List[ManufacturingOrder]:
"""Get all manufacturing orders"""
if not self.db_manager.connect():
return []
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM manufacturing_orders ORDER BY start_date DESC')
rows = cursor.fetchall()
manufacturing_orders = []
for row in rows:
start_date = datetime.fromisoformat(row['start_date']).date() if row['start_date'] else None
end_date = datetime.fromisoformat(row['end_date']).date() if row['end_date'] else None
manufacturing_orders.append(ManufacturingOrder(
id=row['id'],
product_id=row['product_id'],
quantity=row['quantity'],
start_date=start_date,
end_date=end_date,
status=row['status']
))
return manufacturing_orders
except Exception as e:
print(f"Error getting manufacturing orders: {e}")
return []
finally:
self.db_manager.disconnect()
def update(self, manufacturing_order: ManufacturingOrder) -> bool:
"""Update a manufacturing order"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
UPDATE manufacturing_orders
SET product_id = ?, quantity = ?, start_date = ?, end_date = ?, status = ?
WHERE id = ?
''', (manufacturing_order.product_id,
manufacturing_order.quantity,
manufacturing_order.start_date.isoformat() if manufacturing_order.start_date else None,
manufacturing_order.end_date.isoformat() if manufacturing_order.end_date else None,
manufacturing_order.status,
manufacturing_order.id))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error updating manufacturing order: {e}")
return False
finally:
self.db_manager.disconnect()
def delete(self, mo_id: int) -> bool:
"""Delete a manufacturing order"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('DELETE FROM manufacturing_orders WHERE id = ?', (mo_id,))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error deleting manufacturing order: {e}")
return False
finally:
self.db_manager.disconnect()
class SalesOrderDAO(BaseDAO):
"""Data access object for SalesOrder model"""
def create(self, sales_order: SalesOrder) -> bool:
"""Create a new sales order"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
INSERT INTO sales_orders (customer_id, order_date, expected_delivery, status, total_amount)
VALUES (?, ?, ?, ?, ?)
''', (sales_order.customer_id,
sales_order.order_date.isoformat() if sales_order.order_date else None,
sales_order.expected_delivery.isoformat() if sales_order.expected_delivery else None,
sales_order.status,
sales_order.total_amount))
sales_order.id = cursor.lastrowid
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error creating sales order: {e}")
return False
finally:
self.db_manager.disconnect()
def get_by_id(self, so_id: int) -> Optional[SalesOrder]:
"""Get sales order by ID"""
if not self.db_manager.connect():
return None
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM sales_orders WHERE id = ?', (so_id,))
row = cursor.fetchone()
if row:
order_date = datetime.fromisoformat(row['order_date']).date() if row['order_date'] else None
expected_delivery = datetime.fromisoformat(row['expected_delivery']).date() if row['expected_delivery'] else None
return SalesOrder(
id=row['id'],
customer_id=row['customer_id'],
order_date=order_date,
expected_delivery=expected_delivery,
status=row['status'],
total_amount=row['total_amount']
)
return None
except Exception as e:
print(f"Error getting sales order: {e}")
return None
finally:
self.db_manager.disconnect()
def get_all(self) -> List[SalesOrder]:
"""Get all sales orders"""
if not self.db_manager.connect():
return []
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM sales_orders ORDER BY order_date DESC')
rows = cursor.fetchall()
sales_orders = []
for row in rows:
order_date = datetime.fromisoformat(row['order_date']).date() if row['order_date'] else None
expected_delivery = datetime.fromisoformat(row['expected_delivery']).date() if row['expected_delivery'] else None
sales_orders.append(SalesOrder(
id=row['id'],
customer_id=row['customer_id'],
order_date=order_date,
expected_delivery=expected_delivery,
status=row['status'],
total_amount=row['total_amount']
))
return sales_orders
except Exception as e:
print(f"Error getting sales orders: {e}")
return []
finally:
self.db_manager.disconnect()
def update(self, sales_order: SalesOrder) -> bool:
"""Update a sales order"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
UPDATE sales_orders
SET customer_id = ?, order_date = ?, expected_delivery = ?, status = ?, total_amount = ?
WHERE id = ?
''', (sales_order.customer_id,
sales_order.order_date.isoformat() if sales_order.order_date else None,
sales_order.expected_delivery.isoformat() if sales_order.expected_delivery else None,
sales_order.status,
sales_order.total_amount,
sales_order.id))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error updating sales order: {e}")
return False
finally:
self.db_manager.disconnect()
def delete(self, so_id: int) -> bool:
"""Delete a sales order"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
# Delete items first
cursor.execute('DELETE FROM sales_order_items WHERE sales_order_id = ?', (so_id,))
# Delete order
cursor.execute('DELETE FROM sales_orders WHERE id = ?', (so_id,))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error deleting sales order: {e}")
return False
finally:
self.db_manager.disconnect()
class InventoryDAO(BaseDAO):
"""Data access object for Inventory model"""
def create(self, inventory: Inventory) -> bool:
"""Create a new inventory record"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
INSERT OR IGNORE INTO inventory (product_id, quantity, reserved_quantity)
VALUES (?, ?, ?)
''', (inventory.product_id, inventory.quantity, inventory.reserved_quantity))
# If the record already exists, update it
cursor.execute('''
UPDATE inventory
SET quantity = ?, reserved_quantity = ?, last_updated = CURRENT_TIMESTAMP
WHERE product_id = ?
''', (inventory.quantity, inventory.reserved_quantity, inventory.product_id))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error creating inventory record: {e}")
return False
finally:
self.db_manager.disconnect()
def get_by_product_id(self, product_id: int) -> Optional[Inventory]:
"""Get inventory by product ID"""
if not self.db_manager.connect():
return None
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM inventory WHERE product_id = ?', (product_id,))
row = cursor.fetchone()
if row:
last_updated = datetime.fromisoformat(row['last_updated']) if row['last_updated'] else None
return Inventory(
id=row['id'],
product_id=row['product_id'],
quantity=row['quantity'],
reserved_quantity=row['reserved_quantity']
)
return None
except Exception as e:
print(f"Error getting inventory: {e}")
return None
finally:
self.db_manager.disconnect()
def get_all(self) -> List[Inventory]:
"""Get all inventory records"""
if not self.db_manager.connect():
return []
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM inventory ORDER BY product_id')
rows = cursor.fetchall()
inventory_records = []
for row in rows:
last_updated = datetime.fromisoformat(row['last_updated']) if row['last_updated'] else None
inventory_records.append(Inventory(
id=row['id'],
product_id=row['product_id'],
quantity=row['quantity'],
reserved_quantity=row['reserved_quantity']
))
return inventory_records
except Exception as e:
print(f"Error getting inventory records: {e}")
return []
finally:
self.db_manager.disconnect()
def update(self, inventory: Inventory) -> bool:
"""Update an inventory record"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
UPDATE inventory
SET quantity = ?, reserved_quantity = ?, last_updated = CURRENT_TIMESTAMP
WHERE product_id = ?
''', (inventory.quantity, inventory.reserved_quantity, inventory.product_id))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error updating inventory: {e}")
return False
finally:
self.db_manager.disconnect()
def delete(self, product_id: int) -> bool:
"""Delete an inventory record"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('DELETE FROM inventory WHERE product_id = ?', (product_id,))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error deleting inventory: {e}")
return False
finally:
self.db_manager.disconnect()
def adjust_quantity(self, product_id: int, quantity_change: int) -> bool:
"""Adjust inventory quantity"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
UPDATE inventory
SET quantity = quantity + ?, last_updated = CURRENT_TIMESTAMP
WHERE product_id = ?
''', (quantity_change, product_id))
# If no rows were updated, insert a new record
if cursor.rowcount == 0:
cursor.execute('''
INSERT INTO inventory (product_id, quantity, reserved_quantity)
VALUES (?, ?, 0)
''', (product_id, quantity_change))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error adjusting inventory: {e}")
return False
finally:
self.db_manager.disconnect()