402 lines
15 KiB
Python
402 lines
15 KiB
Python
from django.shortcuts import render, redirect
|
|
from django.contrib.auth.decorators import login_required, user_passes_test
|
|
from django.contrib import messages
|
|
from django.http import JsonResponse
|
|
from django.views.generic import TemplateView
|
|
from django.db import connection
|
|
from django.conf import settings
|
|
import sqlite3
|
|
import os
|
|
import shutil
|
|
import sys
|
|
from datetime import datetime, timedelta
|
|
from django.utils import timezone
|
|
from django.db.models import Sum, Count, Avg
|
|
from decimal import Decimal
|
|
from pathlib import Path
|
|
|
|
from inventory.models import Product, StockMovement
|
|
from purchase.models import PurchaseOrder, PurchaseOrderItem
|
|
from manufacture.models import ManufacturingOrder
|
|
from sales.models import SaleOrder, SaleOrderItem
|
|
|
|
def is_admin(user):
    """Return the ``is_superuser`` attribute of ``user``.

    Serves as the predicate passed to ``user_passes_test`` and as the
    access check in ``DatabaseManagementView.dispatch``.
    """
    superuser_flag = user.is_superuser
    return superuser_flag
|
|
|
|
class DashboardView(TemplateView):
    """Dashboard page: production, sales, purchase and inventory summaries.

    ``get_context_data`` fills the template context with:

    * ``{daily,weekly,monthly}_{production,sales,purchases}`` -- aggregate
      dicts (a sum plus ``total_orders``) for today, the last 7 days and
      the last 30 days;
    * ``inventory_turnover``, ``recent_inventory_movements`` and
      ``inventory_movement_by_type`` -- stock movement summaries;
    * ``monthly_profit`` -- 30-day sales total minus 30-day purchase total;
    * ``recent_manufacturing`` / ``recent_sales`` / ``recent_purchases`` --
      the five newest orders of each kind;
    * ``inventory_status_counts`` and ``low_stock_products`` -- product
      counts bucketed by ``current_stock``.
    """

    template_name = 'core/dashboard.html'

    # Stock-level boundaries used for the inventory status buckets.
    # Subclasses may override them; the defaults match the previous
    # hard-coded values.
    low_stock_threshold = 10
    overstock_threshold = 100

    @staticmethod
    def _today():
        """Return the current calendar date in the active time zone.

        ``timezone.now()`` is expressed in UTC when ``USE_TZ`` is enabled,
        so calling ``.date()`` on it directly can be a day off around
        midnight. Aware values are converted to local time first; naive
        values (``USE_TZ = False``) are already local.
        """
        now = timezone.now()
        if timezone.is_aware(now):
            now = timezone.localtime(now)
        return now.date()

    @staticmethod
    def _period_totals(model, sum_field, sum_alias, **date_filter):
        """Aggregate ``Sum(sum_field)`` and an order count for one date filter."""
        return model.objects.filter(**date_filter).aggregate(
            total_orders=Count('id'),
            **{sum_alias: Sum(sum_field)},
        )

    def get_context_data(self, **kwargs):
        """Build the dashboard context (see the class docstring for keys)."""
        context = super().get_context_data(**kwargs)

        # Reporting windows, all relative to the local calendar date.
        today = self._today()
        week_ago = today - timedelta(days=7)
        month_ago = today - timedelta(days=30)
        windows = (
            ('daily', {'date': today}),
            ('weekly', {'date__gte': week_ago}),
            ('monthly', {'date__gte': month_ago}),
        )

        # (context key suffix, model, field to sum, name of the sum in the result)
        sources = (
            ('production', ManufacturingOrder, 'quantity', 'total_quantity'),
            ('sales', SaleOrder, 'total_amount', 'total_amount'),
            ('purchases', PurchaseOrder, 'total_amount', 'total_amount'),
        )
        for suffix, model, sum_field, sum_alias in sources:
            for prefix, date_filter in windows:
                context[f'{prefix}_{suffix}'] = self._period_totals(
                    model, sum_field, sum_alias, **date_filter
                )

        # Inventory turnover (last 30 days)
        context['inventory_turnover'] = StockMovement.objects.filter(
            date__gte=month_ago
        ).aggregate(
            total_movements=Count('id'),
            total_quantity=Sum('quantity'),
        )

        # Recent inventory movements
        context['recent_inventory_movements'] = (
            StockMovement.objects.select_related('product').order_by('-date')[:10]
        )

        # Inventory movement by type
        context['inventory_movement_by_type'] = (
            StockMovement.objects.values('movement_type')
            .annotate(count=Count('id'), total_quantity=Sum('quantity'))
            .order_by('-count')
        )

        # Profit/Loss calculation (simplified). Sum() yields None when no
        # rows match, hence the Decimal('0') fallbacks.
        total_sales = context['monthly_sales']['total_amount'] or Decimal('0')
        total_purchases = context['monthly_purchases']['total_amount'] or Decimal('0')
        context['monthly_profit'] = total_sales - total_purchases

        # Recent activities
        context['recent_manufacturing'] = ManufacturingOrder.objects.order_by('-date')[:5]
        context['recent_sales'] = SaleOrder.objects.order_by('-date')[:5]
        context['recent_purchases'] = PurchaseOrder.objects.order_by('-date')[:5]

        # Inventory status counts; the low-stock queryset is reused for the
        # alert list below so both always apply the same threshold.
        low = self.low_stock_threshold
        high = self.overstock_threshold
        low_stock = Product.objects.filter(current_stock__lte=low)
        context['inventory_status_counts'] = {
            'low_stock': low_stock.count(),
            'normal': Product.objects.filter(
                current_stock__gt=low, current_stock__lt=high
            ).count(),
            'overstocked': Product.objects.filter(current_stock__gte=high).count(),
        }

        # Low stock alerts
        context['low_stock_products'] = low_stock[:5]

        return context
