Django_Basic_Manufacturing/core/views.py

248 lines
9.4 KiB
Python

from django.shortcuts import render, redirect
from django.contrib.auth.decorators import login_required, user_passes_test
from django.contrib import messages
from django.http import JsonResponse
from django.views.generic import TemplateView
from django.db import connection
from django.conf import settings
import sqlite3
import os
import shutil
from datetime import datetime, timedelta
from django.utils import timezone
from django.db.models import Sum, Count, Avg
from decimal import Decimal
from inventory.models import Product, StockMovement
from purchase.models import PurchaseOrder, PurchaseOrderItem
from manufacture.models import ManufacturingOrder
from sales.models import SaleOrder, SaleOrderItem
def is_admin(user):
return user.is_superuser
class DashboardView(TemplateView):
template_name = 'core/dashboard.html'
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
# Get current date and calculate date ranges
today = timezone.now().date()
week_ago = today - timedelta(days=7)
month_ago = today - timedelta(days=30)
# Production data
context['daily_production'] = ManufacturingOrder.objects.filter(
date=today
).aggregate(
total_quantity=Sum('quantity'),
total_orders=Count('id')
)
context['weekly_production'] = ManufacturingOrder.objects.filter(
date__gte=week_ago
).aggregate(
total_quantity=Sum('quantity'),
total_orders=Count('id')
)
context['monthly_production'] = ManufacturingOrder.objects.filter(
date__gte=month_ago
).aggregate(
total_quantity=Sum('quantity'),
total_orders=Count('id')
)
# Sales data
context['daily_sales'] = SaleOrder.objects.filter(
date=today
).aggregate(
total_amount=Sum('total_amount'),
total_orders=Count('id')
)
context['weekly_sales'] = SaleOrder.objects.filter(
date__gte=week_ago
).aggregate(
total_amount=Sum('total_amount'),
total_orders=Count('id')
)
context['monthly_sales'] = SaleOrder.objects.filter(
date__gte=month_ago
).aggregate(
total_amount=Sum('total_amount'),
total_orders=Count('id')
)
# Purchase data
context['daily_purchases'] = PurchaseOrder.objects.filter(
date=today
).aggregate(
total_amount=Sum('total_amount'),
total_orders=Count('id')
)
context['weekly_purchases'] = PurchaseOrder.objects.filter(
date__gte=week_ago
).aggregate(
total_amount=Sum('total_amount'),
total_orders=Count('id')
)
context['monthly_purchases'] = PurchaseOrder.objects.filter(
date__gte=month_ago
).aggregate(
total_amount=Sum('total_amount'),
total_orders=Count('id')
)
# Inventory turnover (last 30 days)
context['inventory_turnover'] = StockMovement.objects.filter(
date__gte=month_ago
).aggregate(
total_movements=Count('id'),
total_quantity=Sum('quantity')
)
# Recent inventory movements
context['recent_inventory_movements'] = StockMovement.objects.select_related('product').order_by('-date')[:10]
# Inventory movement by type
context['inventory_movement_by_type'] = StockMovement.objects.values('movement_type').annotate(
count=Count('id'),
total_quantity=Sum('quantity')
).order_by('-count')
# Profit/Loss calculation (simplified)
total_sales = context['monthly_sales']['total_amount'] or Decimal('0')
total_purchases = context['monthly_purchases']['total_amount'] or Decimal('0')
context['monthly_profit'] = total_sales - total_purchases
# Recent activities
context['recent_manufacturing'] = ManufacturingOrder.objects.order_by('-date')[:5]
context['recent_sales'] = SaleOrder.objects.order_by('-date')[:5]
context['recent_purchases'] = PurchaseOrder.objects.order_by('-date')[:5]
# Inventory status counts
context['inventory_status_counts'] = {
'low_stock': Product.objects.filter(current_stock__lte=10).count(),
'normal': Product.objects.filter(current_stock__gt=10, current_stock__lt=100).count(),
'overstocked': Product.objects.filter(current_stock__gte=100).count(),
}
# Low stock alerts
context['low_stock_products'] = Product.objects.filter(
current_stock__lte=10
)[:5]
return context
class DatabaseManagementView(TemplateView):
template_name = 'core/database_management.html'
def dispatch(self, request, *args, **kwargs):
if not is_admin(request.user):
messages.error(request, 'Access denied. Admin privileges required.')
return redirect('core:dashboard')
return super().dispatch(request, *args, **kwargs)
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
# Database file information
db_path = os.path.join(settings.BASE_DIR, 'db.sqlite3')
if os.path.exists(db_path):
stat = os.stat(db_path)
context['db_size'] = f"{stat.st_size / (1024*1024):.2f} MB"
context['db_modified'] = datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S')
# Available backups
backup_dir = os.path.join(settings.BASE_DIR, 'backups')
if os.path.exists(backup_dir):
backups = []
for file in os.listdir(backup_dir):
if file.endswith('.sqlite3'):
file_path = os.path.join(backup_dir, file)
stat = os.stat(file_path)
backups.append({
'name': file,
'size': f"{stat.st_size / 1024:.2f} KB",
'modified': datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M')
})
context['available_backups'] = sorted(backups, key=lambda x: x['modified'], reverse=True)
# Database statistics
try:
from django.contrib.auth import get_user_model
from inventory.models import Product, Customer
from purchase.models import PurchaseOrder
from sales.models import SaleOrder
User = get_user_model()
context['total_users'] = User.objects.count()
context['total_products'] = Product.objects.count()
context['total_orders'] = PurchaseOrder.objects.count() + SaleOrder.objects.count()
context['total_customers'] = Customer.objects.count()
except Exception as e:
# If there's an error, set default values
context['total_users'] = 0
context['total_products'] = 0
context['total_orders'] = 0
context['total_customers'] = 0
return context
@login_required
@user_passes_test(is_admin)
def backup_database(request):
if request.method == 'POST':
try:
# Create backup directory if it doesn't exist
backup_dir = os.path.join(settings.BASE_DIR, 'backups')
os.makedirs(backup_dir, exist_ok=True)
# Generate backup filename with timestamp
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_filename = f'manufacture_app_backup_{timestamp}.sqlite3'
backup_path = os.path.join(backup_dir, backup_filename)
# Copy the database file
db_path = os.path.join(settings.BASE_DIR, 'db.sqlite3')
shutil.copy2(db_path, backup_path)
messages.success(request, f'Database backed up successfully to {backup_filename}')
except Exception as e:
messages.error(request, f'Backup failed: {str(e)}')
return redirect('core:database_management')
@login_required
@user_passes_test(is_admin)
def restore_database(request):
if request.method == 'POST':
try:
backup_file = request.FILES.get('backup_file')
if backup_file:
# Create backup of current database first
current_db = os.path.join(settings.BASE_DIR, 'db.sqlite3')
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
safety_backup = os.path.join(settings.BASE_DIR, f'safety_backup_{timestamp}.sqlite3')
shutil.copy2(current_db, safety_backup)
# Restore from uploaded backup
with open(current_db, 'wb') as f:
for chunk in backup_file.chunks():
f.write(chunk)
messages.success(request, 'Database restored successfully. Previous database backed up as safety measure.')
else:
messages.error(request, 'No backup file provided.')
except Exception as e:
messages.error(request, f'Restore failed: {str(e)}')
return redirect('core:database_management')