first commit

This commit is contained in:
Suherdy Yacob 2025-08-17 21:31:29 +07:00
commit 38cf8c2cfc
47 changed files with 6437 additions and 0 deletions

121
README.md Normal file
View File

@ -0,0 +1,121 @@
# Mini Basic Manufacture App
A simple manufacturing application for small businesses, designed for ease of use by users 50 years and older with a modern UI.
## Features
- **User Authentication & Permissions**: Secure login system with role-based access control
- **Database Management**: Supports both SQLite (default) and PostgreSQL with backup/restore/duplicate functionality
- **Core Modules**:
- Purchase Management
- Basic Manufacturing (simple production tracking)
- Sales Management
- Inventory Tracking
- **User & Group Configuration**: Admin panel for managing users and permissions
- **Dashboard**: Overview of key business metrics
- **Excel Export**: Export all data to Excel format for external analysis
- **Modern UI**: Clean, modern interface using customtkinter
- **Cross-platform**: Works on Windows, macOS, and Linux
## Requirements
- Python 3.6 or higher
- SQLite (included with Python)
- Optional: PostgreSQL for production use
## Installation
1. Clone or download this repository
2. Install the required packages:
```
pip install -r requirements.txt
```
3. Run the application:
```
python main.py
```
## First Run
1. On first run, you'll be prompted to create an admin account
2. Log in with your admin credentials
3. Configure database settings if needed (SQLite is default)
4. Start using the application modules
## Database Configuration
Admin users can configure the database connection:
- **SQLite**: File-based database, no additional setup required
- **PostgreSQL**: For production environments, requires PostgreSQL server
Database management features:
- Backup: Create a backup of the current database
- Restore: Restore from a previous backup
- Duplicate: Create a copy of the database
## User Permissions
The application supports:
- Admin users: Full access to all features
- Regular users: Access based on group permissions
- User groups: Assign permissions to groups of users
## Modules
### Purchase
- Create and manage purchase orders
- Track supplier information
- Monitor order status
- View purchase reports with monthly charts
- **Export**: Export purchase orders to Excel format
### Manufacturing
- Create manufacturing orders for end products
- Track production progress
- Simple workflow without complex BOM
- **Export**: Export manufacturing orders to Excel format
### Sales
- Create and manage sales orders
- Track customer information
- Monitor order status
- View sales reports with monthly charts
- **Export**: Export sales orders to Excel format
### Inventory
- Track product quantities
- View stock levels
- Adjust inventory manually
- **Export**: Export inventory data and stock movements to Excel format
## Dashboard
The dashboard provides an overview of key business metrics:
- Quick stats on pending orders and low inventory
- Financial summary with revenue, costs, and profit
- Monthly sales and purchase charts
- Recent activities log
## Excel Export Features
All modules support Excel export functionality:
- **Purchase Orders**: Export all purchase order data including suppliers, items, and status
- **Manufacturing Orders**: Export production orders with product details and progress
- **Sales Orders**: Export sales data including customers, products, and order status
- **Inventory**: Export current stock levels and detailed stock movement history
Exported files are saved with timestamps in the filename for easy organization.
## Development
This application is built with:
- Python 3
- **customtkinter** for the modern GUI (replaced standard tkinter)
- SQLAlchemy for database operations
- Matplotlib for charting
- SQLite/PostgreSQL for data storage
- **pandas** and **openpyxl** for Excel export functionality
## License
This project is open source and available under the MIT License.

Binary file not shown.

29
main.py Normal file
View File