|
|
|
|
class DatabaseManagementView(TemplateView):
    """Admin-only page with database file details, backups and row counts.

    Context keys set by ``get_context_data``:

    * ``db_size`` / ``db_modified`` -- only when ``db.sqlite3`` exists;
    * ``available_backups`` -- only when the ``backups`` directory exists;
      a list of ``{'name', 'size', 'modified'}`` dicts, newest first;
    * ``total_users`` / ``total_products`` / ``total_orders`` /
      ``total_customers`` -- row counts, all ``0`` if counting fails.
    """

    template_name = 'core/database_management.html'

    def dispatch(self, request, *args, **kwargs):
        """Redirect users that fail ``is_admin`` to the dashboard with an error."""
        if not is_admin(request.user):
            messages.error(request, 'Access denied. Admin privileges required.')
            return redirect('core:dashboard')
        return super().dispatch(request, *args, **kwargs)

    @staticmethod
    def _base_db_dir():
        """Return the directory that holds ``db.sqlite3`` and ``backups``.

        Uses the executable's directory when ``sys.frozen`` is set (compiled
        executable) and ``settings.BASE_DIR`` otherwise. The result is
        always a ``Path`` so the ``/`` operator works even if ``BASE_DIR``
        is configured as a plain string.
        """
        if getattr(sys, 'frozen', False):
            return Path(os.path.dirname(sys.executable))
        return Path(settings.BASE_DIR)

    @staticmethod
    def _list_backups(backup_dir):
        """Describe every ``*.sqlite3`` entry in ``backup_dir``, newest first."""
        found = []
        for name in os.listdir(backup_dir):
            if not name.endswith('.sqlite3'):
                continue
            try:
                info = os.stat(backup_dir / name)
            except OSError:
                # Entry vanished or is unreadable between listdir() and
                # stat(); skip it rather than failing the whole page.
                continue
            found.append((info.st_mtime, {
                'name': name,
                'size': f"{info.st_size / 1024:.2f} KB",
                'modified': datetime.fromtimestamp(info.st_mtime).strftime('%Y-%m-%d %H:%M'),
            }))
        # Order by the real modification time: the formatted string only has
        # minute resolution, so it cannot order backups taken close together.
        found.sort(key=lambda pair: pair[0], reverse=True)
        return [details for _, details in found]

    def get_context_data(self, **kwargs):
        """Collect database file info, backup list and row counts."""
        context = super().get_context_data(**kwargs)

        base_db_dir = self._base_db_dir()

        # Database file information
        db_path = base_db_dir / 'db.sqlite3'
        if os.path.exists(db_path):
            stat = os.stat(db_path)
            context['db_size'] = f"{stat.st_size / (1024*1024):.2f} MB"
            context['db_modified'] = datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S')

        # Available backups
        backup_dir = base_db_dir / 'backups'
        if os.path.exists(backup_dir):
            context['available_backups'] = self._list_backups(backup_dir)

        # Database statistics
        try:
            from django.contrib.auth import get_user_model
            # NOTE(review): test_form in this file imports Customer from
            # sales.models -- confirm inventory.models really provides it;
            # if this import fails, every statistic below falls back to 0.
            from inventory.models import Product, Customer
            from purchase.models import PurchaseOrder
            from sales.models import SaleOrder

            User = get_user_model()

            context['total_users'] = User.objects.count()
            context['total_products'] = Product.objects.count()
            context['total_orders'] = PurchaseOrder.objects.count() + SaleOrder.objects.count()
            context['total_customers'] = Customer.objects.count()
        except Exception:
            # Keep the page usable, but record why the numbers are missing
            # instead of discarding the error.
            import logging
            logging.getLogger(__name__).exception('Could not compute database statistics')
            context['total_users'] = 0
            context['total_products'] = 0
            context['total_orders'] = 0
            context['total_customers'] = 0

        return context
