208 lines
7.6 KiB
Python
208 lines
7.6 KiB
Python
from django.shortcuts import render, redirect
|
|
from django.contrib.auth.decorators import login_required, user_passes_test
|
|
from django.contrib import messages
|
|
from django.http import JsonResponse
|
|
from django.views.generic import TemplateView
|
|
from django.db import connection
|
|
from django.conf import settings
|
|
import sqlite3
|
|
import os
|
|
import shutil
|
|
from datetime import datetime, timedelta
|
|
from django.utils import timezone
|
|
from django.db.models import Sum, Count, Avg
|
|
from decimal import Decimal
|
|
|
|
from inventory.models import Product, StockMovement
|
|
from purchase.models import PurchaseOrder, PurchaseOrderItem
|
|
from manufacture.models import ManufacturingOrder
|
|
from sales.models import SaleOrder, SaleOrderItem
|
|
|
|
def is_admin(user):
    """Return the ``is_superuser`` flag of ``user``.

    Used as the access predicate for the database management views.
    """
    superuser_flag = user.is_superuser
    return superuser_flag
|
|
|
|
class DashboardView(TemplateView):
    """Dashboard page with production, sales, purchase and stock summaries."""

    template_name = 'core/dashboard.html'

    def get_context_data(self, **kwargs):
        """Return the template context populated with dashboard figures.

        Adds daily/weekly/monthly aggregates for manufacturing, sales and
        purchase orders, a 30-day stock movement summary, a simplified
        monthly profit figure, the five most recent orders of each kind and
        up to five products whose stock is at or below 10.
        """
        context = super().get_context_data(**kwargs)

        # Reference dates: today plus rolling 7-day and 30-day cut-offs.
        today = timezone.now().date()
        week_ago = today - timedelta(days=7)
        month_ago = today - timedelta(days=30)

        # Period prefix -> lookup applied to each model's ``date`` field.
        period_lookups = (
            ('daily', {'date': today}),
            ('weekly', {'date__gte': week_ago}),
            ('monthly', {'date__gte': month_ago}),
        )

        # Context suffix, model, name of the summed total, field being summed.
        order_summaries = (
            ('production', ManufacturingOrder, 'total_quantity', 'quantity'),
            ('sales', SaleOrder, 'total_amount', 'total_amount'),
            ('purchases', PurchaseOrder, 'total_amount', 'total_amount'),
        )

        for suffix, model, total_key, summed_field in order_summaries:
            for prefix, lookup in period_lookups:
                matching_orders = model.objects.filter(**lookup)
                context[f'{prefix}_{suffix}'] = matching_orders.aggregate(
                    **{total_key: Sum(summed_field)},
                    total_orders=Count('id'),
                )

        # Inventory turnover (last 30 days)
        recent_movements = StockMovement.objects.filter(date__gte=month_ago)
        context['inventory_turnover'] = recent_movements.aggregate(
            total_movements=Count('id'),
            total_quantity=Sum('quantity'),
        )

        # Simplified profit/loss: monthly sales minus monthly purchases.
        # Sum() yields None when no rows match, hence the zero fallback.
        zero = Decimal('0')
        monthly_income = context['monthly_sales']['total_amount'] or zero
        monthly_spend = context['monthly_purchases']['total_amount'] or zero
        context['monthly_profit'] = monthly_income - monthly_spend

        # Five most recent orders of each kind, newest first.
        context['recent_manufacturing'] = ManufacturingOrder.objects.order_by('-date')[:5]
        context['recent_sales'] = SaleOrder.objects.order_by('-date')[:5]
        context['recent_purchases'] = PurchaseOrder.objects.order_by('-date')[:5]

        # Low stock alerts: at most five products with stock of 10 or less.
        low_stock = Product.objects.filter(current_stock__lte=10)
        context['low_stock_products'] = low_stock[:5]

        return context
|
|
|
|
class DatabaseManagementView(TemplateView):
    """Database management page, served only to users accepted by ``is_admin``."""

    template_name = 'core/database_management.html'

    def dispatch(self, request, *args, **kwargs):
        """Serve the page to admins; send everyone else to the dashboard.

        Non-admin users get an error message and a redirect to
        ``core:dashboard`` instead of the requested page.
        """
        if is_admin(request.user):
            return super().dispatch(request, *args, **kwargs)

        messages.error(request, 'Access denied. Admin privileges required.')
        return redirect('core:dashboard')