@ -0,0 +1,29 @@
#!/usr/bin/env python3
"""
Mini Basic Manufacture App
A simple manufacturing application for small businesses
"""
import customtkinter as ctk
from tkinter import messagebox
import sys
import os
# Add the src directory to the path
sys.path.append(os.path.join(os.path.dirname(__file__), 'src'))
from src.app import ManufacturingApp
def main():
"""Main entry point for the application"""
try:
app = ManufacturingApp()
app.run()
except Exception as e:
messagebox.showerror("Error", f"Failed to start application: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()

BIN
manufacturing.db Normal file

Binary file not shown.

13
requirements.txt Normal file
View File

@ -0,0 +1,13 @@
# Core dependencies
sqlalchemy>=2.0.0
bcrypt>=4.0.0
# UI dependencies
customtkinter>=5.2.0
# Excel export dependencies
openpyxl>=3.1.0
pandas>=2.0.0
# Development dependencies (optional)
pytest>=7.0.0

0
src/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

232
src/app.py Normal file
View File

@ -0,0 +1,232 @@
import customtkinter as ctk
from tkinter import messagebox
import os
import sys
# Add the parent directory to the path to allow imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.database import DatabaseManager
from src.auth import AuthManager
from src.ui.main_window import MainWindow
class ManufacturingApp:
"""Main application class for the manufacturing app"""
def __init__(self):
"""Initialize the application"""
ctk.set_appearance_mode("light")
ctk.set_default_color_theme("blue")
self.root = ctk.CTk()
self.root.title("Mini Basic Manufacture App")
self.root.geometry("400x500")
self.root.resizable(False, False)
# Center the window
self.root.update_idletasks()
width = self.root.winfo_width()
height = self.root.winfo_height()
x = (self.root.winfo_screenwidth() // 2) - (width // 2)
y = (self.root.winfo_screenheight() // 2) - (height // 2)
self.root.geometry(f'{width}x{height}+{x}+{y}')
# Initialize managers
self.db_manager = DatabaseManager()
self.auth_manager = AuthManager(self.db_manager)
# Initialize services
self.product_service = None
self.supplier_service = None
self.customer_service = None
self.purchase_order_service = None
self.manufacturing_order_service = None
self.sales_order_service = None
self.inventory_service = None
# Initialize main window
self.main_window = None
def run(self):
"""Run the application"""
# Show login window first
self.show_login()
self.root.mainloop()
def show_login(self):
"""Show the login window"""
# Clear any existing widgets
for widget in self.root.winfo_children():
widget.destroy()
# Initialize database first
try:
self.db_manager.initialize_database()
# Initialize default groups
self.auth_manager.initialize_default_groups()
except Exception as e:
messagebox.showerror("Database Error", f"Failed to initialize database: {str(e)}")
return
# Create login frame
login_frame = ctk.CTkFrame(self.root)
login_frame.pack(fill="both", expand=True, padx=20, pady=20)
# Title
title_label = ctk.CTkLabel(login_frame, text="Mini Basic Manufacture App",
font=("Arial", 24, "bold"))
title_label.pack(pady=(20, 30))
# Logo placeholder
logo_frame = ctk.CTkFrame(login_frame, fg_color="transparent")
logo_frame.pack(pady=(0, 20))
logo_label = ctk.CTkLabel(logo_frame, text="🏭", font=("Arial", 48))
logo_label.pack()
# Username
username_label = ctk.CTkLabel(login_frame, text="Username:", font=("Arial", 14))
username_label.pack(anchor="w", padx=20, pady=(0, 5))
self.username_entry = ctk.CTkEntry(login_frame, width=300, height=40,
font=("Arial", 14))
self.username_entry.pack(pady=(0, 15))
self.username_entry.focus()
# Password
password_label = ctk.CTkLabel(login_frame, text="Password:", font=("Arial", 14))
password_label.pack(anchor="w", padx=20, pady=(0, 5))
self.password_entry = ctk.CTkEntry(login_frame, width=300, height=40,
font=("Arial", 14), show="*")
self.password_entry.pack(pady=(0, 20))
# Login button
login_button = ctk.CTkButton(login_frame, text="Login", command=self.login,
width=300, height=40, font=("Arial", 14))
login_button.pack(pady=(0, 10))
# Bind Enter key to login
self.password_entry.bind("<Return>", lambda e: self.login())
# Admin setup button (only shown if no users exist)
try:
if not self.auth_manager.has_users():
setup_button = ctk.CTkButton(login_frame, text="Setup Admin Account",
command=self.show_admin_setup,
width=300, height=40,
fg_color="green", hover_color="dark green")
setup_button.pack(pady=(10, 20))
except Exception as e:
messagebox.showerror("Error", f"Failed to check users: {str(e)}")
def login(self):
"""Handle login"""
username = self.username_entry.get()
password = self.password_entry.get()
if not username or not password:
messagebox.showerror("Error", "Please enter both username and password")
return
user = self.auth_manager.authenticate(username, password)
if user:
messagebox.showinfo("Success", f"Welcome, {user['username']}!")
self.show_main_window(user)
else:
messagebox.showerror("Error", "Invalid username or password")
def show_admin_setup(self):
"""Show admin account setup"""
# Clear any existing widgets
for widget in self.root.winfo_children():
widget.destroy()
# Create setup frame
setup_frame = ctk.CTkFrame(self.root)
setup_frame.pack(fill="both", expand=True, padx=20, pady=20)
# Title
title_label = ctk.CTkLabel(setup_frame, text="Admin Account Setup",
font=("Arial", 24, "bold"))
title_label.pack(pady=(20, 30))
# Username
username_label = ctk.CTkLabel(setup_frame, text="Admin Username:",
font=("Arial", 14))
username_label.pack(anchor="w", padx=20, pady=(0, 5))
self.admin_username_entry = ctk.CTkEntry(setup_frame, width=300, height=40,
font=("Arial", 14))
self.admin_username_entry.pack(pady=(0, 15))
self.admin_username_entry.focus()
# Password
password_label = ctk.CTkLabel(setup_frame, text="Admin Password:",
font=("Arial", 14))
password_label.pack(anchor="w", padx=20, pady=(0, 5))
self.admin_password_entry = ctk.CTkEntry(setup_frame, width=300, height=40,
font=("Arial", 14), show="*")
self.admin_password_entry.pack(pady=(0, 15))
# Confirm Password
confirm_label = ctk.CTkLabel(setup_frame, text="Confirm Password:",
font=("Arial", 14))
confirm_label.pack(anchor="w", padx=20, pady=(0, 5))
self.admin_confirm_entry = ctk.CTkEntry(setup_frame, width=300, height=40,
font=("Arial", 14), show="*")
self.admin_confirm_entry.pack(pady=(0, 20))
# Setup button
setup_button = ctk.CTkButton(setup_frame, text="Create Admin Account",
command=self.create_admin,
width=300, height=40, font=("Arial", 14))
setup_button.pack(pady=(0, 10))
# Back to login button
back_button = ctk.CTkButton(setup_frame, text="Back to Login",
command=self.show_login,
width=300, height=40, font=("Arial", 14),
fg_color="gray", hover_color="dark gray")
back_button.pack(pady=(10, 20))
def create_admin(self):
"""Create admin account"""
username = self.admin_username_entry.get()
password = self.admin_password_entry.get()
confirm = self.admin_confirm_entry.get()
if not username or not password:
messagebox.showerror("Error", "Please enter both username and password")
return
if password != confirm:
messagebox.showerror("Error", "Passwords do not match")
return
try:
self.auth_manager.create_user(username, password, is_admin=True)
messagebox.showinfo("Success", "Admin account created successfully!")
self.show_login()
except Exception as e:
messagebox.showerror("Error", f"Failed to create admin account: {str(e)}")
def show_main_window(self, user):
"""Show the main application window"""
# Clear any existing widgets
for widget in self.root.winfo_children():
widget.destroy()
# Resize window for main application
self.root.geometry("1200x800")
# Create main window
self.main_window = MainWindow(self.root, user, self.db_manager, self)
def restart(self):
"""Restart the application"""
self.root.destroy()
self.__init__()
self.run()

357
src/auth.py Normal file
View File

@ -0,0 +1,357 @@
import hashlib
import sqlite3
import os
class AuthManager:
"""Authentication manager for the manufacturing app"""
def __init__(self, db_manager):
"""Initialize the authentication manager"""
self.db_manager = db_manager
self.current_user = None
def hash_password(self, password):
"""Hash a password using SHA-256"""
return hashlib.sha256(password.encode()).hexdigest()
def create_user(self, username, password, is_admin=False):
"""Create a new user"""
if not self.db_manager.connect():
raise Exception("Database connection failed")
try:
cursor = self.db_manager.connection.cursor()
password_hash = self.hash_password(password)
cursor.execute('''
INSERT INTO users (username, password_hash, is_admin)
VALUES (?, ?, ?)
''', (username, password_hash, is_admin))
self.db_manager.connection.commit()
return True
except sqlite3.IntegrityError:
raise Exception("Username already exists")
except Exception as e:
raise Exception(f"Failed to create user: {str(e)}")
finally:
self.db_manager.disconnect()
def authenticate(self, username, password):
"""Authenticate a user"""
if not self.db_manager.connect():
return None
try:
cursor = self.db_manager.connection.cursor()
password_hash = self.hash_password(password)
cursor.execute('''
SELECT id, username, is_admin FROM users
WHERE username = ? AND password_hash = ?
''', (username, password_hash))
user = cursor.fetchone()
if user:
self.current_user = {
'id': user[0],
'username': user[1],
'is_admin': user[2]
}
return self.current_user
return None
except Exception as e:
print(f"Authentication error: {e}")
return None
finally:
self.db_manager.disconnect()
def has_users(self):
"""Check if any users exist"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT COUNT(*) FROM users')
count = cursor.fetchone()[0]
return count > 0
except Exception as e:
print(f"Error checking users: {e}")
return False
finally:
self.db_manager.disconnect()
def get_user_permissions(self, user_id):
"""Get user permissions"""
if not self.db_manager.connect():
return []
try:
cursor = self.db_manager.connection.cursor()
# Get user groups
cursor.execute('''
SELECT g.permissions FROM user_groups g
JOIN user_group_members m ON g.id = m.group_id
WHERE m.user_id = ?
''', (user_id,))
permissions = []
for row in cursor.fetchall():
if row[0]:
permissions.extend(row[0].split(','))
return permissions
except Exception as e:
print(f"Error getting user permissions: {e}")
return []
finally:
self.db_manager.disconnect()
def user_has_permission(self, user_id, permission):
"""Check if user has a specific permission"""
permissions = self.get_user_permissions(user_id)
return permission in permissions or self.is_admin(user_id)
def is_admin(self, user_id):
"""Check if user is admin"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT is_admin FROM users WHERE id = ?', (user_id,))
result = cursor.fetchone()
return result[0] if result else False
except Exception as e:
print(f"Error checking admin status: {e}")
return False
finally:
self.db_manager.disconnect()
def get_all_users(self):
"""Get all users"""
if not self.db_manager.connect():
return []
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT id, username, is_admin FROM users')
users = []
for row in cursor.fetchall():
users.append({
'id': row[0],
'username': row[1],
'is_admin': row[2]
})
return users
except Exception as e:
print(f"Error getting users: {e}")
return []
finally:
self.db_manager.disconnect()
def delete_user(self, user_id):
"""Delete a user"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
# Delete user group memberships first
cursor.execute('DELETE FROM user_group_members WHERE user_id = ?', (user_id,))
# Delete user
cursor.execute('DELETE FROM users WHERE id = ?', (user_id,))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error deleting user: {e}")
return False
finally:
self.db_manager.disconnect()
# Group management methods
def create_group(self, name, permissions=None):
"""Create a new user group"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
permissions_str = ','.join(permissions) if permissions else None
cursor.execute('''
INSERT INTO user_groups (name, permissions)
VALUES (?, ?)
''', (name, permissions_str))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error creating group: {e}")
return False
finally:
self.db_manager.disconnect()
def get_all_groups(self):
"""Get all user groups"""
if not self.db_manager.connect():
return []
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT id, name, permissions FROM user_groups')
groups = []
for row in cursor.fetchall():
permissions = row[2].split(',') if row[2] else []
groups.append({
'id': row[0],
'name': row[1],
'permissions': permissions
})
return groups
except Exception as e:
print(f"Error getting groups: {e}")
return []
finally:
self.db_manager.disconnect()
def delete_group(self, group_id):
"""Delete a user group"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
# Delete group memberships first
cursor.execute('DELETE FROM user_group_members WHERE group_id = ?', (group_id,))
# Delete group
cursor.execute('DELETE FROM user_groups WHERE id = ?', (group_id,))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error deleting group: {e}")
return False
finally:
self.db_manager.disconnect()
def add_user_to_group(self, user_id, group_id):
"""Add a user to a group"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
INSERT OR IGNORE INTO user_group_members (user_id, group_id)
VALUES (?, ?)
''', (user_id, group_id))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error adding user to group: {e}")
return False
finally:
self.db_manager.disconnect()
def remove_user_from_group(self, user_id, group_id):
"""Remove a user from a group"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
DELETE FROM user_group_members
WHERE user_id = ? AND group_id = ?
''', (user_id, group_id))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error removing user from group: {e}")
return False
finally:
self.db_manager.disconnect()
def get_user_groups(self, user_id):
"""Get all groups for a user"""
if not self.db_manager.connect():
return []
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
SELECT g.id, g.name, g.permissions
FROM user_groups g
JOIN user_group_members m ON g.id = m.group_id
WHERE m.user_id = ?
''', (user_id,))
groups = []
for row in cursor.fetchall():
permissions = row[2].split(',') if row[2] else []
groups.append({
'id': row[0],
'name': row[1],
'permissions': permissions
})
return groups
except Exception as e:
print(f"Error getting user groups: {e}")
return []
finally:
self.db_manager.disconnect()
def initialize_default_groups(self):
"""Initialize default groups with permissions"""
# Check if default groups already exist
groups = self.get_all_groups()
default_group_names = ['Purchase Manager', 'Manufacturing Manager', 'Sales Manager', 'Inventory Manager', 'Reports Viewer']
# Check if all default groups exist
existing_group_names = [group['name'] for group in groups]
if all(name in existing_group_names for name in default_group_names):
return True
# Create default groups
default_groups = [
{
'name': 'Purchase Manager',
'permissions': ['view_dashboard', 'view_purchase_orders', 'edit_purchase_orders', 'manage_suppliers', 'receive_purchase_orders']
},
{
'name': 'Manufacturing Manager',
'permissions': ['view_dashboard', 'view_manufacturing_orders', 'edit_manufacturing_orders', 'complete_manufacturing_orders']
},
{
'name': 'Sales Manager',
'permissions': ['view_dashboard', 'view_sales_orders', 'edit_sales_orders', 'manage_customers', 'deliver_sales_orders']
},
{
'name': 'Inventory Manager',
'permissions': ['view_dashboard', 'view_inventory', 'adjust_inventory', 'view_stock_movements']
},
{
'name': 'Reports Viewer',
'permissions': ['view_dashboard', 'view_reports', 'view_inventory_report', 'view_stock_movement_report']
}
]
for group in default_groups:
# Skip if group already exists
if group['name'] in existing_group_names:
continue
if not self.create_group(group['name'], group['permissions']):
return False
return True

901
src/dao.py Normal file
View File

@ -0,0 +1,901 @@
import sqlite3
from datetime import datetime, date
from typing import List, Optional
from src.models import Product, Supplier, Customer, PurchaseOrder, PurchaseOrderItem, ManufacturingOrder, SalesOrder, SalesOrderItem, Inventory
class BaseDAO:
"""Base DAO class with common database operations"""
def __init__(self, db_manager):
self.db_manager = db_manager
class ProductDAO(BaseDAO):
"""Data access object for Product model"""
def create(self, product: Product) -> bool:
"""Create a new product"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
INSERT INTO products (name, description, unit_price, sku, min_stock)
VALUES (?, ?, ?, ?, ?)
''', (product.name, product.description, product.unit_price, product.sku, product.min_stock))
product.id = cursor.lastrowid
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error creating product: {e}")
return False
finally:
self.db_manager.disconnect()
def get_by_id(self, product_id: int) -> Optional[Product]:
"""Get product by ID"""
if not self.db_manager.connect():
return None
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM products WHERE id = ?', (product_id,))
row = cursor.fetchone()
if row:
return Product(
id=row['id'],
name=row['name'],
description=row['description'],
unit_price=row['unit_price'],
sku=row['sku'],
min_stock=row['min_stock'] if 'min_stock' in row.keys() else 0
)
return None
except Exception as e:
print(f"Error getting product: {e}")
return None
finally:
self.db_manager.disconnect()
def get_all(self) -> List[Product]:
"""Get all products"""
if not self.db_manager.connect():
return []
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM products ORDER BY name')
rows = cursor.fetchall()
products = []
for row in rows:
products.append(Product(
id=row['id'],
name=row['name'],
description=row['description'],
unit_price=row['unit_price'],
sku=row['sku'],
min_stock=row['min_stock'] if 'min_stock' in row.keys() else 0
))
return products
except Exception as e:
print(f"Error getting products: {e}")
return []
finally:
self.db_manager.disconnect()
def update(self, product: Product) -> bool:
"""Update a product"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
UPDATE products
SET name = ?, description = ?, unit_price = ?, sku = ?, min_stock = ?
WHERE id = ?
''', (product.name, product.description, product.unit_price, product.sku, product.min_stock, product.id))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error updating product: {e}")
return False
finally:
self.db_manager.disconnect()
def delete(self, product_id: int) -> bool:
"""Delete a product"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('DELETE FROM products WHERE id = ?', (product_id,))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error deleting product: {e}")
return False
finally:
self.db_manager.disconnect()
class SupplierDAO(BaseDAO):
"""Data access object for Supplier model"""
def create(self, supplier: Supplier) -> bool:
"""Create a new supplier"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
INSERT INTO suppliers (name, contact_person, phone, email, address)
VALUES (?, ?, ?, ?, ?)
''', (supplier.name, supplier.contact_person, supplier.phone, supplier.email, supplier.address))
supplier.id = cursor.lastrowid
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error creating supplier: {e}")
return False
finally:
self.db_manager.disconnect()
def get_by_id(self, supplier_id: int) -> Optional[Supplier]:
"""Get supplier by ID"""
if not self.db_manager.connect():
return None
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM suppliers WHERE id = ?', (supplier_id,))
row = cursor.fetchone()
if row:
return Supplier(
id=row['id'],
name=row['name'],
contact_person=row['contact_person'],
phone=row['phone'],
email=row['email'],
address=row['address']
)
return None
except Exception as e:
print(f"Error getting supplier: {e}")
return None
finally:
self.db_manager.disconnect()
def get_all(self) -> List[Supplier]:
"""Get all suppliers"""
if not self.db_manager.connect():
return []
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM suppliers ORDER BY name')
rows = cursor.fetchall()
suppliers = []
for row in rows:
suppliers.append(Supplier(
id=row['id'],
name=row['name'],
contact_person=row['contact_person'],
phone=row['phone'],
email=row['email'],
address=row['address']
))
return suppliers
except Exception as e:
print(f"Error getting suppliers: {e}")
return []
finally:
self.db_manager.disconnect()
def update(self, supplier: Supplier) -> bool:
"""Update a supplier"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
UPDATE suppliers
SET name = ?, contact_person = ?, phone = ?, email = ?, address = ?
WHERE id = ?
''', (supplier.name, supplier.contact_person, supplier.phone, supplier.email, supplier.address, supplier.id))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error updating supplier: {e}")
return False
finally:
self.db_manager.disconnect()
def delete(self, supplier_id: int) -> bool:
"""Delete a supplier"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('DELETE FROM suppliers WHERE id = ?', (supplier_id,))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error deleting supplier: {e}")
return False
finally:
self.db_manager.disconnect()
class CustomerDAO(BaseDAO):
"""Data access object for Customer model"""
def create(self, customer: Customer) -> bool:
"""Create a new customer"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
INSERT INTO customers (name, contact_person, phone, email, address)
VALUES (?, ?, ?, ?, ?)
''', (customer.name, customer.contact_person, customer.phone, customer.email, customer.address))
customer.id = cursor.lastrowid
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error creating customer: {e}")
return False
finally:
self.db_manager.disconnect()
def get_by_id(self, customer_id: int) -> Optional[Customer]:
"""Get customer by ID"""
if not self.db_manager.connect():
return None
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM customers WHERE id = ?', (customer_id,))
row = cursor.fetchone()
if row:
return Customer(
id=row['id'],
name=row['name'],
contact_person=row['contact_person'],
phone=row['phone'],
email=row['email'],
address=row['address']
)
return None
except Exception as e:
print(f"Error getting customer: {e}")
return None
finally:
self.db_manager.disconnect()
def get_all(self) -> List[Customer]:
"""Get all customers"""
if not self.db_manager.connect():
return []
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM customers ORDER BY name')
rows = cursor.fetchall()
customers = []
for row in rows:
customers.append(Customer(
id=row['id'],
name=row['name'],
contact_person=row['contact_person'],
phone=row['phone'],
email=row['email'],
address=row['address']
))
return customers
except Exception as e:
print(f"Error getting customers: {e}")
return []
finally:
self.db_manager.disconnect()
def update(self, customer: Customer) -> bool:
"""Update a customer"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
UPDATE customers
SET name = ?, contact_person = ?, phone = ?, email = ?, address = ?
WHERE id = ?
''', (customer.name, customer.contact_person, customer.phone, customer.email, customer.address, customer.id))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error updating customer: {e}")
return False
finally:
self.db_manager.disconnect()
def delete(self, customer_id: int) -> bool:
"""Delete a customer"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('DELETE FROM customers WHERE id = ?', (customer_id,))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error deleting customer: {e}")
return False
finally:
self.db_manager.disconnect()
class PurchaseOrderDAO(BaseDAO):
"""Data access object for PurchaseOrder model"""
def create(self, purchase_order: PurchaseOrder) -> bool:
"""Create a new purchase order"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
INSERT INTO purchase_orders (supplier_id, order_date, expected_delivery, status, total_amount)
VALUES (?, ?, ?, ?, ?)
''', (purchase_order.supplier_id,
purchase_order.order_date.isoformat() if purchase_order.order_date else None,
purchase_order.expected_delivery.isoformat() if purchase_order.expected_delivery else None,
purchase_order.status,
purchase_order.total_amount))
purchase_order.id = cursor.lastrowid
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error creating purchase order: {e}")
return False
finally:
self.db_manager.disconnect()
def get_by_id(self, po_id: int) -> Optional[PurchaseOrder]:
"""Get purchase order by ID"""
if not self.db_manager.connect():
return None
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM purchase_orders WHERE id = ?', (po_id,))
row = cursor.fetchone()
if row:
order_date = datetime.fromisoformat(row['order_date']).date() if row['order_date'] else None
expected_delivery = datetime.fromisoformat(row['expected_delivery']).date() if row['expected_delivery'] else None
return PurchaseOrder(
id=row['id'],
supplier_id=row['supplier_id'],
order_date=order_date,
expected_delivery=expected_delivery,
status=row['status'],
total_amount=row['total_amount']
)
return None
except Exception as e:
print(f"Error getting purchase order: {e}")
return None
finally:
self.db_manager.disconnect()
def get_all(self) -> List[PurchaseOrder]:
"""Get all purchase orders"""
if not self.db_manager.connect():
return []
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM purchase_orders ORDER BY order_date DESC')
rows = cursor.fetchall()
purchase_orders = []
for row in rows:
order_date = datetime.fromisoformat(row['order_date']).date() if row['order_date'] else None
expected_delivery = datetime.fromisoformat(row['expected_delivery']).date() if row['expected_delivery'] else None
purchase_orders.append(PurchaseOrder(
id=row['id'],
supplier_id=row['supplier_id'],
order_date=order_date,
expected_delivery=expected_delivery,
status=row['status'],
total_amount=row['total_amount']
))
return purchase_orders
except Exception as e:
print(f"Error getting purchase orders: {e}")
return []
finally:
self.db_manager.disconnect()
def update(self, purchase_order: PurchaseOrder) -> bool:
"""Update a purchase order"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
UPDATE purchase_orders
SET supplier_id = ?, order_date = ?, expected_delivery = ?, status = ?, total_amount = ?
WHERE id = ?
''', (purchase_order.supplier_id,
purchase_order.order_date.isoformat() if purchase_order.order_date else None,
purchase_order.expected_delivery.isoformat() if purchase_order.expected_delivery else None,
purchase_order.status,
purchase_order.total_amount,
purchase_order.id))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error updating purchase order: {e}")
return False
finally:
self.db_manager.disconnect()
def delete(self, po_id: int) -> bool:
"""Delete a purchase order"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
# Delete items first
cursor.execute('DELETE FROM purchase_order_items WHERE purchase_order_id = ?', (po_id,))
# Delete order
cursor.execute('DELETE FROM purchase_orders WHERE id = ?', (po_id,))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error deleting purchase order: {e}")
return False
finally:
self.db_manager.disconnect()
class ManufacturingOrderDAO(BaseDAO):
"""Data access object for ManufacturingOrder model"""
def create(self, manufacturing_order: ManufacturingOrder) -> bool:
"""Create a new manufacturing order"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
INSERT INTO manufacturing_orders (product_id, quantity, start_date, end_date, status)
VALUES (?, ?, ?, ?, ?)
''', (manufacturing_order.product_id,
manufacturing_order.quantity,
manufacturing_order.start_date.isoformat() if manufacturing_order.start_date else None,
manufacturing_order.end_date.isoformat() if manufacturing_order.end_date else None,
manufacturing_order.status))
manufacturing_order.id = cursor.lastrowid
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error creating manufacturing order: {e}")
return False
finally:
self.db_manager.disconnect()
def get_by_id(self, mo_id: int) -> Optional[ManufacturingOrder]:
"""Get manufacturing order by ID"""
if not self.db_manager.connect():
return None
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM manufacturing_orders WHERE id = ?', (mo_id,))
row = cursor.fetchone()
if row:
start_date = datetime.fromisoformat(row['start_date']).date() if row['start_date'] else None
end_date = datetime.fromisoformat(row['end_date']).date() if row['end_date'] else None
return ManufacturingOrder(
id=row['id'],
product_id=row['product_id'],
quantity=row['quantity'],
start_date=start_date,
end_date=end_date,
status=row['status']
)
return None
except Exception as e:
print(f"Error getting manufacturing order: {e}")
return None
finally:
self.db_manager.disconnect()
def get_all(self) -> List[ManufacturingOrder]:
"""Get all manufacturing orders"""
if not self.db_manager.connect():
return []
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM manufacturing_orders ORDER BY start_date DESC')
rows = cursor.fetchall()
manufacturing_orders = []
for row in rows:
start_date = datetime.fromisoformat(row['start_date']).date() if row['start_date'] else None
end_date = datetime.fromisoformat(row['end_date']).date() if row['end_date'] else None
manufacturing_orders.append(ManufacturingOrder(
id=row['id'],
product_id=row['product_id'],
quantity=row['quantity'],
start_date=start_date,
end_date=end_date,
status=row['status']
))
return manufacturing_orders
except Exception as e:
print(f"Error getting manufacturing orders: {e}")
return []
finally:
self.db_manager.disconnect()
def update(self, manufacturing_order: ManufacturingOrder) -> bool:
"""Update a manufacturing order"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
UPDATE manufacturing_orders
SET product_id = ?, quantity = ?, start_date = ?, end_date = ?, status = ?
WHERE id = ?
''', (manufacturing_order.product_id,
manufacturing_order.quantity,
manufacturing_order.start_date.isoformat() if manufacturing_order.start_date else None,
manufacturing_order.end_date.isoformat() if manufacturing_order.end_date else None,
manufacturing_order.status,
manufacturing_order.id))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error updating manufacturing order: {e}")
return False
finally:
self.db_manager.disconnect()
def delete(self, mo_id: int) -> bool:
"""Delete a manufacturing order"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('DELETE FROM manufacturing_orders WHERE id = ?', (mo_id,))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error deleting manufacturing order: {e}")
return False
finally:
self.db_manager.disconnect()
class SalesOrderDAO(BaseDAO):
"""Data access object for SalesOrder model"""
def create(self, sales_order: SalesOrder) -> bool:
"""Create a new sales order"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
INSERT INTO sales_orders (customer_id, order_date, expected_delivery, status, total_amount)
VALUES (?, ?, ?, ?, ?)
''', (sales_order.customer_id,
sales_order.order_date.isoformat() if sales_order.order_date else None,
sales_order.expected_delivery.isoformat() if sales_order.expected_delivery else None,
sales_order.status,
sales_order.total_amount))
sales_order.id = cursor.lastrowid
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error creating sales order: {e}")
return False
finally:
self.db_manager.disconnect()
def get_by_id(self, so_id: int) -> Optional[SalesOrder]:
"""Get sales order by ID"""
if not self.db_manager.connect():
return None
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM sales_orders WHERE id = ?', (so_id,))
row = cursor.fetchone()
if row:
order_date = datetime.fromisoformat(row['order_date']).date() if row['order_date'] else None
expected_delivery = datetime.fromisoformat(row['expected_delivery']).date() if row['expected_delivery'] else None
return SalesOrder(
id=row['id'],
customer_id=row['customer_id'],
order_date=order_date,
expected_delivery=expected_delivery,
status=row['status'],
total_amount=row['total_amount']
)
return None
except Exception as e:
print(f"Error getting sales order: {e}")
return None
finally:
self.db_manager.disconnect()
def get_all(self) -> List[SalesOrder]:
"""Get all sales orders"""
if not self.db_manager.connect():
return []
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM sales_orders ORDER BY order_date DESC')
rows = cursor.fetchall()
sales_orders = []
for row in rows:
order_date = datetime.fromisoformat(row['order_date']).date() if row['order_date'] else None
expected_delivery = datetime.fromisoformat(row['expected_delivery']).date() if row['expected_delivery'] else None
sales_orders.append(SalesOrder(
id=row['id'],
customer_id=row['customer_id'],
order_date=order_date,
expected_delivery=expected_delivery,
status=row['status'],
total_amount=row['total_amount']
))
return sales_orders
except Exception as e:
print(f"Error getting sales orders: {e}")
return []
finally:
self.db_manager.disconnect()
def update(self, sales_order: SalesOrder) -> bool:
"""Update a sales order"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
UPDATE sales_orders
SET customer_id = ?, order_date = ?, expected_delivery = ?, status = ?, total_amount = ?
WHERE id = ?
''', (sales_order.customer_id,
sales_order.order_date.isoformat() if sales_order.order_date else None,
sales_order.expected_delivery.isoformat() if sales_order.expected_delivery else None,
sales_order.status,
sales_order.total_amount,
sales_order.id))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error updating sales order: {e}")
return False
finally:
self.db_manager.disconnect()
def delete(self, so_id: int) -> bool:
"""Delete a sales order"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
# Delete items first
cursor.execute('DELETE FROM sales_order_items WHERE sales_order_id = ?', (so_id,))
# Delete order
cursor.execute('DELETE FROM sales_orders WHERE id = ?', (so_id,))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error deleting sales order: {e}")
return False
finally:
self.db_manager.disconnect()
class InventoryDAO(BaseDAO):
"""Data access object for Inventory model"""
def create(self, inventory: Inventory) -> bool:
"""Create a new inventory record"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
INSERT OR IGNORE INTO inventory (product_id, quantity, reserved_quantity)
VALUES (?, ?, ?)
''', (inventory.product_id, inventory.quantity, inventory.reserved_quantity))
# If the record already exists, update it
cursor.execute('''
UPDATE inventory
SET quantity = ?, reserved_quantity = ?, last_updated = CURRENT_TIMESTAMP
WHERE product_id = ?
''', (inventory.quantity, inventory.reserved_quantity, inventory.product_id))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error creating inventory record: {e}")
return False
finally:
self.db_manager.disconnect()
def get_by_product_id(self, product_id: int) -> Optional[Inventory]:
"""Get inventory by product ID"""
if not self.db_manager.connect():
return None
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM inventory WHERE product_id = ?', (product_id,))
row = cursor.fetchone()
if row:
last_updated = datetime.fromisoformat(row['last_updated']) if row['last_updated'] else None
return Inventory(
id=row['id'],
product_id=row['product_id'],
quantity=row['quantity'],
reserved_quantity=row['reserved_quantity']
)
return None
except Exception as e:
print(f"Error getting inventory: {e}")
return None
finally:
self.db_manager.disconnect()
def get_all(self) -> List[Inventory]:
"""Get all inventory records"""
if not self.db_manager.connect():
return []
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM inventory ORDER BY product_id')
rows = cursor.fetchall()
inventory_records = []
for row in rows:
last_updated = datetime.fromisoformat(row['last_updated']) if row['last_updated'] else None
inventory_records.append(Inventory(
id=row['id'],
product_id=row['product_id'],
quantity=row['quantity'],
reserved_quantity=row['reserved_quantity']
))
return inventory_records
except Exception as e:
print(f"Error getting inventory records: {e}")
return []
finally:
self.db_manager.disconnect()
def update(self, inventory: Inventory) -> bool:
"""Update an inventory record"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
UPDATE inventory
SET quantity = ?, reserved_quantity = ?, last_updated = CURRENT_TIMESTAMP
WHERE product_id = ?
''', (inventory.quantity, inventory.reserved_quantity, inventory.product_id))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error updating inventory: {e}")
return False
finally:
self.db_manager.disconnect()
def delete(self, product_id: int) -> bool:
"""Delete an inventory record"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('DELETE FROM inventory WHERE product_id = ?', (product_id,))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error deleting inventory: {e}")
return False
finally:
self.db_manager.disconnect()
def adjust_quantity(self, product_id: int, quantity_change: int) -> bool:
"""Adjust inventory quantity"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
UPDATE inventory
SET quantity = quantity + ?, last_updated = CURRENT_TIMESTAMP
WHERE product_id = ?
''', (quantity_change, product_id))
# If no rows were updated, insert a new record
if cursor.rowcount == 0:
cursor.execute('''
INSERT INTO inventory (product_id, quantity, reserved_quantity)
VALUES (?, ?, 0)
''', (product_id, quantity_change))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error adjusting inventory: {e}")
return False
finally:
self.db_manager.disconnect()

295
src/dao_items.py Normal file
View File

@ -0,0 +1,295 @@
import sqlite3
from datetime import datetime, date
from typing import List, Optional
from src.models import PurchaseOrderItem, SalesOrderItem
class BaseDAO:
"""Base DAO class with common database operations"""
def __init__(self, db_manager):
self.db_manager = db_manager
class PurchaseOrderItemDAO(BaseDAO):
"""Data access object for PurchaseOrderItem model"""
def create(self, item: PurchaseOrderItem) -> bool:
"""Create a new purchase order item"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
INSERT INTO purchase_order_items (purchase_order_id, product_id, quantity, unit_price, total_price)
VALUES (?, ?, ?, ?, ?)
''', (item.purchase_order_id, item.product_id, item.quantity, item.unit_price, item.total_price))
item.id = cursor.lastrowid
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error creating purchase order item: {e}")
return False
finally:
self.db_manager.disconnect()
def get_by_id(self, item_id: int) -> Optional[PurchaseOrderItem]:
"""Get purchase order item by ID"""
if not self.db_manager.connect():
return None
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM purchase_order_items WHERE id = ?', (item_id,))
row = cursor.fetchone()
if row:
return PurchaseOrderItem(
id=row['id'],
purchase_order_id=row['purchase_order_id'],
product_id=row['product_id'],
quantity=row['quantity'],
unit_price=row['unit_price'],
total_price=row['total_price']
)
return None
except Exception as e:
print(f"Error getting purchase order item: {e}")
return None
finally:
self.db_manager.disconnect()
def get_by_purchase_order_id(self, po_id: int) -> List[PurchaseOrderItem]:
"""Get all items for a purchase order"""
if not self.db_manager.connect():
return []
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM purchase_order_items WHERE purchase_order_id = ?', (po_id,))
rows = cursor.fetchall()
items = []
for row in rows:
items.append(PurchaseOrderItem(
id=row['id'],
purchase_order_id=row['purchase_order_id'],
product_id=row['product_id'],
quantity=row['quantity'],
unit_price=row['unit_price'],
total_price=row['total_price']
))
return items
except Exception as e:
print(f"Error getting purchase order items: {e}")
return []
finally:
self.db_manager.disconnect()
def get_all(self) -> List[PurchaseOrderItem]:
"""Get all purchase order items"""
if not self.db_manager.connect():
return []
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM purchase_order_items')
rows = cursor.fetchall()
items = []
for row in rows:
items.append(PurchaseOrderItem(
id=row['id'],
purchase_order_id=row['purchase_order_id'],
product_id=row['product_id'],
quantity=row['quantity'],
unit_price=row['unit_price'],
total_price=row['total_price']
))
return items
except Exception as e:
print(f"Error getting purchase order items: {e}")
return []
finally:
self.db_manager.disconnect()
def update(self, item: PurchaseOrderItem) -> bool:
"""Update a purchase order item"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
UPDATE purchase_order_items
SET purchase_order_id = ?, product_id = ?, quantity = ?, unit_price = ?, total_price = ?
WHERE id = ?
''', (item.purchase_order_id, item.product_id, item.quantity, item.unit_price, item.total_price, item.id))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error updating purchase order item: {e}")
return False
finally:
self.db_manager.disconnect()
def delete(self, item_id: int) -> bool:
"""Delete a purchase order item"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('DELETE FROM purchase_order_items WHERE id = ?', (item_id,))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error deleting purchase order item: {e}")
return False
finally:
self.db_manager.disconnect()
class SalesOrderItemDAO(BaseDAO):
"""Data access object for SalesOrderItem model"""
def create(self, item: SalesOrderItem) -> bool:
"""Create a new sales order item"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
INSERT INTO sales_order_items (sales_order_id, product_id, quantity, unit_price, total_price)
VALUES (?, ?, ?, ?, ?)
''', (item.sales_order_id, item.product_id, item.quantity, item.unit_price, item.total_price))
item.id = cursor.lastrowid
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error creating sales order item: {e}")
return False
finally:
self.db_manager.disconnect()
def get_by_id(self, item_id: int) -> Optional[SalesOrderItem]:
"""Get sales order item by ID"""
if not self.db_manager.connect():
return None
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM sales_order_items WHERE id = ?', (item_id,))
row = cursor.fetchone()
if row:
return SalesOrderItem(
id=row['id'],
sales_order_id=row['sales_order_id'],
product_id=row['product_id'],
quantity=row['quantity'],
unit_price=row['unit_price'],
total_price=row['total_price']
)
return None
except Exception as e:
print(f"Error getting sales order item: {e}")
return None
finally:
self.db_manager.disconnect()
def get_by_sales_order_id(self, so_id: int) -> List[SalesOrderItem]:
"""Get all items for a sales order"""
if not self.db_manager.connect():
return []
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM sales_order_items WHERE sales_order_id = ?', (so_id,))
rows = cursor.fetchall()
items = []
for row in rows:
items.append(SalesOrderItem(
id=row['id'],
sales_order_id=row['sales_order_id'],
product_id=row['product_id'],
quantity=row['quantity'],
unit_price=row['unit_price'],
total_price=row['total_price']
))
return items
except Exception as e:
print(f"Error getting sales order items: {e}")
return []
finally:
self.db_manager.disconnect()
def get_all(self) -> List[SalesOrderItem]:
"""Get all sales order items"""
if not self.db_manager.connect():
return []
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM sales_order_items')
rows = cursor.fetchall()
items = []
for row in rows:
items.append(SalesOrderItem(
id=row['id'],
sales_order_id=row['sales_order_id'],
product_id=row['product_id'],
quantity=row['quantity'],
unit_price=row['unit_price'],
total_price=row['total_price']
))
return items
except Exception as e:
print(f"Error getting sales order items: {e}")
return []
finally:
self.db_manager.disconnect()
def update(self, item: SalesOrderItem) -> bool:
"""Update a sales order item"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
UPDATE sales_order_items
SET sales_order_id = ?, product_id = ?, quantity = ?, unit_price = ?, total_price = ?
WHERE id = ?
''', (item.sales_order_id, item.product_id, item.quantity, item.unit_price, item.total_price, item.id))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error updating sales order item: {e}")
return False
finally:
self.db_manager.disconnect()
def delete(self, item_id: int) -> bool:
"""Delete a sales order item"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('DELETE FROM sales_order_items WHERE id = ?', (item_id,))
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error deleting sales order item: {e}")
return False
finally:
self.db_manager.disconnect()

156
src/dao_stock.py Normal file
View File

@ -0,0 +1,156 @@
import sqlite3
from datetime import datetime, date
from typing import List, Optional
from src.models import StockMovement
class BaseDAO:
"""Base DAO class with common database operations"""
def __init__(self, db_manager):
self.db_manager = db_manager
class StockMovementDAO(BaseDAO):
"""Data access object for StockMovement model"""
def create(self, movement: StockMovement) -> bool:
"""Create a new stock movement record"""
if not self.db_manager.connect():
return False
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
INSERT INTO stock_movements (product_id, movement_type, quantity, reference_type, reference_id)
VALUES (?, ?, ?, ?, ?)
''', (movement.product_id, movement.movement_type, movement.quantity,
movement.reference_type, movement.reference_id))
movement.id = cursor.lastrowid
self.db_manager.connection.commit()
return True
except Exception as e:
print(f"Error creating stock movement: {e}")
return False
finally:
self.db_manager.disconnect()
def get_by_id(self, movement_id: int) -> Optional[StockMovement]:
"""Get stock movement by ID"""
if not self.db_manager.connect():
return None
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM stock_movements WHERE id = ?', (movement_id,))
row = cursor.fetchone()
if row:
created_at = datetime.fromisoformat(row['created_at']) if row['created_at'] else None
return StockMovement(
id=row['id'],
product_id=row['product_id'],
movement_type=row['movement_type'],
quantity=row['quantity'],
reference_type=row['reference_type'],
reference_id=row['reference_id']
)
return None
except Exception as e:
print(f"Error getting stock movement: {e}")
return None
finally:
self.db_manager.disconnect()
def get_by_product_id(self, product_id: int) -> List[StockMovement]:
"""Get all stock movements for a product"""
if not self.db_manager.connect():
return []
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM stock_movements WHERE product_id = ? ORDER BY created_at DESC', (product_id,))
rows = cursor.fetchall()
movements = []
for row in rows:
created_at = datetime.fromisoformat(row['created_at']) if row['created_at'] else None
movements.append(StockMovement(
id=row['id'],
product_id=row['product_id'],
movement_type=row['movement_type'],
quantity=row['quantity'],
reference_type=row['reference_type'],
reference_id=row['reference_id']
))
return movements
except Exception as e:
print(f"Error getting stock movements: {e}")
return []
finally:
self.db_manager.disconnect()
def get_all(self) -> List[StockMovement]:
"""Get all stock movements"""
if not self.db_manager.connect():
return []
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('SELECT * FROM stock_movements ORDER BY created_at DESC')
rows = cursor.fetchall()
movements = []
for row in rows:
created_at = datetime.fromisoformat(row['created_at']) if row['created_at'] else None
movements.append(StockMovement(
id=row['id'],
product_id=row['product_id'],
movement_type=row['movement_type'],
quantity=row['quantity'],
reference_type=row['reference_type'],
reference_id=row['reference_id']
))
return movements
except Exception as e:
print(f"Error getting stock movements: {e}")
return []
finally:
self.db_manager.disconnect()
def get_by_date_range(self, start_date: date, end_date: date) -> List[StockMovement]:
"""Get stock movements within a date range"""
if not self.db_manager.connect():
return []
try:
cursor = self.db_manager.connection.cursor()
cursor.execute('''
SELECT * FROM stock_movements
WHERE DATE(created_at) BETWEEN ? AND ?
ORDER BY created_at DESC
''', (start_date.isoformat(), end_date.isoformat()))
rows = cursor.fetchall()
movements = []
for row in rows:
created_at = datetime.fromisoformat(row['created_at']) if row['created_at'] else None
movements.append(StockMovement(
id=row['id'],
product_id=row['product_id'],
movement_type=row['movement_type'],
quantity=row['quantity'],
reference_type=row['reference_type'],
reference_id=row['reference_id']
))
return movements
except Exception as e:
print(f"Error getting stock movements by date range: {e}")
return []
finally:
self.db_manager.disconnect()

325
src/database.py Normal file
View File

@ -0,0 +1,325 @@
import sqlite3
import os
import json
from datetime import datetime
import shutil
class DatabaseManager:
"""Database manager for the manufacturing app"""
def __init__(self, db_path=None):
"""Initialize the database manager"""
self.db_path = db_path or "manufacturing.db"
self.config_path = "db_config.json"
self.connection = None
self.load_config()
def load_config(self):
"""Load database configuration"""
if os.path.exists(self.config_path):
with open(self.config_path, 'r') as f:
config = json.load(f)
self.db_type = config.get('db_type', 'sqlite')
self.db_host = config.get('db_host', '')
self.db_port = config.get('db_port', '')
self.db_name = config.get('db_name', '')
self.db_user = config.get('db_user', '')
self.db_password = config.get('db_password', '')
else:
# Default to SQLite
self.db_type = 'sqlite'
self.db_host = ''
self.db_port = ''
self.db_name = self.db_path
self.db_user = ''
self.db_password = ''
def save_config(self):
"""Save database configuration"""
config = {
'db_type': self.db_type,
'db_host': self.db_host,
'db_port': self.db_port,
'db_name': self.db_name,
'db_user': self.db_user,
'db_password': self.db_password
}
with open(self.config_path, 'w') as f:
json.dump(config, f, indent=2)
def connect(self):
"""Connect to the database"""
try:
if self.db_type == 'sqlite':
self.connection = sqlite3.connect(self.db_name)
self.connection.row_factory = sqlite3.Row
elif self.db_type == 'postgresql':
# For PostgreSQL, we would use psycopg2
# This is a placeholder implementation
import psycopg2
self.connection = psycopg2.connect(
host=self.db_host,
port=self.db_port,
database=self.db_name,
user=self.db_user,
password=self.db_password
)
return True
except Exception as e:
print(f"Database connection error: {e}")
return False
def disconnect(self):
"""Disconnect from the database"""
if self.connection:
self.connection.close()
self.connection = None
def initialize_database(self):
"""Initialize the database with required tables"""
if not self.connect():
return False
try:
cursor = self.connection.cursor()
# Create users table
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
is_admin BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Create user_groups table
cursor.execute('''
CREATE TABLE IF NOT EXISTS user_groups (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
permissions TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Create user_group_members table
cursor.execute('''
CREATE TABLE IF NOT EXISTS user_group_members (
user_id INTEGER,
group_id INTEGER,
FOREIGN KEY (user_id) REFERENCES users (id),
FOREIGN KEY (group_id) REFERENCES user_groups (id),
PRIMARY KEY (user_id, group_id)
)
''')
# Create products table
cursor.execute('''
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
description TEXT,
unit_price REAL DEFAULT 0,
sku TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Create suppliers table
cursor.execute('''
CREATE TABLE IF NOT EXISTS suppliers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
contact_person TEXT,
phone TEXT,
email TEXT,
address TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Create customers table
cursor.execute('''
CREATE TABLE IF NOT EXISTS customers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
contact_person TEXT,
phone TEXT,
email TEXT,
address TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Create purchase_orders table
cursor.execute('''
CREATE TABLE IF NOT EXISTS purchase_orders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
supplier_id INTEGER,
order_date DATE,
expected_delivery DATE,
status TEXT DEFAULT 'pending',
total_amount REAL DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (supplier_id) REFERENCES suppliers (id)
)
''')
# Create purchase_order_items table
cursor.execute('''
CREATE TABLE IF NOT EXISTS purchase_order_items (
id INTEGER PRIMARY KEY AUTOINCREMENT,
purchase_order_id INTEGER,
product_id INTEGER,
quantity INTEGER,
unit_price REAL,
total_price REAL,
FOREIGN KEY (purchase_order_id) REFERENCES purchase_orders (id),
FOREIGN KEY (product_id) REFERENCES products (id)
)
''')
# Create manufacturing_orders table
cursor.execute('''
CREATE TABLE IF NOT EXISTS manufacturing_orders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
product_id INTEGER,
quantity INTEGER,
start_date DATE,
end_date DATE,
status TEXT DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (product_id) REFERENCES products (id)
)
''')
# Create sales_orders table
cursor.execute('''
CREATE TABLE IF NOT EXISTS sales_orders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
customer_id INTEGER,
order_date DATE,
expected_delivery DATE,
status TEXT DEFAULT 'pending',
total_amount REAL DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (customer_id) REFERENCES customers (id)
)
''')
# Create sales_order_items table
cursor.execute('''
CREATE TABLE IF NOT EXISTS sales_order_items (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sales_order_id INTEGER,
product_id INTEGER,
quantity INTEGER,
unit_price REAL,
total_price REAL,
FOREIGN KEY (sales_order_id) REFERENCES sales_orders (id),
FOREIGN KEY (product_id) REFERENCES products (id)
)
''')
# Create inventory table
cursor.execute('''
CREATE TABLE IF NOT EXISTS inventory (
id INTEGER PRIMARY KEY AUTOINCREMENT,
product_id INTEGER UNIQUE,
quantity INTEGER DEFAULT 0,
reserved_quantity INTEGER DEFAULT 0,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (product_id) REFERENCES products (id)
)
''')
# Create stock_movements table
cursor.execute('''
CREATE TABLE IF NOT EXISTS stock_movements (
id INTEGER PRIMARY KEY AUTOINCREMENT,
product_id INTEGER,
movement_type TEXT, -- 'IN' or 'OUT'
quantity INTEGER,
reference_type TEXT, -- 'PO', 'SO', 'MO', 'ADJUSTMENT'
reference_id INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (product_id) REFERENCES products (id)
)
''')
# Create stock movements table
cursor.execute('''
CREATE TABLE IF NOT EXISTS stock_movements (
id INTEGER PRIMARY KEY AUTOINCREMENT,
product_id INTEGER,
movement_type TEXT, -- 'IN' for receipts, 'OUT' for issues
quantity INTEGER,
reference_type TEXT, -- 'PO', 'SO', 'MO', 'ADJUSTMENT'
reference_id INTEGER, -- ID of the related document
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (product_id) REFERENCES products (id)
)
''')
self.connection.commit()
return True
except Exception as e:
print(f"Database initialization error: {e}")
return False
finally:
self.disconnect()
def backup_database(self, backup_path=None):
"""Backup the database"""
try:
if not backup_path:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = f"backup_{timestamp}.db"
if self.db_type == 'sqlite':
shutil.copy2(self.db_name, backup_path)
return backup_path
else:
# For other databases, implement appropriate backup logic
return None
except Exception as e:
print(f"Database backup error: {e}")
return None
def restore_database(self, backup_path):
"""Restore the database from backup"""
try:
if self.db_type == 'sqlite':
shutil.copy2(backup_path, self.db_name)
return True
else:
# For other databases, implement appropriate restore logic
return False
except Exception as e:
print(f"Database restore error: {e}")
return False
def duplicate_database(self, new_name):
"""Duplicate the database"""
try:
if self.db_type == 'sqlite':
shutil.copy2(self.db_name, new_name)
return new_name
else:
# For other databases, implement appropriate duplication logic
return None
except Exception as e:
print(f"Database duplication error: {e}")
return None
def get_connection_string(self):
"""Get database connection string"""
if self.db_type == 'sqlite':
return f"sqlite:///{self.db_name}"
elif self.db_type == 'postgresql':
return f"postgresql://{self.db_user}:{self.db_password}@{self.db_host}:{self.db_port}/{self.db_name}"
return ""

240
src/models.py Normal file
View File

@ -0,0 +1,240 @@
import sqlite3
from datetime import datetime, date
from typing import List, Optional
class Product:
"""Product model"""
def __init__(self, id=None, name="", description="", unit_price=0.0, sku=None, min_stock=0):
self.id = id
self.name = name
self.description = description
self.unit_price = unit_price
self.sku = sku
self.min_stock = min_stock
self.created_at = datetime.now()
def to_dict(self):
return {
'id': self.id,
'name': self.name,
'description': self.description,
'unit_price': self.unit_price,
'sku': self.sku,
'min_stock': self.min_stock,
'created_at': self.created_at.isoformat() if self.created_at else None
}
class Supplier:
"""Supplier model"""
def __init__(self, id=None, name="", contact_person="", phone="", email="", address=""):
self.id = id
self.name = name
self.contact_person = contact_person
self.phone = phone
self.email = email
self.address = address
self.created_at = datetime.now()
def to_dict(self):
return {
'id': self.id,
'name': self.name,
'contact_person': self.contact_person,
'phone': self.phone,
'email': self.email,
'address': self.address,
'created_at': self.created_at.isoformat() if self.created_at else None
}
class Customer:
"""Customer model"""
def __init__(self, id=None, name="", contact_person="", phone="", email="", address=""):
self.id = id
self.name = name
self.contact_person = contact_person
self.phone = phone
self.email = email
self.address = address
self.created_at = datetime.now()
def to_dict(self):
return {
'id': self.id,
'name': self.name,
'contact_person': self.contact_person,
'phone': self.phone,
'email': self.email,
'address': self.address,
'created_at': self.created_at.isoformat() if self.created_at else None
}
class PurchaseOrder:
"""Purchase order model"""
def __init__(self, id=None, supplier_id=None, order_date=None, expected_delivery=None,
status="pending", total_amount=0.0):
self.id = id
self.supplier_id = supplier_id
self.order_date = order_date or date.today()
self.expected_delivery = expected_delivery
self.status = status
self.total_amount = total_amount
self.created_at = datetime.now()
self.items: List[PurchaseOrderItem] = []
def to_dict(self):
return {
'id': self.id,
'supplier_id': self.supplier_id,
'order_date': self.order_date.isoformat() if self.order_date else None,
'expected_delivery': self.expected_delivery.isoformat() if self.expected_delivery else None,
'status': self.status,
'total_amount': self.total_amount,
'created_at': self.created_at.isoformat() if self.created_at else None
}
class PurchaseOrderItem:
"""Purchase order item model"""
def __init__(self, id=None, purchase_order_id=None, product_id=None, quantity=0, unit_price=0.0, total_price=0.0):
self.id = id
self.purchase_order_id = purchase_order_id
self.product_id = product_id
self.quantity = quantity
self.unit_price = unit_price
self.total_price = total_price
def to_dict(self):
return {
'id': self.id,
'purchase_order_id': self.purchase_order_id,
'product_id': self.product_id,
'quantity': self.quantity,
'unit_price': self.unit_price,
'total_price': self.total_price
}
class ManufacturingOrder:
"""Manufacturing order model"""
def __init__(self, id=None, product_id=None, quantity=0, start_date=None, end_date=None, status="pending"):
self.id = id
self.product_id = product_id
self.quantity = quantity
self.start_date = start_date or date.today()
self.end_date = end_date
self.status = status
self.created_at = datetime.now()
def to_dict(self):
return {
'id': self.id,
'product_id': self.product_id,
'quantity': self.quantity,
'start_date': self.start_date.isoformat() if self.start_date else None,
'end_date': self.end_date.isoformat() if self.end_date else None,
'status': self.status,
'created_at': self.created_at.isoformat() if self.created_at else None
}
class SalesOrder:
"""Sales order model"""
def __init__(self, id=None, customer_id=None, order_date=None, expected_delivery=None,
status="pending", total_amount=0.0):
self.id = id
self.customer_id = customer_id
self.order_date = order_date or date.today()
self.expected_delivery = expected_delivery
self.status = status
self.total_amount = total_amount
self.created_at = datetime.now()
self.items: List[SalesOrderItem] = []
def to_dict(self):
return {
'id': self.id,
'customer_id': self.customer_id,
'order_date': self.order_date.isoformat() if self.order_date else None,
'expected_delivery': self.expected_delivery.isoformat() if self.expected_delivery else None,
'status': self.status,
'total_amount': self.total_amount,
'created_at': self.created_at.isoformat() if self.created_at else None
}
class SalesOrderItem:
"""Sales order item model"""
def __init__(self, id=None, sales_order_id=None, product_id=None, quantity=0, unit_price=0.0, total_price=0.0):
self.id = id
self.sales_order_id = sales_order_id
self.product_id = product_id
self.quantity = quantity
self.unit_price = unit_price
self.total_price = total_price
def to_dict(self):
return {
'id': self.id,
'sales_order_id': self.sales_order_id,
'product_id': self.product_id,
'quantity': self.quantity,
'unit_price': self.unit_price,
'total_price': self.total_price
}
class Inventory:
"""Inventory model"""
def __init__(self, id=None, product_id=None, quantity=0, reserved_quantity=0):
self.id = id
self.product_id = product_id
self.quantity = quantity
self.reserved_quantity = reserved_quantity
self.last_updated = datetime.now()
def to_dict(self):
return {
'id': self.id,
'product_id': self.product_id,
'quantity': self.quantity,
'reserved_quantity': self.reserved_quantity,
'last_updated': self.last_updated.isoformat() if self.last_updated else None
}
class StockMovement:
"""Stock movement model"""
def __init__(self, id=None, product_id=None, movement_type="", quantity=0,
reference_type="", reference_id=None):
self.id = id
self.product_id = product_id
self.movement_type = movement_type # 'IN' or 'OUT'
self.quantity = quantity
self.reference_type = reference_type # 'PO', 'SO', 'MO', 'ADJUSTMENT'
self.reference_id = reference_id
self.created_at = datetime.now()
def to_dict(self):
return {
'id': self.id,
'product_id': self.product_id,
'movement_type': self.movement_type,
'quantity': self.quantity,
'reference_type': self.reference_type,
'reference_id': self.reference_id,
'created_at': self.created_at.isoformat() if self.created_at else None
}

699
src/services.py Normal file
View File

@ -0,0 +1,699 @@
from typing import List, Optional
import pandas as pd
from datetime import datetime, date
import os
from src.models import Product, Supplier, Customer, PurchaseOrder, PurchaseOrderItem, ManufacturingOrder, SalesOrder, SalesOrderItem, Inventory, StockMovement
from src.dao import ProductDAO, SupplierDAO, CustomerDAO, PurchaseOrderDAO, ManufacturingOrderDAO, SalesOrderDAO, InventoryDAO
from src.dao_items import PurchaseOrderItemDAO, SalesOrderItemDAO
from src.dao_stock import StockMovementDAO
class ProductService:
"""Service for managing products"""
def __init__(self, db_manager):
self.dao = ProductDAO(db_manager)
def create_product(self, name: str, description: str = "", unit_price: float = 0.0) -> Optional[Product]:
"""Create a new product"""
product = Product(name=name, description=description, unit_price=unit_price)
if self.dao.create(product):
return product
return None
def get_product(self, product_id: int) -> Optional[Product]:
"""Get product by ID"""
return self.dao.get_by_id(product_id)
def get_all_products(self) -> List[Product]:
"""Get all products"""
return self.dao.get_all()
def update_product(self, product: Product) -> bool:
"""Update a product"""
return self.dao.update(product)
def delete_product(self, product_id: int) -> bool:
"""Delete a product"""
return self.dao.delete(product_id)
class SupplierService:
"""Service for managing suppliers"""
def __init__(self, db_manager):
self.dao = SupplierDAO(db_manager)
def create_supplier(self, name: str, contact_person: str = "", phone: str = "",
email: str = "", address: str = "") -> Optional[Supplier]:
"""Create a new supplier"""
supplier = Supplier(name=name, contact_person=contact_person, phone=phone,
email=email, address=address)
if self.dao.create(supplier):
return supplier
return None
def get_supplier(self, supplier_id: int) -> Optional[Supplier]:
"""Get supplier by ID"""
return self.dao.get_by_id(supplier_id)
def get_all_suppliers(self) -> List[Supplier]:
"""Get all suppliers"""
return self.dao.get_all()
def update_supplier(self, supplier: Supplier) -> bool:
"""Update a supplier"""
return self.dao.update(supplier)
def delete_supplier(self, supplier_id: int) -> bool:
"""Delete a supplier"""
return self.dao.delete(supplier_id)
class CustomerService:
"""Service for managing customers"""
def __init__(self, db_manager):
self.dao = CustomerDAO(db_manager)
def create_customer(self, name: str, contact_person: str = "", phone: str = "",
email: str = "", address: str = "") -> Optional[Customer]:
"""Create a new customer"""
customer = Customer(name=name, contact_person=contact_person, phone=phone,
email=email, address=address)
if self.dao.create(customer):
return customer
return None
def get_customer(self, customer_id: int) -> Optional[Customer]:
"""Get customer by ID"""
return self.dao.get_by_id(customer_id)
def get_all_customers(self) -> List[Customer]:
"""Get all customers"""
return self.dao.get_all()
def update_customer(self, customer: Customer) -> bool:
"""Update a customer"""
return self.dao.update(customer)
def delete_customer(self, customer_id: int) -> bool:
"""Delete a customer"""
return self.dao.delete(customer_id)
class PurchaseOrderService:
"""Service for managing purchase orders"""
def __init__(self, db_manager):
self.dao = PurchaseOrderDAO(db_manager)
self.item_dao = PurchaseOrderItemDAO(db_manager)
self.product_service = ProductService(db_manager)
self.supplier_service = SupplierService(db_manager)
self.inventory_service = InventoryService(db_manager)
def create_purchase_order(self, supplier_id: int, order_date=None, expected_delivery=None,
status: str = "pending", total_amount: float = 0.0) -> Optional[PurchaseOrder]:
"""Create a new purchase order"""
# Verify supplier exists
if not self.supplier_service.get_supplier(supplier_id):
return None
po = PurchaseOrder(supplier_id=supplier_id, order_date=order_date,
expected_delivery=expected_delivery, status=status,
total_amount=total_amount)
if self.dao.create(po):
return po
return None
def add_purchase_order_item(self, po_id: int, product_id: int, quantity: int,
unit_price: float, total_price: float) -> bool:
"""Add an item to a purchase order"""
# Verify purchase order exists
po = self.get_purchase_order(po_id)
if not po:
return False
# Verify product exists
if not self.product_service.get_product(product_id):
return False
# Create item
item = PurchaseOrderItem(
purchase_order_id=po_id,
product_id=product_id,
quantity=quantity,
unit_price=unit_price,
total_price=total_price
)
return self.item_dao.create(item)
def get_purchase_order(self, po_id: int) -> Optional[PurchaseOrder]:
"""Get purchase order by ID"""
return self.dao.get_by_id(po_id)
def get_all_purchase_orders(self) -> List[PurchaseOrder]:
"""Get all purchase orders"""
return self.dao.get_all()
def get_pending_purchase_orders(self) -> List[PurchaseOrder]:
"""Get all pending purchase orders"""
all_orders = self.get_all_purchase_orders()
return [order for order in all_orders if order.status == "pending"]
def get_recent_purchase_orders(self, limit: int = 5) -> List[PurchaseOrder]:
"""Get recent purchase orders"""
all_orders = self.get_all_purchase_orders()
# Sort by created_at in descending order and return the limit
sorted_orders = sorted(all_orders, key=lambda x: x.created_at, reverse=True)
return sorted_orders[:limit]
def update_purchase_order(self, po: PurchaseOrder) -> bool:
"""Update a purchase order"""
return self.dao.update(po)
def delete_purchase_order(self, po_id: int) -> bool:
"""Delete a purchase order"""
return self.dao.delete(po_id)
def receive_purchase_order(self, po_id: int) -> bool:
"""Receive products from a purchase order and update inventory"""
po = self.get_purchase_order(po_id)
if not po or po.status != "pending":
return False
# Get all items for this purchase order
items = self.item_dao.get_by_purchase_order_id(po_id)
# Update inventory for each item
for item in items:
if not self.inventory_service.adjust_inventory(item.product_id, item.quantity, "PO", po_id):
return False
# Update order status
po.status = "completed"
if self.update_purchase_order(po):
return True
return False
def get_purchase_order_cost(self, po_id: int) -> float:
"""Get total cost for a purchase order"""
po = self.get_purchase_order(po_id)
if not po:
return 0.0
return po.total_amount
def get_all_costs(self) -> float:
"""Get total costs from all completed purchase orders"""
total_cost = 0.0
purchase_orders = self.get_all_purchase_orders()
for po in purchase_orders:
if po.status == "completed":
total_cost += po.total_amount
return total_cost
def get_total_costs(self) -> float:
"""Get total costs from all completed purchase orders"""
return self.get_all_costs()
def export_to_excel(self, filename: str = None, start_date: date = None, end_date: date = None) -> str:
"""Export purchase orders to Excel with optional date range filtering"""
if filename is None:
filename = f"purchase_orders_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
purchase_orders = self.get_all_purchase_orders()
# Filter by date range if provided
if start_date and end_date:
purchase_orders = [po for po in purchase_orders if start_date <= po.order_date <= end_date]
elif start_date:
purchase_orders = [po for po in purchase_orders if po.order_date >= start_date]
elif end_date:
purchase_orders = [po for po in purchase_orders if po.order_date <= end_date]
# Prepare data for DataFrame
data = []
for po in purchase_orders:
supplier = self.supplier_service.get_supplier(po.supplier_id)
supplier_name = supplier.name if supplier else "Unknown"
data.append({
'Order ID': po.id,
'Supplier': supplier_name,
'Order Date': po.order_date,
'Expected Delivery': po.expected_delivery,
'Status': po.status,
'Total Amount': po.total_amount
})
# Create DataFrame and export
df = pd.DataFrame(data)
# Ensure directory exists
os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else '.', exist_ok=True)
df.to_excel(filename, index=False, engine='openpyxl')
return filename
def export_purchase_report(self, start_date: date = None, end_date: date = None) -> str:
"""Export purchase report to Excel with optional date range"""
return self.export_to_excel(start_date=start_date, end_date=end_date)
class ManufacturingOrderService:
"""Service for managing manufacturing orders"""
def __init__(self, db_manager):
self.dao = ManufacturingOrderDAO(db_manager)
self.product_service = ProductService(db_manager)
self.inventory_service = InventoryService(db_manager)
def create_manufacturing_order(self, product_id: int, quantity: int, start_date=None,
end_date=None, status: str = "pending") -> Optional[ManufacturingOrder]:
"""Create a new manufacturing order"""
# Verify product exists
if not self.product_service.get_product(product_id):
return None
mo = ManufacturingOrder(product_id=product_id, quantity=quantity,
start_date=start_date, end_date=end_date, status=status)
if self.dao.create(mo):
return mo
return None
def get_manufacturing_order(self, mo_id: int) -> Optional[ManufacturingOrder]:
"""Get manufacturing order by ID"""
return self.dao.get_by_id(mo_id)
def get_all_manufacturing_orders(self) -> List[ManufacturingOrder]:
"""Get all manufacturing orders"""
return self.dao.get_all()
def get_active_manufacturing_orders(self) -> List[ManufacturingOrder]:
"""Get all active manufacturing orders"""
all_orders = self.get_all_manufacturing_orders()
return [order for order in all_orders if order.status == "pending"]
def get_recent_manufacturing_orders(self, limit: int = 5) -> List[ManufacturingOrder]:
"""Get recent manufacturing orders"""
all_orders = self.get_all_manufacturing_orders()
# Sort by created_at in descending order and return the limit
sorted_orders = sorted(all_orders, key=lambda x: x.created_at, reverse=True)
return sorted_orders[:limit]
def update_manufacturing_order(self, mo: ManufacturingOrder) -> bool:
"""Update a manufacturing order"""
return self.dao.update(mo)
def delete_manufacturing_order(self, mo_id: int) -> bool:
"""Delete a manufacturing order"""
return self.dao.delete(mo_id)
def complete_manufacturing_order(self, mo_id: int) -> bool:
"""Complete a manufacturing order and update inventory"""
mo = self.get_manufacturing_order(mo_id)
if not mo or mo.status != "pending":
return False
# Update inventory
if self.inventory_service.adjust_inventory(mo.product_id, mo.quantity, "MO", mo_id):
# Update order status
mo.status = "completed"
return self.update_manufacturing_order(mo)
return False
def export_to_excel(self, filename: str = None, start_date: date = None, end_date: date = None) -> str:
"""Export manufacturing orders to Excel with optional date range filtering"""
if filename is None:
filename = f"manufacturing_orders_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
manufacturing_orders = self.get_all_manufacturing_orders()
# Filter by date range if provided
if start_date and end_date:
manufacturing_orders = [mo for mo in manufacturing_orders if start_date <= mo.start_date <= end_date]
elif start_date:
manufacturing_orders = [mo for mo in manufacturing_orders if mo.start_date >= start_date]
elif end_date:
manufacturing_orders = [mo for mo in manufacturing_orders if mo.start_date <= end_date]
# Prepare data for DataFrame
data = []
for mo in manufacturing_orders:
product = self.product_service.get_product(mo.product_id)
product_name = product.name if product else "Unknown"
data.append({
'Order ID': mo.id,
'Product': product_name,
'Quantity': mo.quantity,
'Start Date': mo.start_date,
'End Date': mo.end_date,
'Status': mo.status
})
# Create DataFrame and export
df = pd.DataFrame(data)
# Ensure directory exists
os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else '.', exist_ok=True)
df.to_excel(filename, index=False, engine='openpyxl')
return filename
def export_manufacturing_report(self, start_date: date = None, end_date: date = None) -> str:
"""Export manufacturing report to Excel with optional date range"""
return self.export_to_excel(start_date=start_date, end_date=end_date)
def get_total_costs(self) -> float:
"""Get total costs from all completed manufacturing orders"""
total_cost = 0.0
manufacturing_orders = self.get_all_manufacturing_orders()
for mo in manufacturing_orders:
if mo.status == "completed":
# Get the product to calculate cost
product = self.product_service.get_product(mo.product_id)
if product:
total_cost += product.unit_price * mo.quantity
return total_cost
class SalesOrderService:
"""Service for managing sales orders"""
def __init__(self, db_manager):
self.dao = SalesOrderDAO(db_manager)
self.item_dao = SalesOrderItemDAO(db_manager)
self.product_service = ProductService(db_manager)
self.customer_service = CustomerService(db_manager)
self.inventory_service = InventoryService(db_manager)
def create_sales_order(self, customer_id: int, order_date=None, expected_delivery=None,
status: str = "pending", total_amount: float = 0.0) -> Optional[SalesOrder]:
"""Create a new sales order"""
# Verify customer exists
if not self.customer_service.get_customer(customer_id):
return None
so = SalesOrder(customer_id=customer_id, order_date=order_date,
expected_delivery=expected_delivery, status=status,
total_amount=total_amount)
if self.dao.create(so):
return so
return None
def add_sales_order_item(self, so_id: int, product_id: int, quantity: int,
unit_price: float, total_price: float) -> bool:
"""Add an item to a sales order"""
# Verify sales order exists
so = self.get_sales_order(so_id)
if not so:
return False
# Verify product exists
if not self.product_service.get_product(product_id):
return False
# Create item
item = SalesOrderItem(
sales_order_id=so_id,
product_id=product_id,
quantity=quantity,
unit_price=unit_price,
total_price=total_price
)
return self.item_dao.create(item)
def get_sales_order(self, so_id: int) -> Optional[SalesOrder]:
"""Get sales order by ID"""
return self.dao.get_by_id(so_id)
def get_all_sales_orders(self) -> List[SalesOrder]:
"""Get all sales orders"""
return self.dao.get_all()
def get_pending_sales_orders(self) -> List[SalesOrder]:
"""Get all pending sales orders"""
all_orders = self.get_all_sales_orders()
return [order for order in all_orders if order.status == "pending"]
def get_recent_sales_orders(self, limit: int = 5) -> List[SalesOrder]:
"""Get recent sales orders"""
all_orders = self.get_all_sales_orders()
# Sort by created_at in descending order and return the limit
sorted_orders = sorted(all_orders, key=lambda x: x.created_at, reverse=True)
return sorted_orders[:limit]
def update_sales_order(self, so: SalesOrder) -> bool:
"""Update a sales order"""
return self.dao.update(so)
def delete_sales_order(self, so_id: int) -> bool:
"""Delete a sales order"""
return self.dao.delete(so_id)
def deliver_sales_order(self, so_id: int) -> bool:
"""Deliver products from a sales order and update inventory"""
so = self.get_sales_order(so_id)
if not so or so.status != "pending":
return False
# Get all items for this sales order
items = self.item_dao.get_by_sales_order_id(so_id)
# Update inventory for each item (decrease quantity)
for item in items:
if not self.inventory_service.adjust_inventory(item.product_id, -item.quantity, "SO", so_id):
return False
# Update order status
so.status = "completed"
if self.update_sales_order(so):
return True
return False
def get_sales_order_revenue(self, so_id: int) -> float:
"""Get total revenue for a sales order"""
so = self.get_sales_order(so_id)
if not so:
return 0.0
return so.total_amount
def get_all_revenue(self) -> float:
"""Get total revenue from all completed sales orders"""
total_revenue = 0.0
sales_orders = self.get_all_sales_orders()
for so in sales_orders:
if so.status == "completed":
total_revenue += so.total_amount
return total_revenue
def get_total_revenue(self) -> float:
"""Get total revenue from all completed sales orders"""
return self.get_all_revenue()
def export_to_excel(self, filename: str = None, start_date: date = None, end_date: date = None) -> str:
"""Export sales orders to Excel with optional date range filtering"""
if filename is None:
filename = f"sales_orders_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
sales_orders = self.get_all_sales_orders()
# Filter by date range if provided
if start_date and end_date:
sales_orders = [so for so in sales_orders if start_date <= so.order_date <= end_date]
elif start_date:
sales_orders = [so for so in sales_orders if so.order_date >= start_date]
elif end_date:
sales_orders = [so for so in sales_orders if so.order_date <= end_date]
# Prepare data for DataFrame
data = []
for so in sales_orders:
customer = self.customer_service.get_customer(so.customer_id)
customer_name = customer.name if customer else "Unknown"
data.append({
'Order ID': so.id,
'Customer': customer_name,
'Order Date': so.order_date,
'Expected Delivery': so.expected_delivery,
'Status': so.status,
'Total Amount': so.total_amount
})
# Create DataFrame and export
df = pd.DataFrame(data)
# Ensure directory exists
os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else '.', exist_ok=True)
df.to_excel(filename, index=False, engine='openpyxl')
return filename
def export_sales_report(self, start_date: date = None, end_date: date = None) -> str:
"""Export sales report to Excel with optional date range"""
return self.export_to_excel(start_date=start_date, end_date=end_date)
class InventoryService:
"""Service for managing inventory"""
def __init__(self, db_manager):
self.dao = InventoryDAO(db_manager)
self.product_service = ProductService(db_manager)
self.movement_dao = StockMovementDAO(db_manager)
def create_inventory_record(self, product_id: int, quantity: int = 0,
reserved_quantity: int = 0) -> Optional[Inventory]:
"""Create a new inventory record"""
# Verify product exists
if not self.product_service.get_product(product_id):
return None
inventory = Inventory(product_id=product_id, quantity=quantity,
reserved_quantity=reserved_quantity)
if self.dao.create(inventory):
return inventory
return None
def get_inventory_by_product(self, product_id: int) -> Optional[Inventory]:
"""Get inventory by product ID"""
return self.dao.get_by_product_id(product_id)
def get_all_inventory(self) -> List[Inventory]:
"""Get all inventory records"""
return self.dao.get_all()
def update_inventory(self, inventory: Inventory) -> bool:
"""Update an inventory record"""
return self.dao.update(inventory)
def delete_inventory(self, product_id: int) -> bool:
"""Delete an inventory record"""
return self.dao.delete(product_id)
def adjust_inventory(self, product_id: int, quantity_change: int,
reference_type: str = "ADJUSTMENT", reference_id: int = None) -> bool:
"""Adjust inventory quantity and track movement"""
# Adjust inventory using DAO
if not self.dao.adjust_quantity(product_id, quantity_change):
return False
# Track stock movement
if quantity_change != 0:
movement_type = "IN" if quantity_change > 0 else "OUT"
movement = StockMovement(
product_id=product_id,
movement_type=movement_type,
quantity=abs(quantity_change),
reference_type=reference_type,
reference_id=reference_id
)
self.movement_dao.create(movement)
return True
def get_low_stock_products(self, threshold: int = 10) -> List[dict]:
"""Get products with low stock"""
inventory_records = self.get_all_inventory()
low_stock = []
for record in inventory_records:
if record.quantity <= threshold:
product = self.product_service.get_product(record.product_id)
if product:
low_stock.append({
'product': product,
'current_stock': record.quantity
})
return low_stock
def get_low_stock_items(self) -> List[dict]:
"""Get low stock items"""
return self.get_low_stock_products()
def get_current_stock(self, product_id: int) -> int:
"""Get current stock for a product"""
inventory = self.get_inventory_by_product(product_id)
return inventory.quantity if inventory else 0
def get_stock_movements(self, product_id: int = None) -> List[StockMovement]:
"""Get stock movements for a product or all products"""
if product_id:
return self.movement_dao.get_by_product_id(product_id)
else:
return self.movement_dao.get_all()
def export_to_excel(self, filename: str = None) -> str:
"""Export all inventory records to Excel"""
if filename is None:
filename = f"inventory_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
inventory_records = self.get_all_inventory()
# Prepare data for DataFrame
data = []
for record in inventory_records:
product = self.product_service.get_product(record.product_id)
product_name = product.name if product else "Unknown"
data.append({
'Product ID': record.product_id,
'Product Name': product_name,
'Current Stock': record.quantity,
'Reserved Quantity': record.reserved_quantity,
'Available Stock': record.quantity - record.reserved_quantity
})
# Create DataFrame and export
df = pd.DataFrame(data)
# Ensure directory exists
os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else '.', exist_ok=True)
df.to_excel(filename, index=False, engine='openpyxl')
return filename
def export_stock_movement_report(self) -> str:
"""Export stock movement report to Excel"""
return self.export_stock_movements_to_excel()
def export_inventory_report(self) -> str:
"""Export inventory report to Excel"""
return self.export_to_excel()
def export_stock_movements_to_excel(self, filename: str = None) -> str:
"""Export all stock movements to Excel"""
if filename is None:
filename = f"stock_movements_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
movements = self.get_stock_movements()
# Prepare data for DataFrame
data = []
for movement in movements:
product = self.product_service.get_product(movement.product_id)
product_name = product.name if product else "Unknown"
data.append({
'Movement ID': movement.id,
'Product': product_name,
'Movement Type': movement.movement_type,
'Quantity': movement.quantity,
'Date': movement.created_at,
'Reference Type': movement.reference_type,
'Reference ID': movement.reference_id
})
# Create DataFrame and export
df = pd.DataFrame(data)
# Ensure directory exists
os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else '.', exist_ok=True)
df.to_excel(filename, index=False, engine='openpyxl')
return filename

0
src/ui/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

108
src/ui/customer_dialog.py Normal file
View File

@ -0,0 +1,108 @@
import customtkinter as ctk
from tkinter import messagebox
from typing import Optional
from src.models import Customer
from src.services import CustomerService
class CustomerDialog(ctk.CTkToplevel):
def __init__(self, parent, customer_service: CustomerService,
customer: Optional[Customer] = None):
super().__init__(parent)
self.customer_service = customer_service
self.customer = customer
self.title("New Customer" if not customer else "Edit Customer")
self.geometry("400x350")
# Make dialog modal
self.transient(parent)
self.grab_set()
self.setup_ui()
def setup_ui(self):
# Name
ctk.CTkLabel(self, text="Name:").grid(row=0, column=0, padx=10, pady=5, sticky="w")
self.name_entry = ctk.CTkEntry(self, width=250)
self.name_entry.grid(row=0, column=1, padx=10, pady=5)
# Email
ctk.CTkLabel(self, text="Email:").grid(row=1, column=0, padx=10, pady=5, sticky="w")
self.email_entry = ctk.CTkEntry(self, width=250)
self.email_entry.grid(row=1, column=1, padx=10, pady=5)
# Phone
ctk.CTkLabel(self, text="Phone:").grid(row=2, column=0, padx=10, pady=5, sticky="w")
self.phone_entry = ctk.CTkEntry(self, width=250)
self.phone_entry.grid(row=2, column=1, padx=10, pady=5)
# Contact Person
ctk.CTkLabel(self, text="Contact Person:").grid(row=3, column=0, padx=10, pady=5, sticky="w")
self.contact_person_entry = ctk.CTkEntry(self, width=250)
self.contact_person_entry.grid(row=3, column=1, padx=10, pady=5)
# Address
ctk.CTkLabel(self, text="Address:").grid(row=4, column=0, padx=10, pady=5, sticky="nw")
self.address_text = ctk.CTkTextbox(self, height=80, width=250)
self.address_text.grid(row=4, column=1, padx=10, pady=5)
# Buttons
button_frame = ctk.CTkFrame(self)
button_frame.grid(row=5, column=0, columnspan=2, pady=20)
ctk.CTkButton(button_frame, text="Save", command=self.save_customer).pack(side="left", padx=5)
ctk.CTkButton(button_frame, text="Cancel", command=self.destroy).pack(side="left", padx=5)
# Load data if editing
if self.customer:
self.load_customer_data()
def load_customer_data(self):
self.name_entry.insert(0, self.customer.name)
self.email_entry.insert(0, self.customer.email or "")
self.phone_entry.insert(0, self.customer.phone or "")
self.contact_person_entry.insert(0, self.customer.contact_person or "")
self.address_text.insert("1.0", self.customer.address or "")
def save_customer(self):
name = self.name_entry.get().strip()
if not name:
messagebox.showerror("Error", "Please enter customer name")
return
email = self.email_entry.get().strip()
phone = self.phone_entry.get().strip()
contact_person = self.contact_person_entry.get().strip()
address = self.address_text.get("1.0", "end-1c").strip()
if self.customer:
# Update existing customer
updated = self.customer_service.update_customer(
customer_id=self.customer.id,
name=name,
contact_person=contact_person or None,
email=email or None,
phone=phone or None,
address=address or None
)
if updated:
messagebox.showinfo("Success", "Customer updated successfully")
self.destroy()
else:
messagebox.showerror("Error", "Failed to update customer")
else:
# Create new customer
customer = self.customer_service.create_customer(
name=name,
contact_person=contact_person or None,
email=email or None,
phone=phone or None,
address=address or None
)
if customer:
messagebox.showinfo("Success", f"Customer '{customer.name}' created successfully")
self.destroy()
else:
messagebox.showerror("Error", "Failed to create customer")

View File

@ -0,0 +1,148 @@
import customtkinter as ctk
from tkinter import messagebox
from datetime import datetime, date
from typing import Optional
class DatePickerDialog(ctk.CTkToplevel):
def __init__(self, parent, title="Select Date", initial_date: Optional[date] = None):
super().__init__(parent)
self.title(title)
self.geometry("300x200")
# Make dialog modal
self.transient(parent)
self.grab_set()
# Initialize variables
self.selected_date = None
# Set initial date or use today
if initial_date:
self.current_date = initial_date
else:
self.current_date = date.today()
self.setup_ui()
def setup_ui(self):
# Header with navigation
header_frame = ctk.CTkFrame(self)
header_frame.pack(fill="x", padx=10, pady=10)
# Previous month button
ctk.CTkButton(header_frame, text="<", width=30, height=30,
command=self.prev_month).pack(side="left", padx=5)
# Month/Year label
self.month_year_label = ctk.CTkLabel(header_frame, text="", font=("Arial", 14, "bold"))
self.month_year_label.pack(side="left", expand=True)
# Next month button
ctk.CTkButton(header_frame, text=">", width=30, height=30,
command=self.next_month).pack(side="right", padx=5)
# Calendar frame
self.calendar_frame = ctk.CTkFrame(self)
self.calendar_frame.pack(fill="both", expand=True, padx=10, pady=5)
# Buttons frame
button_frame = ctk.CTkFrame(self)
button_frame.pack(fill="x", padx=10, pady=10)
ctk.CTkButton(button_frame, text="Today", command=self.select_today).pack(side="left", padx=5)
ctk.CTkButton(button_frame, text="OK", command=self.ok).pack(side="right", padx=5)
ctk.CTkButton(button_frame, text="Cancel", command=self.cancel).pack(side="right", padx=5)
# Display initial calendar
self.display_calendar()
def display_calendar(self):
# Clear existing calendar
for widget in self.calendar_frame.winfo_children():
widget.destroy()
# Update month/year label
self.month_year_label.configure(text=f"{self.current_date.strftime('%B %Y')}")
# Create calendar grid
# Weekday headers
weekdays = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"]
for i, day in enumerate(weekdays):
ctk.CTkLabel(self.calendar_frame, text=day, width=30, height=25).grid(
row=0, column=i, padx=2, pady=2)
# Get first day of month and number of days
first_day = self.current_date.replace(day=1)
first_weekday = first_day.weekday() # 0=Monday, 6=Sunday
# Adjust for Sunday as first day (0=Sunday, 6=Saturday)
first_weekday = (first_weekday + 1) % 7
# Get number of days in month
if self.current_date.month == 12:
next_month = self.current_date.replace(year=self.current_date.year + 1, month=1, day=1)
else:
next_month = self.current_date.replace(month=self.current_date.month + 1, day=1)
days_in_month = (next_month - first_day).days
# Display days
day = 1
for week in range(6):
for weekday in range(7):
row = week + 1
if (week == 0 and weekday < first_weekday) or day > days_in_month:
# Empty cell
ctk.CTkLabel(self.calendar_frame, text="", width=30, height=25).grid(
row=row, column=weekday, padx=2, pady=2)
else:
# Day button
day_date = self.current_date.replace(day=day)
is_selected = (self.selected_date and day_date == self.selected_date)
is_today = (day_date == date.today())
btn = ctk.CTkButton(
self.calendar_frame,
text=str(day),
width=30,
height=25,
fg_color="green" if is_selected else ("#2CC985" if is_today else "transparent"),
text_color="white" if is_selected or is_today else "black",
hover_color="#2CC985",
command=lambda d=day_date: self.select_date(d)
)
btn.grid(row=row, column=weekday, padx=2, pady=2)
day += 1
def select_date(self, date_obj):
self.selected_date = date_obj
self.display_calendar()
def select_today(self):
self.selected_date = date.today()
self.current_date = self.selected_date
self.display_calendar()
def prev_month(self):
if self.current_date.month == 1:
self.current_date = self.current_date.replace(year=self.current_date.year - 1, month=12)
else:
self.current_date = self.current_date.replace(month=self.current_date.month - 1)
self.display_calendar()
def next_month(self):
if self.current_date.month == 12:
self.current_date = self.current_date.replace(year=self.current_date.year + 1, month=1)
else:
self.current_date = self.current_date.replace(month=self.current_date.month + 1)
self.display_calendar()
def ok(self):
if not self.selected_date:
self.selected_date = self.current_date
self.destroy()
def cancel(self):
self.selected_date = None
self.destroy()
def get_selected_date(self):
return self.selected_date

145
src/ui/date_range_dialog.py Normal file
View File

@ -0,0 +1,145 @@
import customtkinter as ctk
from tkinter import messagebox
from datetime import datetime, date
from typing import Optional, Tuple
class DateRangeDialog(ctk.CTkToplevel):
def __init__(self, parent, title="Select Date Range", ok_button_text="OK"):
super().__init__(parent)
self.title(title)
self.geometry("350x250")
# Make dialog modal
self.transient(parent)
self.grab_set()
# Initialize variables
self.start_date = date.today().replace(day=1) # First day of current month
self.end_date = date.today()
self.ok_button_text = ok_button_text
self.setup_ui()
def setup_ui(self):
# Title
ctk.CTkLabel(self, text="Select Date Range", font=("Arial", 16, "bold")).pack(pady=10)
# Start date
start_frame = ctk.CTkFrame(self)
start_frame.pack(fill="x", padx=20, pady=10)
ctk.CTkLabel(start_frame, text="Start Date:").pack(anchor="w")
date_frame1 = ctk.CTkFrame(start_frame)
date_frame1.pack(fill="x", pady=5)
self.start_date_entry = ctk.CTkEntry(date_frame1, width=150)
self.start_date_entry.insert(0, self.start_date.strftime("%Y-%m-%d"))
self.start_date_entry.pack(side="left")
ctk.CTkButton(date_frame1, text="...", width=30, command=self.select_start_date).pack(side="left", padx=(5, 0))
# End date
end_frame = ctk.CTkFrame(self)
end_frame.pack(fill="x", padx=20, pady=10)
ctk.CTkLabel(end_frame, text="End Date:").pack(anchor="w")
date_frame2 = ctk.CTkFrame(end_frame)
date_frame2.pack(fill="x", pady=5)
self.end_date_entry = ctk.CTkEntry(date_frame2, width=150)
self.end_date_entry.insert(0, self.end_date.strftime("%Y-%m-%d"))
self.end_date_entry.pack(side="left")
ctk.CTkButton(date_frame2, text="...", width=30, command=self.select_end_date).pack(side="left", padx=(5, 0))
# Buttons
button_frame = ctk.CTkFrame(self)
button_frame.pack(fill="x", padx=20, pady=20)
ctk.CTkButton(button_frame, text="This Month", command=self.set_this_month).pack(side="left", padx=5)
ctk.CTkButton(button_frame, text="Last Month", command=self.set_last_month).pack(side="left", padx=5)
ctk.CTkButton(button_frame, text=self.ok_button_text, command=self.ok).pack(side="right", padx=5)
ctk.CTkButton(button_frame, text="Cancel", command=self.cancel).pack(side="right", padx=5)
def select_start_date(self):
"""Open date picker for start date"""
try:
current_date = datetime.strptime(self.start_date_entry.get(), "%Y-%m-%d").date()
except ValueError:
current_date = date.today()
from src.ui.date_picker_dialog import DatePickerDialog
dialog = DatePickerDialog(self, "Select Start Date", current_date)
self.wait_window(dialog)
selected_date = dialog.get_selected_date()
if selected_date:
self.start_date_entry.delete(0, 'end')
self.start_date_entry.insert(0, selected_date.strftime("%Y-%m-%d"))
def select_end_date(self):
"""Open date picker for end date"""
try:
current_date = datetime.strptime(self.end_date_entry.get(), "%Y-%m-%d").date()
except ValueError:
current_date = date.today()
from src.ui.date_picker_dialog import DatePickerDialog
dialog = DatePickerDialog(self, "Select End Date", current_date)
self.wait_window(dialog)
selected_date = dialog.get_selected_date()
if selected_date:
self.end_date_entry.delete(0, 'end')
self.end_date_entry.insert(0, selected_date.strftime("%Y-%m-%d"))
def set_this_month(self):
"""Set date range to this month"""
today = date.today()
self.start_date = today.replace(day=1)
self.end_date = today
self.start_date_entry.delete(0, 'end')
self.start_date_entry.insert(0, self.start_date.strftime("%Y-%m-%d"))
self.end_date_entry.delete(0, 'end')
self.end_date_entry.insert(0, self.end_date.strftime("%Y-%m-%d"))
def set_last_month(self):
"""Set date range to last month"""
today = date.today()
if today.month == 1:
self.start_date = today.replace(year=today.year - 1, month=12, day=1)
self.end_date = today.replace(year=today.year - 1, month=12, day=31)
else:
self.start_date = today.replace(month=today.month - 1, day=1)
# Get last day of last month
if today.month == 1:
self.end_date = today.replace(year=today.year - 1, month=12, day=31)
else:
first_day_this_month = today.replace(day=1)
self.end_date = first_day_this_month - date.resolution
self.start_date_entry.delete(0, 'end')
self.start_date_entry.insert(0, self.start_date.strftime("%Y-%m-%d"))
self.end_date_entry.delete(0, 'end')
self.end_date_entry.insert(0, self.end_date.strftime("%Y-%m-%d"))
def ok(self):
"""Validate and close dialog"""
try:
self.start_date = datetime.strptime(self.start_date_entry.get(), "%Y-%m-%d").date()
self.end_date = datetime.strptime(self.end_date_entry.get(), "%Y-%m-%d").date()
if self.start_date > self.end_date:
messagebox.showerror("Error", "Start date must be before end date")
return
self.destroy()
except ValueError:
messagebox.showerror("Error", "Please enter valid dates in YYYY-MM-DD format")
def cancel(self):
"""Cancel and close dialog"""
self.start_date = None
self.end_date = None
self.destroy()
def get_date_range(self) -> Tuple[Optional[date], Optional[date]]:
"""Return the selected date range"""
return self.start_date, self.end_date

1218
src/ui/main_window.py Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,107 @@
import customtkinter as ctk
from tkinter import messagebox
from datetime import datetime
from typing import Optional, List
from src.models import ManufacturingOrder, Product
from src.services import ManufacturingOrderService, ProductService
class ManufacturingOrderDialog(ctk.CTkToplevel):
def __init__(self, parent, manufacturing_service: ManufacturingOrderService,
product_service: ProductService,
manufacturing_order: Optional[ManufacturingOrder] = None):
super().__init__(parent)
self.manufacturing_service = manufacturing_service
self.product_service = product_service
self.manufacturing_order = manufacturing_order
self.title("New Manufacturing Order" if not manufacturing_order else "Edit Manufacturing Order")
self.geometry("500x400")
# Make dialog modal
self.transient(parent)
self.grab_set()
self.setup_ui()
def setup_ui(self):
# Product selection
ctk.CTkLabel(self, text="Product:").grid(row=0, column=0, padx=10, pady=5, sticky="w")
self.product_var = ctk.StringVar()
self.product_combo = ctk.CTkComboBox(self, variable=self.product_var, width=200)
self.product_combo.grid(row=0, column=1, padx=10, pady=5)
# Quantity
ctk.CTkLabel(self, text="Quantity:").grid(row=1, column=0, padx=10, pady=5, sticky="w")
self.quantity_entry = ctk.CTkEntry(self, width=200)
self.quantity_entry.grid(row=1, column=1, padx=10, pady=5)
# Start date
ctk.CTkLabel(self, text="Start Date:").grid(row=2, column=0, padx=10, pady=5, sticky="w")
self.start_date_entry = ctk.CTkEntry(self, width=200)
self.start_date_entry.insert(0, datetime.now().strftime("%Y-%m-%d"))
self.start_date_entry.grid(row=2, column=1, padx=10, pady=5)
# Expected completion
ctk.CTkLabel(self, text="Expected Completion:").grid(row=3, column=0, padx=10, pady=5, sticky="w")
self.expected_completion_entry = ctk.CTkEntry(self, width=200)
self.expected_completion_entry.grid(row=3, column=1, padx=10, pady=5)
# Status
ctk.CTkLabel(self, text="Status:").grid(row=4, column=0, padx=10, pady=5, sticky="w")
self.status_var = ctk.StringVar(value="planned")
self.status_combo = ctk.CTkComboBox(self, variable=self.status_var,
values=["planned", "in_progress", "completed", "cancelled"])
self.status_combo.grid(row=4, column=1, padx=10, pady=5)
# Notes
ctk.CTkLabel(self, text="Notes:").grid(row=5, column=0, padx=10, pady=5, sticky="nw")
self.notes_text = ctk.CTkTextbox(self, height=100, width=200)
self.notes_text.grid(row=5, column=1, padx=10, pady=5)
# Buttons
button_frame = ctk.CTkFrame(self)
button_frame.grid(row=6, column=0, columnspan=2, pady=20)
ctk.CTkButton(button_frame, text="Save", command=self.save_order).pack(side="left", padx=5)
ctk.CTkButton(button_frame, text="Cancel", command=self.destroy).pack(side="left", padx=5)
# Load data
self.load_products()
def load_products(self):
products = self.product_service.get_all_products()
product_names = [p.name for p in products]
self.product_combo.configure(values=product_names)
def save_order(self):
product_name = self.product_var.get()
try:
quantity = int(self.quantity_entry.get())
except ValueError:
messagebox.showerror("Error", "Please enter a valid quantity")
return
if quantity <= 0:
messagebox.showerror("Error", "Quantity must be positive")
return
product = next((p for p in self.product_service.get_all_products() if p.name == product_name), None)
if not product:
messagebox.showerror("Error", "Please select a valid product")
return
# Create manufacturing order
mo = self.manufacturing_service.create_manufacturing_order(
product_id=product.id,
quantity=quantity,
start_date=self.start_date_entry.get(),
expected_completion=self.expected_completion_entry.get() or None,
status=self.status_var.get(),
notes=self.notes_text.get("1.0", "end-1c")
)
if mo:
messagebox.showinfo("Success", f"Manufacturing order #{mo.id} created successfully")
self.destroy()
else:
messagebox.showerror("Error", "Failed to create manufacturing order")

83
src/ui/product_dialog.py Normal file
View File

@ -0,0 +1,83 @@
import customtkinter as ctk
from tkinter import messagebox
from typing import Optional
from src.models import Product
from src.services import ProductService
class ProductDialog(ctk.CTkToplevel):
def __init__(self, parent, product_service: ProductService, product: Optional[Product] = None):
super().__init__(parent)
self.product_service = product_service
self.product = product
self.title("Add Product" if not product else "Edit Product")
self.geometry("400x300")
# Make dialog modal
self.transient(parent)
self.grab_set()
self.setup_ui()
def setup_ui(self):
# Name
ctk.CTkLabel(self, text="Name:").grid(row=0, column=0, padx=10, pady=5, sticky="w")
self.name_entry = ctk.CTkEntry(self, width=250)
self.name_entry.grid(row=0, column=1, padx=10, pady=5)
# Description
ctk.CTkLabel(self, text="Description:").grid(row=1, column=0, padx=10, pady=5, sticky="nw")
self.description_text = ctk.CTkTextbox(self, width=250, height=80)
self.description_text.grid(row=1, column=1, padx=10, pady=5)
# Unit Price
ctk.CTkLabel(self, text="Unit Price:").grid(row=2, column=0, padx=10, pady=5, sticky="w")
self.price_entry = ctk.CTkEntry(self, width=250)
self.price_entry.grid(row=2, column=1, padx=10, pady=5)
# Buttons
button_frame = ctk.CTkFrame(self)
button_frame.grid(row=3, column=0, columnspan=2, pady=20)
ctk.CTkButton(button_frame, text="Save", command=self.save_product).pack(side="left", padx=5)
ctk.CTkButton(button_frame, text="Cancel", command=self.destroy).pack(side="left", padx=5)
# Load data if editing
if self.product:
self.name_entry.insert(0, self.product.name)
self.description_text.insert("1.0", self.product.description)
self.price_entry.insert(0, str(self.product.unit_price))
def save_product(self):
name = self.name_entry.get().strip()
description = self.description_text.get("1.0", "end-1c").strip()
try:
unit_price = float(self.price_entry.get())
except ValueError:
messagebox.showerror("Error", "Please enter a valid price")
return
if not name:
messagebox.showerror("Error", "Please enter a product name")
return
if self.product:
# Update existing product
self.product.name = name
self.product.description = description
self.product.unit_price = unit_price
if self.product_service.update_product(self.product):
messagebox.showinfo("Success", "Product updated successfully")
self.destroy()
else:
messagebox.showerror("Error", "Failed to update product")
else:
# Create new product
product = self.product_service.create_product(name, description, unit_price)
if product:
messagebox.showinfo("Success", "Product created successfully")
self.destroy()
else:
messagebox.showerror("Error", "Failed to create product")

View File

@ -0,0 +1,233 @@
import customtkinter as ctk
from tkinter import messagebox
from datetime import datetime, date
from typing import Optional, List
from src.models import PurchaseOrder, PurchaseOrderItem, Product, Supplier
from src.services import PurchaseOrderService, ProductService, SupplierService
from src.ui.date_picker_dialog import DatePickerDialog
class PurchaseOrderDialog(ctk.CTkToplevel):
def __init__(self, parent, purchase_service: PurchaseOrderService,
product_service: ProductService, supplier_service: SupplierService,
purchase_order: Optional[PurchaseOrder] = None):
super().__init__(parent)
self.purchase_service = purchase_service
self.product_service = product_service
self.supplier_service = supplier_service
self.purchase_order = purchase_order
self.title("New Purchase Order" if not purchase_order else "Edit Purchase Order")
self.geometry("600x500")
# Make dialog modal
self.transient(parent)
self.grab_set()
self.items = []
self.setup_ui()
def setup_ui(self):
# Supplier selection
ctk.CTkLabel(self, text="Supplier:").grid(row=0, column=0, padx=10, pady=5, sticky="w")
self.supplier_var = ctk.StringVar()
self.supplier_combo = ctk.CTkComboBox(self, variable=self.supplier_var, width=200)
self.supplier_combo.grid(row=0, column=1, padx=10, pady=5)
# Order date
ctk.CTkLabel(self, text="Order Date:").grid(row=1, column=0, padx=10, pady=5, sticky="w")
date_frame1 = ctk.CTkFrame(self)
date_frame1.grid(row=1, column=1, padx=10, pady=5, sticky="w")
self.order_date_entry = ctk.CTkEntry(date_frame1, width=150)
self.order_date_entry.insert(0, datetime.now().strftime("%Y-%m-%d"))
self.order_date_entry.pack(side="left")
ctk.CTkButton(date_frame1, text="...", width=30, command=self.select_order_date).pack(side="left", padx=(5, 0))
# Expected delivery
ctk.CTkLabel(self, text="Expected Delivery:").grid(row=2, column=0, padx=10, pady=5, sticky="w")
date_frame2 = ctk.CTkFrame(self)
date_frame2.grid(row=2, column=1, padx=10, pady=5, sticky="w")
self.expected_delivery_entry = ctk.CTkEntry(date_frame2, width=150)
self.expected_delivery_entry.pack(side="left")
ctk.CTkButton(date_frame2, text="...", width=30, command=self.select_expected_delivery).pack(side="left", padx=(5, 0))
# Items frame
items_frame = ctk.CTkFrame(self)
items_frame.grid(row=3, column=0, columnspan=2, padx=10, pady=10, sticky="nsew")
# Add item controls
add_frame = ctk.CTkFrame(items_frame)
add_frame.pack(fill="x", padx=5, pady=5)
ctk.CTkLabel(add_frame, text="Product:").pack(side="left", padx=5)
self.product_var = ctk.StringVar()
self.product_combo = ctk.CTkComboBox(add_frame, variable=self.product_var, width=150)
self.product_combo.pack(side="left", padx=5)
ctk.CTkLabel(add_frame, text="Qty:").pack(side="left", padx=5)
self.quantity_entry = ctk.CTkEntry(add_frame, width=50)
self.quantity_entry.pack(side="left", padx=5)
ctk.CTkLabel(add_frame, text="Price:").pack(side="left", padx=5)
self.price_entry = ctk.CTkEntry(add_frame, width=80)
self.price_entry.pack(side="left", padx=5)
ctk.CTkButton(add_frame, text="Add", command=self.add_item, width=60).pack(side="left", padx=5)
# Items list
self.items_text = ctk.CTkTextbox(items_frame, height=150)
self.items_text.pack(fill="both", expand=True, padx=5, pady=5)
# Total
self.total_label = ctk.CTkLabel(self, text="Total: Rp0", font=("Arial", 14, "bold"))
self.total_label.grid(row=4, column=0, columnspan=2, pady=10)
# Buttons
button_frame = ctk.CTkFrame(self)
button_frame.grid(row=5, column=0, columnspan=2, pady=10)
ctk.CTkButton(button_frame, text="Save", command=self.save_order).pack(side="left", padx=5)
ctk.CTkButton(button_frame, text="Cancel", command=self.destroy).pack(side="left", padx=5)
# Load data
self.load_suppliers()
self.load_products()
def load_suppliers(self):
suppliers = self.supplier_service.get_all_suppliers()
supplier_names = [s.name for s in suppliers]
self.supplier_combo.configure(values=supplier_names)
def load_products(self):
products = self.product_service.get_all_products()
product_names = [p.name for p in products]
self.product_combo.configure(values=product_names)
def add_item(self):
product_name = self.product_var.get()
try:
quantity = int(self.quantity_entry.get())
price = float(self.price_entry.get())
except ValueError:
messagebox.showerror("Error", "Please enter valid quantity and price")
return
if quantity <= 0 or price <= 0:
messagebox.showerror("Error", "Quantity and price must be positive")
return
product = next((p for p in self.product_service.get_all_products() if p.name == product_name), None)
if not product:
messagebox.showerror("Error", "Please select a valid product")
return
total = quantity * price
self.items.append({
'product': product,
'quantity': quantity,
'price': price,
'total': total
})
self.update_items_display()
self.update_total()
# Clear inputs
self.quantity_entry.delete(0, 'end')
self.price_entry.delete(0, 'end')
def update_items_display(self):
self.items_text.delete("1.0", "end")
for item in self.items:
self.items_text.insert("end",
f"{item['product'].name} - {item['quantity']} x Rp{item['price']:,.0f} = Rp{item['total']:,.0f}\n")
def update_total(self):
total = sum(item['total'] for item in self.items)
self.total_label.configure(text=f"Total: Rp{total:,.0f}")
def save_order(self):
supplier_name = self.supplier_var.get()
if not supplier_name:
messagebox.showerror("Error", "Please select a supplier")
return
if not self.items:
messagebox.showerror("Error", "Please add at least one item")
return
supplier = next((s for s in self.supplier_service.get_all_suppliers() if s.name == supplier_name), None)
if not supplier:
messagebox.showerror("Error", "Please select a valid supplier")
return
total = sum(item['total'] for item in self.items)
# Create purchase order
# Convert string dates to date objects
try:
order_date = datetime.strptime(self.order_date_entry.get(), "%Y-%m-%d").date()
except ValueError:
order_date = datetime.now().date()
expected_delivery = None
if self.expected_delivery_entry.get():
try:
expected_delivery = datetime.strptime(self.expected_delivery_entry.get(), "%Y-%m-%d").date()
except ValueError:
pass
po = self.purchase_service.create_purchase_order(
supplier_id=supplier.id,
order_date=order_date,
expected_delivery=expected_delivery,
total_amount=total
)
if po:
# Add items
for item in self.items:
self.purchase_service.add_purchase_order_item(
po_id=po.id,
product_id=item['product'].id,
quantity=item['quantity'],
unit_price=item['price'],
total_price=item['total']
)
messagebox.showinfo("Success", f"Purchase order #{po.id} created successfully")
self.destroy()
else:
messagebox.showerror("Error", "Failed to create purchase order")
def select_order_date(self):
"""Open date picker for order date"""
try:
current_date = datetime.strptime(self.order_date_entry.get(), "%Y-%m-%d").date()
except ValueError:
current_date = date.today()
dialog = DatePickerDialog(self, "Select Order Date", current_date)
self.wait_window(dialog)
selected_date = dialog.get_selected_date()
if selected_date:
self.order_date_entry.delete(0, 'end')
self.order_date_entry.insert(0, selected_date.strftime("%Y-%m-%d"))
def select_expected_delivery(self):
"""Open date picker for expected delivery date"""
if self.expected_delivery_entry.get():
try:
current_date = datetime.strptime(self.expected_delivery_entry.get(), "%Y-%m-%d").date()
except ValueError:
current_date = date.today()
else:
current_date = date.today()
dialog = DatePickerDialog(self, "Select Expected Delivery Date", current_date)
self.wait_window(dialog)
selected_date = dialog.get_selected_date()
if selected_date:
self.expected_delivery_entry.delete(0, 'end')
self.expected_delivery_entry.insert(0, selected_date.strftime("%Y-%m-%d"))

View File

@ -0,0 +1,233 @@
import customtkinter as ctk
from tkinter import messagebox
from datetime import datetime, date
from typing import Optional, List
from src.models import SalesOrder, SalesOrderItem, Product, Customer
from src.services import SalesOrderService, ProductService, CustomerService
from src.ui.date_picker_dialog import DatePickerDialog
class SalesOrderDialog(ctk.CTkToplevel):
def __init__(self, parent, sales_service: SalesOrderService,
product_service: ProductService, customer_service: CustomerService,
sales_order: Optional[SalesOrder] = None):
super().__init__(parent)
self.sales_service = sales_service
self.product_service = product_service
self.customer_service = customer_service
self.sales_order = sales_order
self.title("New Sales Order" if not sales_order else "Edit Sales Order")
self.geometry("600x500")
# Make dialog modal
self.transient(parent)
self.grab_set()
self.items = []
self.setup_ui()
def setup_ui(self):
# Customer selection
ctk.CTkLabel(self, text="Customer:").grid(row=0, column=0, padx=10, pady=5, sticky="w")
self.customer_var = ctk.StringVar()
self.customer_combo = ctk.CTkComboBox(self, variable=self.customer_var, width=200)
self.customer_combo.grid(row=0, column=1, padx=10, pady=5)
# Order date
ctk.CTkLabel(self, text="Order Date:").grid(row=1, column=0, padx=10, pady=5, sticky="w")
date_frame1 = ctk.CTkFrame(self)
date_frame1.grid(row=1, column=1, padx=10, pady=5, sticky="w")
self.order_date_entry = ctk.CTkEntry(date_frame1, width=150)
self.order_date_entry.insert(0, datetime.now().strftime("%Y-%m-%d"))
self.order_date_entry.pack(side="left")
ctk.CTkButton(date_frame1, text="...", width=30, command=self.select_order_date).pack(side="left", padx=(5, 0))
# Expected delivery
ctk.CTkLabel(self, text="Expected Delivery:").grid(row=2, column=0, padx=10, pady=5, sticky="w")
date_frame2 = ctk.CTkFrame(self)
date_frame2.grid(row=2, column=1, padx=10, pady=5, sticky="w")
self.expected_delivery_entry = ctk.CTkEntry(date_frame2, width=150)
self.expected_delivery_entry.pack(side="left")
ctk.CTkButton(date_frame2, text="...", width=30, command=self.select_expected_delivery).pack(side="left", padx=(5, 0))
# Items frame
items_frame = ctk.CTkFrame(self)
items_frame.grid(row=3, column=0, columnspan=2, padx=10, pady=10, sticky="nsew")
# Add item controls
add_frame = ctk.CTkFrame(items_frame)
add_frame.pack(fill="x", padx=5, pady=5)
ctk.CTkLabel(add_frame, text="Product:").pack(side="left", padx=5)
self.product_var = ctk.StringVar()
self.product_combo = ctk.CTkComboBox(add_frame, variable=self.product_var, width=150)
self.product_combo.pack(side="left", padx=5)
ctk.CTkLabel(add_frame, text="Qty:").pack(side="left", padx=5)
self.quantity_entry = ctk.CTkEntry(add_frame, width=50)
self.quantity_entry.pack(side="left", padx=5)
ctk.CTkLabel(add_frame, text="Price:").pack(side="left", padx=5)
self.price_entry = ctk.CTkEntry(add_frame, width=80)
self.price_entry.pack(side="left", padx=5)
ctk.CTkButton(add_frame, text="Add", command=self.add_item, width=60).pack(side="left", padx=5)
# Items list
self.items_text = ctk.CTkTextbox(items_frame, height=150)
self.items_text.pack(fill="both", expand=True, padx=5, pady=5)
# Total
self.total_label = ctk.CTkLabel(self, text="Total: Rp0", font=("Arial", 14, "bold"))
self.total_label.grid(row=4, column=0, columnspan=2, pady=10)
# Buttons
button_frame = ctk.CTkFrame(self)
button_frame.grid(row=5, column=0, columnspan=2, pady=10)
ctk.CTkButton(button_frame, text="Save", command=self.save_order).pack(side="left", padx=5)
ctk.CTkButton(button_frame, text="Cancel", command=self.destroy).pack(side="left", padx=5)
# Load data
self.load_customers()
self.load_products()
def load_customers(self):
customers = self.customer_service.get_all_customers()
customer_names = [c.name for c in customers]
self.customer_combo.configure(values=customer_names)
def load_products(self):
products = self.product_service.get_all_products()
product_names = [p.name for p in products]
self.product_combo.configure(values=product_names)
def add_item(self):
product_name = self.product_var.get()
try:
quantity = int(self.quantity_entry.get())
price = float(self.price_entry.get())
except ValueError:
messagebox.showerror("Error", "Please enter valid quantity and price")
return
if quantity <= 0 or price <= 0:
messagebox.showerror("Error", "Quantity and price must be positive")
return
product = next((p for p in self.product_service.get_all_products() if p.name == product_name), None)
if not product:
messagebox.showerror("Error", "Please select a valid product")
return
total = quantity * price
self.items.append({
'product': product,
'quantity': quantity,
'price': price,
'total': total
})
self.update_items_display()
self.update_total()
# Clear inputs
self.quantity_entry.delete(0, 'end')
self.price_entry.delete(0, 'end')
def update_items_display(self):
self.items_text.delete("1.0", "end")
for item in self.items:
self.items_text.insert("end",
f"{item['product'].name} - {item['quantity']} x Rp{item['price']:,.0f} = Rp{item['total']:,.0f}\n")
def update_total(self):
total = sum(item['total'] for item in self.items)
self.total_label.configure(text=f"Total: Rp{total:,.0f}")
def save_order(self):
customer_name = self.customer_var.get()
if not customer_name:
messagebox.showerror("Error", "Please select a customer")
return
if not self.items:
messagebox.showerror("Error", "Please add at least one item")
return
customer = next((c for c in self.customer_service.get_all_customers() if c.name == customer_name), None)
if not customer:
messagebox.showerror("Error", "Please select a valid customer")
return
total = sum(item['total'] for item in self.items)
# Create sales order
# Convert string dates to date objects
try:
order_date = datetime.strptime(self.order_date_entry.get(), "%Y-%m-%d").date()
except ValueError:
order_date = datetime.now().date()
expected_delivery = None
if self.expected_delivery_entry.get():
try:
expected_delivery = datetime.strptime(self.expected_delivery_entry.get(), "%Y-%m-%d").date()
except ValueError:
pass
so = self.sales_service.create_sales_order(
customer_id=customer.id,
order_date=order_date,
expected_delivery=expected_delivery,
total_amount=total
)
if so:
# Add items
for item in self.items:
self.sales_service.add_sales_order_item(
so_id=so.id,
product_id=item['product'].id,
quantity=item['quantity'],
unit_price=item['price'],
total_price=item['total']
)
messagebox.showinfo("Success", f"Sales order #{so.id} created successfully")
self.destroy()
else:
messagebox.showerror("Error", "Failed to create sales order")
def select_order_date(self):
"""Open date picker for order date"""
try:
current_date = datetime.strptime(self.order_date_entry.get(), "%Y-%m-%d").date()
except ValueError:
current_date = date.today()
dialog = DatePickerDialog(self, "Select Order Date", current_date)
self.wait_window(dialog)
selected_date = dialog.get_selected_date()
if selected_date:
self.order_date_entry.delete(0, 'end')
self.order_date_entry.insert(0, selected_date.strftime("%Y-%m-%d"))
def select_expected_delivery(self):
"""Open date picker for expected delivery date"""
if self.expected_delivery_entry.get():
try:
current_date = datetime.strptime(self.expected_delivery_entry.get(), "%Y-%m-%d").date()
except ValueError:
current_date = date.today()
else:
current_date = date.today()
dialog = DatePickerDialog(self, "Select Expected Delivery Date", current_date)
self.wait_window(dialog)
selected_date = dialog.get_selected_date()
if selected_date:
self.expected_delivery_entry.delete(0, 'end')
self.expected_delivery_entry.insert(0, selected_date.strftime("%Y-%m-%d"))

108
src/ui/supplier_dialog.py Normal file
View File

@ -0,0 +1,108 @@
import customtkinter as ctk
from tkinter import messagebox
from typing import Optional
from src.models import Supplier
from src.services import SupplierService
class SupplierDialog(ctk.CTkToplevel):
def __init__(self, parent, supplier_service: SupplierService,
supplier: Optional[Supplier] = None):
super().__init__(parent)
self.supplier_service = supplier_service
self.supplier = supplier
self.title("New Supplier" if not supplier else "Edit Supplier")
self.geometry("400x350")
# Make dialog modal
self.transient(parent)
self.grab_set()
self.setup_ui()
def setup_ui(self):
# Name
ctk.CTkLabel(self, text="Name:").grid(row=0, column=0, padx=10, pady=5, sticky="w")
self.name_entry = ctk.CTkEntry(self, width=250)
self.name_entry.grid(row=0, column=1, padx=10, pady=5)
# Email
ctk.CTkLabel(self, text="Email:").grid(row=1, column=0, padx=10, pady=5, sticky="w")
self.email_entry = ctk.CTkEntry(self, width=250)
self.email_entry.grid(row=1, column=1, padx=10, pady=5)
# Phone
ctk.CTkLabel(self, text="Phone:").grid(row=2, column=0, padx=10, pady=5, sticky="w")
self.phone_entry = ctk.CTkEntry(self, width=250)
self.phone_entry.grid(row=2, column=1, padx=10, pady=5)
# Contact Person
ctk.CTkLabel(self, text="Contact Person:").grid(row=3, column=0, padx=10, pady=5, sticky="w")
self.contact_person_entry = ctk.CTkEntry(self, width=250)
self.contact_person_entry.grid(row=3, column=1, padx=10, pady=5)
# Address
ctk.CTkLabel(self, text="Address:").grid(row=4, column=0, padx=10, pady=5, sticky="nw")
self.address_text = ctk.CTkTextbox(self, height=80, width=250)
self.address_text.grid(row=4, column=1, padx=10, pady=5)
# Buttons
button_frame = ctk.CTkFrame(self)
button_frame.grid(row=5, column=0, columnspan=2, pady=20)
ctk.CTkButton(button_frame, text="Save", command=self.save_supplier).pack(side="left", padx=5)
ctk.CTkButton(button_frame, text="Cancel", command=self.destroy).pack(side="left", padx=5)
# Load data if editing
if self.supplier:
self.load_supplier_data()
def load_supplier_data(self):
self.name_entry.insert(0, self.supplier.name)
self.email_entry.insert(0, self.supplier.email or "")
self.phone_entry.insert(0, self.supplier.phone or "")
self.contact_person_entry.insert(0, self.supplier.contact_person or "")
self.address_text.insert("1.0", self.supplier.address or "")
def save_supplier(self):
name = self.name_entry.get().strip()
if not name:
messagebox.showerror("Error", "Please enter supplier name")
return
email = self.email_entry.get().strip()
phone = self.phone_entry.get().strip()
contact_person = self.contact_person_entry.get().strip()
address = self.address_text.get("1.0", "end-1c").strip()
if self.supplier:
# Update existing supplier
updated = self.supplier_service.update_supplier(
supplier_id=self.supplier.id,
name=name,
contact_person=contact_person or None,
email=email or None,
phone=phone or None,
address=address or None
)
if updated:
messagebox.showinfo("Success", "Supplier updated successfully")
self.destroy()
else:
messagebox.showerror("Error", "Failed to update supplier")
else:
# Create new supplier
supplier = self.supplier_service.create_supplier(
name=name,
contact_person=contact_person or None,
email=email or None,
phone=phone or None,
address=address or None
)
if supplier:
messagebox.showinfo("Success", f"Supplier '{supplier.name}' created successfully")
self.destroy()
else:
messagebox.showerror("Error", "Failed to create supplier")

View File

@ -0,0 +1,460 @@
import customtkinter as ctk
from tkinter import messagebox
from typing import Optional
from src.auth import AuthManager
class UserManagementDialog(ctk.CTkToplevel):
def __init__(self, parent, auth_manager: AuthManager):
super().__init__(parent)
self.auth_manager = auth_manager
self.selected_user = None
self.selected_groups = []
self.title("User Management")
self.geometry("800x600")
# Make dialog modal
self.transient(parent)
self.grab_set()
self.setup_ui()
self.load_users()
self.load_groups()
def setup_ui(self):
# Title
ctk.CTkLabel(self, text="User Management", font=("Arial", 16, "bold")).pack(pady=10)
# Main frame
main_frame = ctk.CTkFrame(self)
main_frame.pack(fill="both", expand=True, padx=20, pady=10)
# Users list frame
users_frame = ctk.CTkFrame(main_frame)
users_frame.pack(side="left", fill="both", expand=True, padx=(0, 10))
ctk.CTkLabel(users_frame, text="Users", font=("Arial", 14, "bold")).pack(pady=5)
# Users list
self.users_listbox = ctk.CTkScrollableFrame(users_frame, height=300)
self.users_listbox.pack(fill="both", expand=True, padx=10, pady=10)
# User buttons
user_buttons_frame = ctk.CTkFrame(users_frame)
user_buttons_frame.pack(fill="x", padx=10, pady=5)
ctk.CTkButton(user_buttons_frame, text="Add User", command=self.add_user).pack(side="left", padx=5)
ctk.CTkButton(user_buttons_frame, text="Edit User", command=self.edit_user).pack(side="left", padx=5)
ctk.CTkButton(user_buttons_frame, text="Delete User", command=self.delete_user).pack(side="left", padx=5)
# Groups list frame
groups_frame = ctk.CTkFrame(main_frame)
groups_frame.pack(side="right", fill="both", expand=True, padx=(10, 0))
ctk.CTkLabel(groups_frame, text="Groups", font=("Arial", 14, "bold")).pack(pady=5)
# Groups list
self.groups_listbox = ctk.CTkScrollableFrame(groups_frame, height=300)
self.groups_listbox.pack(fill="both", expand=True, padx=10, pady=10)
# Group buttons
group_buttons_frame = ctk.CTkFrame(groups_frame)
group_buttons_frame.pack(fill="x", padx=10, pady=5)
ctk.CTkButton(group_buttons_frame, text="Add Group", command=self.add_group).pack(side="left", padx=5)
ctk.CTkButton(group_buttons_frame, text="Edit Group", command=self.edit_group).pack(side="left", padx=5)
ctk.CTkButton(group_buttons_frame, text="Delete Group", command=self.delete_group).pack(side="left", padx=5)
# Assign/Unassign buttons
assign_frame = ctk.CTkFrame(self)
assign_frame.pack(fill="x", padx=20, pady=10)
ctk.CTkButton(assign_frame, text="Assign to Group", command=self.assign_to_group).pack(side="left", padx=5)
ctk.CTkButton(assign_frame, text="Unassign from Group", command=self.unassign_from_group).pack(side="left", padx=5)
# Close button
ctk.CTkButton(self, text="Close", command=self.destroy).pack(pady=10)
def load_users(self):
# Clear existing widgets
for widget in self.users_listbox.winfo_children():
widget.destroy()
users = self.auth_manager.get_all_users()
self.user_widgets = []
for user in users:
user_frame = ctk.CTkFrame(self.users_listbox)
user_frame.pack(fill="x", padx=5, pady=2)
# Get user groups
user_groups = self.auth_manager.get_user_groups(user['id'])
group_names = [group['name'] for group in user_groups]
groups_text = f" ({', '.join(group_names)})" if group_names else " (No groups)"
admin_text = " (Admin)" if user['is_admin'] else ""
user_label = ctk.CTkLabel(user_frame, text=f"{user['username']}{admin_text}{groups_text}", wraplength=250, anchor="w")
user_label.pack(side="left", padx=10, pady=5)
# Bind click event
user_frame.bind("<Button-1>", lambda e, u=user: self.select_user(u))
user_label.bind("<Button-1>", lambda e, u=user: self.select_user(u))
self.user_widgets.append((user_frame, user_label, user))
def load_groups(self):
# Clear existing widgets
for widget in self.groups_listbox.winfo_children():
widget.destroy()
groups = self.auth_manager.get_all_groups()
self.group_widgets = []
for group in groups:
group_frame = ctk.CTkFrame(self.groups_listbox)
group_frame.pack(fill="x", padx=5, pady=2)
group_label = ctk.CTkLabel(group_frame, text=group['name'])
group_label.pack(side="left", padx=10, pady=5)
# Bind click event
group_frame.bind("<Button-1>", lambda e, g=group: self.select_group(g))
group_label.bind("<Button-1>", lambda e, g=group: self.select_group(g))
self.group_widgets.append((group_frame, group_label, group))
def select_user(self, user):
self.selected_user = user
# Update visual selection
for widget_frame, widget_label, widget_user in self.user_widgets:
if widget_user == user:
widget_frame.configure(fg_color="blue")
widget_label.configure(text_color="white")
else:
widget_frame.configure(fg_color="transparent")
widget_label.configure(text_color="black")
def refresh_user_list(self):
"""Refresh the user list to show updated group memberships"""
self.load_users()
def select_group(self, group):
self.selected_group = group
# Update visual selection
for widget_frame, widget_label, widget_group in self.group_widgets:
if widget_group == group:
widget_frame.configure(fg_color="blue")
widget_label.configure(text_color="white")
else:
widget_frame.configure(fg_color="transparent")
widget_label.configure(text_color="black")
def add_user(self):
dialog = AddUserDialog(self, self.auth_manager)
self.wait_window(dialog)
self.load_users()
def edit_user(self):
if not self.selected_user:
messagebox.showerror("Error", "Please select a user to edit")
return
dialog = EditUserDialog(self, self.auth_manager, self.selected_user)
self.wait_window(dialog)
self.load_users()
def delete_user(self):
if not self.selected_user:
messagebox.showerror("Error", "Please select a user to delete")
return
if messagebox.askyesno("Confirm Delete", f"Are you sure you want to delete user '{self.selected_user['username']}'?"):
if self.auth_manager.delete_user(self.selected_user['id']):
messagebox.showinfo("Success", f"User '{self.selected_user['username']}' deleted successfully")
self.selected_user = None
self.load_users()
else:
messagebox.showerror("Error", "Failed to delete user")
def add_group(self):
dialog = AddGroupDialog(self, self.auth_manager)
self.wait_window(dialog)
self.load_groups()
def edit_group(self):
if not hasattr(self, 'selected_group'):
messagebox.showerror("Error", "Please select a group to edit")
return
dialog = EditGroupDialog(self, self.auth_manager, self.selected_group)
self.wait_window(dialog)
self.load_groups()
def delete_group(self):
if not hasattr(self, 'selected_group'):
messagebox.showerror("Error", "Please select a group to delete")
return
if messagebox.askyesno("Confirm Delete", f"Are you sure you want to delete group '{self.selected_group['name']}'?"):
if self.auth_manager.delete_group(self.selected_group['id']):
messagebox.showinfo("Success", f"Group '{self.selected_group['name']}' deleted successfully")
delattr(self, 'selected_group')
self.load_groups()
else:
messagebox.showerror("Error", "Failed to delete group")
def assign_to_group(self):
if not self.selected_user:
messagebox.showerror("Error", "Please select a user")
return
if not hasattr(self, 'selected_group'):
messagebox.showerror("Error", "Please select a group")
return
if self.auth_manager.add_user_to_group(self.selected_user['id'], self.selected_group['id']):
messagebox.showinfo("Success", f"User '{self.selected_user['username']}' assigned to group '{self.selected_group['name']}'")
self.refresh_user_list()
else:
messagebox.showerror("Error", "Failed to assign user to group")
def unassign_from_group(self):
if not self.selected_user:
messagebox.showerror("Error", "Please select a user")
return
if not hasattr(self, 'selected_group'):
messagebox.showerror("Error", "Please select a group")
return
if self.auth_manager.remove_user_from_group(self.selected_user['id'], self.selected_group['id']):
messagebox.showinfo("Success", f"User '{self.selected_user['username']}' unassigned from group '{self.selected_group['name']}'")
self.refresh_user_list()
else:
messagebox.showerror("Error", "Failed to unassign user from group")
class AddUserDialog(ctk.CTkToplevel):
def __init__(self, parent, auth_manager: AuthManager):
super().__init__(parent)
self.auth_manager = auth_manager
self.title("Add User")
self.geometry("400x300")
# Make dialog modal
self.transient(parent)
self.grab_set()
self.setup_ui()
def setup_ui(self):
# Title
ctk.CTkLabel(self, text="Add New User", font=("Arial", 16, "bold")).pack(pady=10)
# Username
ctk.CTkLabel(self, text="Username:").pack(anchor="w", padx=20, pady=(10, 0))
self.username_entry = ctk.CTkEntry(self, width=300)
self.username_entry.pack(pady=5)
# Password
ctk.CTkLabel(self, text="Password:").pack(anchor="w", padx=20, pady=(10, 0))
self.password_entry = ctk.CTkEntry(self, width=300, show="*")
self.password_entry.pack(pady=5)
# Confirm Password
ctk.CTkLabel(self, text="Confirm Password:").pack(anchor="w", padx=20, pady=(10, 0))
self.confirm_password_entry = ctk.CTkEntry(self, width=300, show="*")
self.confirm_password_entry.pack(pady=5)
# Admin checkbox
self.is_admin_var = ctk.BooleanVar()
ctk.CTkCheckBox(self, text="Admin User", variable=self.is_admin_var).pack(pady=10)
# Buttons
button_frame = ctk.CTkFrame(self)
button_frame.pack(pady=20)
ctk.CTkButton(button_frame, text="Save", command=self.save_user).pack(side="left", padx=5)
ctk.CTkButton(button_frame, text="Cancel", command=self.destroy).pack(side="left", padx=5)
def save_user(self):
username = self.username_entry.get().strip()
password = self.password_entry.get()
confirm_password = self.confirm_password_entry.get()
is_admin = self.is_admin_var.get()
if not username:
messagebox.showerror("Error", "Please enter a username")
return
if not password:
messagebox.showerror("Error", "Please enter a password")
return
if password != confirm_password:
messagebox.showerror("Error", "Passwords do not match")
return
try:
self.auth_manager.create_user(username, password, is_admin)
messagebox.showinfo("Success", f"User '{username}' created successfully")
self.destroy()
except Exception as e:
messagebox.showerror("Error", f"Failed to create user: {str(e)}")
class EditUserDialog(ctk.CTkToplevel):
def __init__(self, parent, auth_manager: AuthManager, user):
super().__init__(parent)
self.auth_manager = auth_manager
self.user = user
self.title("Edit User")
self.geometry("400x250")
# Make dialog modal
self.transient(parent)
self.grab_set()
self.setup_ui()
def setup_ui(self):
# Title
ctk.CTkLabel(self, text="Edit User", font=("Arial", 16, "bold")).pack(pady=10)
# Username
ctk.CTkLabel(self, text="Username:").pack(anchor="w", padx=20, pady=(10, 0))
self.username_entry = ctk.CTkEntry(self, width=300)
self.username_entry.insert(0, self.user['username'])
self.username_entry.pack(pady=5)
# Admin checkbox
self.is_admin_var = ctk.BooleanVar(value=self.user['is_admin'])
ctk.CTkCheckBox(self, text="Admin User", variable=self.is_admin_var).pack(pady=10)
# Buttons
button_frame = ctk.CTkFrame(self)
button_frame.pack(pady=20)
ctk.CTkButton(button_frame, text="Save", command=self.save_user).pack(side="left", padx=5)
ctk.CTkButton(button_frame, text="Cancel", command=self.destroy).pack(side="left", padx=5)
def save_user(self):
username = self.username_entry.get().strip()
is_admin = self.is_admin_var.get()
if not username:
messagebox.showerror("Error", "Please enter a username")
return
# Note: In a real application, you would update the user in the database
# For now, we'll just show a message
messagebox.showinfo("Info", "User update functionality would be implemented in a full application")
self.destroy()
class AddGroupDialog(ctk.CTkToplevel):
def __init__(self, parent, auth_manager: AuthManager):
super().__init__(parent)
self.auth_manager = auth_manager
self.title("Add Group")
self.geometry("400x300")
# Make dialog modal
self.transient(parent)
self.grab_set()
self.setup_ui()
def setup_ui(self):
# Title
ctk.CTkLabel(self, text="Add New Group", font=("Arial", 16, "bold")).pack(pady=10)
# Group name
ctk.CTkLabel(self, text="Group Name:").pack(anchor="w", padx=20, pady=(10, 0))
self.name_entry = ctk.CTkEntry(self, width=300)
self.name_entry.pack(pady=5)
# Permissions
ctk.CTkLabel(self, text="Permissions (comma separated):").pack(anchor="w", padx=20, pady=(10, 0))
self.permissions_entry = ctk.CTkEntry(self, width=300)
self.permissions_entry.pack(pady=5)
# Buttons
button_frame = ctk.CTkFrame(self)
button_frame.pack(pady=20)
ctk.CTkButton(button_frame, text="Save", command=self.save_group).pack(side="left", padx=5)
ctk.CTkButton(button_frame, text="Cancel", command=self.destroy).pack(side="left", padx=5)
def save_group(self):
name = self.name_entry.get().strip()
permissions = self.permissions_entry.get().strip()
if not name:
messagebox.showerror("Error", "Please enter a group name")
return
permissions_list = [p.strip() for p in permissions.split(",")] if permissions else []
if self.auth_manager.create_group(name, permissions_list):
messagebox.showinfo("Success", f"Group '{name}' created successfully")
self.destroy()
else:
messagebox.showerror("Error", "Failed to create group")
class EditGroupDialog(ctk.CTkToplevel):
def __init__(self, parent, auth_manager: AuthManager, group):
super().__init__(parent)
self.auth_manager = auth_manager
self.group = group
self.title("Edit Group")
self.geometry("400x300")
# Make dialog modal
self.transient(parent)
self.grab_set()
self.setup_ui()
def setup_ui(self):
# Title
ctk.CTkLabel(self, text="Edit Group", font=("Arial", 16, "bold")).pack(pady=10)
# Group name
ctk.CTkLabel(self, text="Group Name:").pack(anchor="w", padx=20, pady=(10, 0))
self.name_entry = ctk.CTkEntry(self, width=300)
self.name_entry.insert(0, self.group['name'])
self.name_entry.pack(pady=5)
# Permissions
ctk.CTkLabel(self, text="Permissions (comma separated):").pack(anchor="w", padx=20, pady=(10, 0))
permissions_str = ",".join(self.group['permissions']) if self.group['permissions'] else ""
self.permissions_entry = ctk.CTkEntry(self, width=300)
self.permissions_entry.insert(0, permissions_str)
self.permissions_entry.pack(pady=5)
# Buttons
button_frame = ctk.CTkFrame(self)
button_frame.pack(pady=20)
ctk.CTkButton(button_frame, text="Save", command=self.save_group).pack(side="left", padx=5)
ctk.CTkButton(button_frame, text="Cancel", command=self.destroy).pack(side="left", padx=5)
def save_group(self):
name = self.name_entry.get().strip()
permissions = self.permissions_entry.get().strip()
if not name:
messagebox.showerror("Error", "Please enter a group name")
return
# Note: In a real application, you would update the group in the database
# For now, we'll just show a message
messagebox.showinfo("Info", "Group update functionality would be implemented in a full application")
self.destroy()

119
test_export.py Normal file
View File

@ -0,0 +1,119 @@
#!/usr/bin/env python3
"""
Test script to verify Excel export functionality
"""
import os
import sys
from datetime import datetime
# Add src to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
from database import DatabaseManager
from services import (
ProductService, SupplierService, CustomerService,
PurchaseOrderService, ManufacturingOrderService,
SalesOrderService, InventoryService
)
def test_excel_exports():
"""Test all Excel export functionality"""
print("Testing Excel Export Functionality...")
# Initialize database
db_manager = DatabaseManager()
# Initialize services
product_service = ProductService(db_manager)
supplier_service = SupplierService(db_manager)
customer_service = CustomerService(db_manager)
purchase_service = PurchaseOrderService(db_manager)
manufacturing_service = ManufacturingOrderService(db_manager)
sales_service = SalesOrderService(db_manager)
inventory_service = InventoryService(db_manager)
# Create test data
print("Creating test data...")
# Create a product
product = product_service.create_product("Test Product", "Test Description", 100.0)
if product:
print(f"Created product: {product.name}")
# Create a supplier
supplier = supplier_service.create_supplier("Test Supplier", "John Doe", "1234567890")
if supplier:
print(f"Created supplier: {supplier.name}")
# Create a customer
customer = customer_service.create_customer("Test Customer", "Jane Doe", "0987654321")
if customer:
print(f"Created customer: {customer.name}")
# Create inventory record
inventory = inventory_service.create_inventory_record(product.id, 50)
if inventory:
print(f"Created inventory record for product {product.id}")
# Create purchase order
po = purchase_service.create_purchase_order(supplier.id)
if po:
print(f"Created purchase order: {po.id}")
# Create manufacturing order
mo = manufacturing_service.create_manufacturing_order(product.id, 10)
if mo:
print(f"Created manufacturing order: {mo.id}")
# Create sales order
so = sales_service.create_sales_order(customer.id)
if so:
print(f"Created sales order: {so.id}")
# Test exports
print("\nTesting Excel exports...")
try:
# Test purchase order export
po_filename = purchase_service.export_to_excel("test_purchase_orders.xlsx")
print(f"[OK] Purchase orders exported to: {po_filename}")
# Test manufacturing order export
mo_filename = manufacturing_service.export_to_excel("test_manufacturing_orders.xlsx")
print(f"[OK] Manufacturing orders exported to: {mo_filename}")
# Test sales order export
so_filename = sales_service.export_to_excel("test_sales_orders.xlsx")
print(f"[OK] Sales orders exported to: {so_filename}")
# Test inventory export
inv_filename = inventory_service.export_to_excel("test_inventory.xlsx")
print(f"[OK] Inventory exported to: {inv_filename}")
# Test stock movements export
sm_filename = inventory_service.export_stock_movements_to_excel("test_stock_movements.xlsx")
print(f"[OK] Stock movements exported to: {sm_filename}")
print("\n[SUCCESS] All Excel export tests passed!")
# Clean up test files
test_files = [po_filename, mo_filename, so_filename, inv_filename, sm_filename]
for file in test_files:
if os.path.exists(file):
os.remove(file)
print(f"Cleaned up: {file}")
except Exception as e:
print(f"[ERROR] Error during export: {str(e)}")
return False
return True
if __name__ == "__main__":
success = test_excel_exports()
if success:
print("\n[SUCCESS] All tests completed successfully!")
else:
print("\n[ERROR] Some tests failed!")
sys.exit(1)

107
test_ui_and_exports.py Normal file
View File

@ -0,0 +1,107 @@
#!/usr/bin/env python3
"""
Test script to verify the revamped UI and export features
"""
import os
import sys
import subprocess
import time
import threading
from pathlib import Path
def test_dependencies():
"""Test that all required dependencies are installed"""
print("Testing dependencies...")
try:
import customtkinter
print("[OK] customtkinter installed")
except ImportError:
print("[FAIL] customtkinter not installed")
return False
try:
import pandas
print("[OK] pandas installed")
except ImportError:
print("[FAIL] pandas not installed")
return False
try:
import openpyxl
print("[OK] openpyxl installed")
except ImportError:
print("[FAIL] openpyxl not installed")
return False
try:
import sqlalchemy
print("[OK] sqlalchemy installed")
except ImportError:
print("[FAIL] sqlalchemy not installed")
return False
try:
import bcrypt
print("[OK] bcrypt installed")
except ImportError:
print("[FAIL] bcrypt not installed")
return False
return True
def test_exports():
"""Test Excel export functionality"""
print("\nTesting Excel export functionality...")
# Import and run the existing test_export.py
try:
from test_export import test_excel_exports
test_excel_exports()
print("[OK] All Excel export tests passed")
return True
except Exception as e:
print(f"[FAIL] Excel export tests failed: {e}")
return False
def test_ui_startup():
"""Test that the UI can start without errors"""
print("\nTesting UI startup...")
try:
# Test that we can import the main modules
from src.app import ManufacturingApp
from src.ui.main_window import MainWindow
print("[OK] All UI modules import successfully")
# Test database initialization
from src.database import DatabaseManager
db_manager = DatabaseManager()
db_manager.initialize_database()
print("[OK] Database initialization successful")
return True
except Exception as e:
print(f"✗ UI startup test failed: {e}")
return False
def main():
"""Run all tests"""
print("=== Testing Revamped Manufacturing App ===\n")
success = True
success &= test_dependencies()
success &= test_ui_startup()
success &= test_exports()
if success:
print("\n[SUCCESS] All tests passed! The revamped UI and export features are working correctly.")
else:
print("\n[ERROR] Some tests failed. Please check the output above.")
return success
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)