|
|
|
|
@login_required
@user_passes_test(is_admin)
def backup_database(request):
    """Create a timestamped backup of ``db.sqlite3`` in the ``backups`` folder.

    Only POST requests perform a backup; every request ends with a redirect
    to ``core:database_management``. The outcome is reported through the
    messages framework.

    The copy is made with SQLite's online backup API rather than a raw file
    copy, so the backup is a consistent snapshot even if the database is
    being written to while the backup runs.
    """
    if request.method == 'POST':
        try:
            from contextlib import closing, suppress

            # Use the same path logic as in settings.py
            if getattr(sys, 'frozen', False):
                # Running as compiled executable
                base_db_dir = Path(os.path.dirname(sys.executable))
            else:
                # Running as script
                base_db_dir = Path(settings.BASE_DIR)

            # sqlite3.connect() would silently create an empty database for
            # a missing path, so check for the source file explicitly.
            db_path = base_db_dir / 'db.sqlite3'
            if not db_path.is_file():
                raise FileNotFoundError(f'Database file not found: {db_path}')

            # Create backup directory if it doesn't exist
            backup_dir = base_db_dir / 'backups'
            os.makedirs(backup_dir, exist_ok=True)

            # Generate backup filename with timestamp
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            backup_filename = f'manufacture_app_backup_{timestamp}.sqlite3'
            backup_path = backup_dir / backup_filename

            try:
                # closing() is required: a sqlite3 connection used as a
                # context manager only manages transactions, it does not
                # close the connection.
                with closing(sqlite3.connect(str(db_path))) as source, \
                        closing(sqlite3.connect(str(backup_path))) as target:
                    source.backup(target)
            except Exception:
                # Do not leave a partial file that would be listed as a
                # usable backup.
                with suppress(OSError):
                    os.remove(backup_path)
                raise

            messages.success(request, f'Database backed up successfully to {backup_filename}')
        except Exception as e:
            messages.error(request, f'Backup failed: {str(e)}')

    return redirect('core:database_management')
|
|
|
|
def _field_debug_info(form, field_name):
    """Summarize one form field: its class name and, if present, its choices."""
    field = form.fields[field_name]
    info = {'field_type': type(field).__name__}
    if hasattr(field, 'choices'):
        # Materialize once so the count and the list come from a single
        # evaluation of the choices.
        choices = list(field.choices)
        info['choices_count'] = len(choices)
        info['choices'] = choices
    else:
        info['choices_count'] = 'N/A'
        info['choices'] = []
    return info


@login_required
def test_form(request):
    """Test view to isolate form rendering issue.

    Renders ``core/test_form.html`` with unbound purchase and sale order
    forms, the supplier/customer querysets and their counts, database
    connectivity status, and debug details about the ``supplier`` and
    ``customer`` form fields.

    Requires login because the page exposes supplier and customer records
    and raw database error text.
    """
    # Check database connectivity
    db_connected = False
    db_error = None
    try:
        connection.ensure_connection()
        db_connected = True
    except Exception as e:
        db_error = str(e)

    from purchase.forms import PurchaseOrderForm
    from sales.forms import SaleOrderForm

    purchase_form = PurchaseOrderForm()
    sales_form = SaleOrderForm()

    # Check if suppliers/customers are available
    suppliers = []
    customers = []
    supplier_count = 0
    customer_count = 0
    model_error = None

    try:
        from purchase.models import Supplier
        from sales.models import Customer
        suppliers = Supplier.objects.all()
        customers = Customer.objects.all()
        supplier_count = suppliers.count()
        customer_count = customers.count()
    except Exception as e:
        model_error = str(e)

    context = {
        'purchase_form': purchase_form,
        'sales_form': sales_form,
        'suppliers': suppliers,
        'customers': customers,
        'supplier_count': supplier_count,
        'customer_count': customer_count,
        'db_connected': db_connected,
        'db_error': db_error,
        'model_error': model_error,
        # Debug information
        'purchase_form_debug': _field_debug_info(purchase_form, 'supplier'),
        'sales_form_debug': _field_debug_info(sales_form, 'customer'),
    }

    return render(request, 'core/test_form.html', context)