|
|
|
|
@login_required
@user_passes_test(is_admin)
def backup_database(request):
    """Write a timestamped snapshot of the SQLite database to ``backups/``.

    Only POST requests perform a backup; any other method just redirects.
    The snapshot is taken with SQLite's online backup API rather than a raw
    file copy, so it is consistent even if the database is written to while
    the backup runs. The outcome is reported through the messages framework.

    Args:
        request: The incoming HTTP request.

    Returns:
        A redirect to ``core:database_management``.
    """
    if request.method == 'POST':
        try:
            # Create backup directory if it doesn't exist
            backup_dir = os.path.join(settings.BASE_DIR, 'backups')
            os.makedirs(backup_dir, exist_ok=True)

            # Generate backup filename with timestamp
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            backup_filename = f'manufacture_app_backup_{timestamp}.sqlite3'
            backup_path = os.path.join(backup_dir, backup_filename)

            db_path = os.path.join(settings.BASE_DIR, 'db.sqlite3')
            if not os.path.isfile(db_path):
                # sqlite3.connect() would silently create an empty database
                # here, so fail explicitly like a file copy would have.
                raise FileNotFoundError(f'Database file not found: {db_path}')

            # Snapshot through the SQLite backup API; a plain file copy of a
            # live database can capture a half-written, inconsistent state.
            source_conn = sqlite3.connect(db_path)
            try:
                target_conn = sqlite3.connect(backup_path)
                try:
                    source_conn.backup(target_conn)
                finally:
                    target_conn.close()
            finally:
                source_conn.close()

            messages.success(request, f'Database backed up successfully to {backup_filename}')
        except Exception as e:
            # View-level boundary: report the failure to the admin instead of
            # returning a server error.
            messages.error(request, f'Backup failed: {str(e)}')

    return redirect('core:database_management')
|
|
|
|
@login_required
@user_passes_test(is_admin)
def restore_database(request):
    """Replace the SQLite database with an uploaded backup file.

    Only POST requests perform a restore; any other method just redirects.
    The upload (``backup_file`` in ``request.FILES``) is first written to a
    staging file next to the database and checked for the SQLite file
    header. Only then is the current database copied to a timestamped
    ``safety_backup_*.sqlite3`` file and atomically replaced, so a bad or
    interrupted upload never damages the live database. The outcome is
    reported through the messages framework.

    Args:
        request: The incoming HTTP request.

    Returns:
        A redirect to ``core:database_management``.
    """
    if request.method == 'POST':
        try:
            backup_file = request.FILES.get('backup_file')
            if backup_file:
                current_db = os.path.join(settings.BASE_DIR, 'db.sqlite3')
                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                # Stage in the database's own directory so os.replace() stays
                # on one filesystem and is therefore atomic.
                staged_db = f'{current_db}.restore_{timestamp}.tmp'

                try:
                    with open(staged_db, 'wb') as f:
                        for chunk in backup_file.chunks():
                            f.write(chunk)

                    # Every SQLite 3 database file starts with this 16-byte
                    # magic string; reject anything else before touching the
                    # live database.
                    with open(staged_db, 'rb') as f:
                        header = f.read(16)
                    if header != b'SQLite format 3\x00':
                        raise ValueError('Uploaded file is not a valid SQLite database.')

                    # Create backup of current database first
                    safety_backup = os.path.join(
                        settings.BASE_DIR, f'safety_backup_{timestamp}.sqlite3'
                    )
                    shutil.copy2(current_db, safety_backup)

                    # Drop this thread's Django connection so it does not keep
                    # a handle on the file being replaced; Django reconnects
                    # on the next query.
                    # NOTE(review): connections held by other threads or
                    # processes are not closed here - confirm the deployment
                    # tolerates that.
                    connection.close()
                    os.replace(staged_db, current_db)
                finally:
                    # No-op after a successful replace; removes the staged
                    # upload when validation or the swap failed.
                    if os.path.exists(staged_db):
                        os.remove(staged_db)

                messages.success(request, 'Database restored successfully. Previous database backed up as safety measure.')
            else:
                messages.error(request, 'No backup file provided.')
        except Exception as e:
            # View-level boundary: report the failure to the admin instead of
            # returning a server error.
            messages.error(request, f'Restore failed: {str(e)}')

    return redirect('core:database_management')
|
|
|
|
@login_required
@user_passes_test(is_admin)
def duplicate_database(request):
    """Write a timestamped duplicate of the SQLite database to ``backups/``.

    Only POST requests create a duplicate; any other method just redirects.
    The duplicate is produced with SQLite's online backup API rather than a
    raw file copy, so it is consistent even if the database is written to
    while the copy runs. The outcome is reported through the messages
    framework.

    Args:
        request: The incoming HTTP request.

    Returns:
        A redirect to ``core:database_management``.
    """
    if request.method == 'POST':
        try:
            # Create backup directory if it doesn't exist
            backup_dir = os.path.join(settings.BASE_DIR, 'backups')
            os.makedirs(backup_dir, exist_ok=True)

            # Generate duplicate filename with timestamp
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            duplicate_filename = f'manufacture_app_duplicate_{timestamp}.sqlite3'
            duplicate_path = os.path.join(backup_dir, duplicate_filename)

            db_path = os.path.join(settings.BASE_DIR, 'db.sqlite3')
            if not os.path.isfile(db_path):
                # sqlite3.connect() would silently create an empty database
                # here, so fail explicitly like a file copy would have.
                raise FileNotFoundError(f'Database file not found: {db_path}')

            # Copy through the SQLite backup API; a plain file copy of a live
            # database can capture a half-written, inconsistent state.
            source_conn = sqlite3.connect(db_path)
            try:
                target_conn = sqlite3.connect(duplicate_path)
                try:
                    source_conn.backup(target_conn)
                finally:
                    target_conn.close()
            finally:
                source_conn.close()

            messages.success(request, f'Database duplicated successfully to {duplicate_filename}')
        except Exception as e:
            # View-level boundary: report the failure to the admin instead of
            # returning a server error.
            messages.error(request, f'Duplication failed: {str(e)}')

    return redirect('core:database_management')
|