|
|
|
|
@login_required
@user_passes_test(is_admin)
def change_password(request):
    """Set a new password for the user named in the POST data.

    Reads ``username``, ``new_password`` and ``confirm_password`` from
    ``request.POST``. The first failing check (missing username, missing
    password, mismatch, fewer than 8 characters, unknown user) is reported
    through the messages framework. Every request, POST or not, ends with a
    redirect to ``core:database_management``.

    When the administrator changes their own password the session auth hash
    is refreshed, so the current session stays valid instead of being
    logged out by the password change.
    """
    if request.method == 'POST':
        try:
            from django.contrib.auth import get_user_model, update_session_auth_hash

            username = request.POST.get('username')
            new_password = request.POST.get('new_password')
            confirm_password = request.POST.get('confirm_password')

            # Validation runs in a fixed order; only the first problem is shown.
            if not username:
                problem = 'Username is required.'
            elif not new_password:
                problem = 'New password is required.'
            elif new_password != confirm_password:
                problem = 'Passwords do not match.'
            elif len(new_password) < 8:
                problem = 'Password must be at least 8 characters long.'
            else:
                problem = None

            if problem is not None:
                messages.error(request, problem)
                return redirect('core:database_management')

            # Get user and change password
            User = get_user_model()
            try:
                user = User.objects.get(username=username)
            except User.DoesNotExist:
                messages.error(request, f'User "{username}" does not exist.')
            else:
                user.set_password(new_password)
                user.save()
                if user.pk == request.user.pk:
                    # Changing a password invalidates sessions created with
                    # the old one; keep the acting admin signed in.
                    update_session_auth_hash(request, user)
                messages.success(request, f'Successfully changed password for user "{username}"')
        except Exception as e:
            messages.error(request, f'Failed to change password: {str(e)}')

    return redirect('core:database_management')
|
|
|
|
@login_required
def simple_test(request):
    """Render ``core/simple_test.html`` with unbound purchase and sale order forms.

    Simple test view to isolate form rendering issue.
    """
    from purchase.forms import PurchaseOrderForm
    from sales.forms import SaleOrderForm

    return render(
        request,
        'core/simple_test.html',
        {
            'purchase_form': PurchaseOrderForm(),
            'sales_form': SaleOrderForm(),
        },
    )
|
|
|
|
@login_required
@user_passes_test(is_admin)
def restore_database(request):
    """Replace ``db.sqlite3`` with the uploaded ``backup_file``.

    Only POST requests perform a restore; every request ends with a
    redirect to ``core:database_management``. Steps:

    1. Reject the upload unless it starts with the SQLite file header.
    2. Copy the current database to ``safety_backup_<timestamp>.sqlite3``
       next to it.
    3. Overwrite the current database with the upload; if that write
       fails, put the safety copy back so the application is not left with
       a truncated database.

    The outcome is reported through the messages framework.
    """
    # Every SQLite 3 database file begins with these 16 bytes.
    sqlite_magic = b'SQLite format 3\x00'

    if request.method == 'POST':
        try:
            # Use the same path logic as in settings.py
            if getattr(sys, 'frozen', False):
                # Running as compiled executable
                base_db_dir = Path(os.path.dirname(sys.executable))
            else:
                # Running as script
                base_db_dir = Path(settings.BASE_DIR)

            backup_file = request.FILES.get('backup_file')
            if not backup_file:
                messages.error(request, 'No backup file provided.')
                return redirect('core:database_management')

            # Validate before touching the live database: opening it with
            # 'wb' truncates it immediately.
            backup_file.seek(0)
            header = backup_file.read(len(sqlite_magic))
            backup_file.seek(0)
            if header != sqlite_magic:
                messages.error(request, 'Restore failed: uploaded file is not a SQLite database.')
                return redirect('core:database_management')

            # Create backup of current database first
            current_db = base_db_dir / 'db.sqlite3'
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            safety_backup = base_db_dir / f'safety_backup_{timestamp}.sqlite3'
            shutil.copy2(current_db, safety_backup)

            # Drop this thread's open handle so later queries reopen the
            # restored file. Skipped inside an atomic block, where closing
            # would break the surrounding transaction.
            if not connection.in_atomic_block:
                connection.close()

            # Restore from uploaded backup
            try:
                with open(current_db, 'wb') as f:
                    for chunk in backup_file.chunks():
                        f.write(chunk)
            except Exception:
                # Roll back to the pre-restore database, then report the error.
                shutil.copy2(safety_backup, current_db)
                raise

            messages.success(request, 'Database restored successfully. Previous database backed up as safety measure.')
        except Exception as e:
            messages.error(request, f'Restore failed: {str(e)}')

    return redirect('core:database_management')
|
|
|