"""Views for the database management module.

Provides the dashboard plus backup, restore, initialize, list, download and
delete views.  The code assumes the ``default`` Django database is a SQLite
file: it is inspected and copied with the standard-library ``sqlite3`` module.
"""
import logging
import os
import shutil
import sqlite3
import subprocess
import tempfile
from contextlib import closing, suppress
from datetime import datetime

from django.conf import settings
from django.contrib import messages
from django.contrib.auth.decorators import login_required, permission_required
from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
from django.http import FileResponse, Http404, HttpResponse, JsonResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.utils import timezone

from .models import DatabaseBackup

logger = logging.getLogger(__name__)

# First 16 bytes of every SQLite 3 database file.
_SQLITE_MAGIC = b"SQLite format 3\x00"


def _database_path():
    """Return the configured location of the default database file."""
    return settings.DATABASES['default']['NAME']


def _backup_dir():
    """Return the backup directory under MEDIA_ROOT, creating it if needed."""
    path = os.path.join(settings.MEDIA_ROOT, 'database_backups')
    os.makedirs(path, exist_ok=True)
    return path


def _timestamp():
    """Return the current local time formatted for use in a backup filename."""
    return datetime.now().strftime('%Y%m%d_%H%M%S')


def _snapshot_database(source_path, dest_path):
    """Write a consistent copy of the SQLite database at *source_path*.

    Uses SQLite's online backup API rather than a raw file copy, so a write
    happening during the backup cannot produce a torn copy.

    Raises:
        FileNotFoundError: if *source_path* does not exist.
        sqlite3.Error: if the source cannot be read as a SQLite database.
    """
    # sqlite3.connect() would silently create an empty database for a missing
    # path, so the existence check has to be explicit.
    if not os.path.exists(source_path):
        raise FileNotFoundError(f'Database file not found: {source_path}')
    try:
        with closing(sqlite3.connect(os.fspath(source_path))) as source, \
                closing(sqlite3.connect(os.fspath(dest_path))) as target:
            source.backup(target)
    except Exception:
        # Do not leave a partial backup file behind.
        with suppress(OSError):
            os.remove(dest_path)
        raise


def _record_backup(filename, file_path, user):
    """Create the ``DatabaseBackup`` row describing a finished backup file."""
    return DatabaseBackup.objects.create(
        filename=filename,
        file_path=file_path,
        size=os.path.getsize(file_path),
        status='completed',
        created_by=user,
        # timezone.now() is aware when USE_TZ is enabled and naive otherwise,
        # so it is correct for a DateTimeField under either setting.
        completed_at=timezone.now(),
    )


def _is_valid_sqlite_file(path):
    """Return True if *path* is a readable, structurally sound SQLite file.

    ``sqlite3.connect()`` alone never touches the file, so the header is
    checked explicitly and ``PRAGMA quick_check`` forces SQLite to read it.
    """
    try:
        with open(path, 'rb') as handle:
            if handle.read(len(_SQLITE_MAGIC)) != _SQLITE_MAGIC:
                return False
        with closing(sqlite3.connect(os.fspath(path))) as conn:
            row = conn.execute('PRAGMA quick_check(1)').fetchone()
    except (OSError, sqlite3.Error):
        return False
    return row is not None and row[0] == 'ok'


def _format_completed_at(backup):
    """Return a display string for the completion time of *backup*."""
    completed = backup.completed_at
    if completed is None:
        return 'Unable to retrieve'
    if timezone.is_aware(completed):
        # Show the time in the active time zone instead of UTC.
        completed = timezone.localtime(completed)
    return completed.strftime('%Y-%m-%d %I:%M %p')


@login_required
@permission_required('database_management.view_database', raise_exception=True)
def db_dashboard(request):
    """Render the dashboard with database size, table count and last backup.

    Statistics are best-effort: if anything fails while gathering them, the
    page is still rendered with zeroed values and 'Unable to retrieve'.
    """
    db_size_mb = 0
    table_count = 0
    try:
        db_path = _database_path()
        if os.path.exists(db_path):
            db_size_mb = os.path.getsize(db_path) / (1024 * 1024)
            # closing() guarantees the connection is released even if the
            # query raises.
            with closing(sqlite3.connect(os.fspath(db_path))) as conn:
                table_count = conn.execute(
                    "SELECT COUNT(*) FROM sqlite_master WHERE type='table';"
                ).fetchone()[0]

        last_backup = (
            DatabaseBackup.objects
            .filter(status='completed')
            .order_by('-created_at')
            .first()
        )
        if last_backup is None:
            last_backup_info = 'No backups yet'
        else:
            last_backup_info = _format_completed_at(last_backup)
    except Exception:
        logger.exception('Unable to collect database statistics')
        db_size_mb = 0
        table_count = 0
        last_backup_info = 'Unable to retrieve'

    context = {
        'module_title': 'Database Management',
        'db_size': f"{db_size_mb:.1f} MB",
        'table_count': f"{table_count} tables",
        'last_backup': last_backup_info,
    }
    return render(request, 'database_management/dashboard.html', context)


@login_required
@permission_required('database_management.add_database', raise_exception=True)
def backup_view(request):
    """Create a database backup on POST; otherwise render the backup page.

    On success redirects to the backup list; on failure reports the error
    through the messages framework and redirects to the dashboard.
    """
    if request.method == 'POST':
        try:
            filename = f"backup_{_timestamp()}.db"
            backup_path = os.path.join(_backup_dir(), filename)
            _snapshot_database(_database_path(), backup_path)
            _record_backup(filename, backup_path, request.user)
        except Exception as e:
            logger.exception('Database backup failed')
            messages.error(request, f'Error creating backup: {str(e)}')
            return redirect('database_management:db_dashboard')

        messages.success(request, f'Database backup created successfully: {filename}')
        return redirect('database_management:backup_list')

    context = {
        'module_title': 'Create Database Backup',
    }
    return render(request, 'database_management/backup.html', context)


@login_required
@permission_required('database_management.change_database', raise_exception=True)
def restore_view(request):
    """Replace the database with an uploaded SQLite file on POST.

    The upload is validated first; only then is a pre-restore backup of the
    current database taken and the database file replaced.  Every outcome is
    reported through the messages framework, followed by a redirect to the
    dashboard.  Without an uploaded file the restore page is rendered.
    """
    if request.method == 'POST' and request.FILES.get('backup_file'):
        try:
            backup_file = request.FILES['backup_file']

            if not backup_file.name.lower().endswith(('.db', '.sqlite3')):
                messages.error(request, 'Invalid file type. Please upload a SQLite database file.')
                return redirect('database_management:db_dashboard')

            db_path = _database_path()
            backup_filename = None

            # TemporaryDirectory removes the upload on every exit path,
            # including exceptions and the early return below.
            with tempfile.TemporaryDirectory() as temp_dir:
                temp_path = os.path.join(temp_dir, 'uploaded_backup.db')
                with open(temp_path, 'wb') as destination:
                    for chunk in backup_file.chunks():
                        destination.write(chunk)

                # Validate before touching anything, so a bad upload does not
                # leave a pointless pre-restore backup behind.
                if not _is_valid_sqlite_file(temp_path):
                    messages.error(request, 'Invalid database file. Please upload a valid SQLite database.')
                    return redirect('database_management:db_dashboard')

                if os.path.exists(db_path):
                    backup_filename = f"pre_restore_backup_{_timestamp()}.db"
                    pre_restore_path = os.path.join(_backup_dir(), backup_filename)
                    _snapshot_database(db_path, pre_restore_path)
                    # NOTE(review): this row is written to the database that is
                    # about to be replaced, so it will not be present after the
                    # restore unless the uploaded file already contains it.
                    # The snapshot file on disk is kept either way.
                    _record_backup(backup_filename, pre_restore_path, request.user)

                shutil.move(temp_path, db_path)

            messages.success(request, 'Database restored successfully from backup file.')
            if backup_filename is not None:
                messages.info(request, f'A pre-restore backup was created: {backup_filename}')
            return redirect('database_management:db_dashboard')

        except Exception as e:
            logger.exception('Database restore failed')
            messages.error(request, f'Error restoring database: {str(e)}')
            return redirect('database_management:db_dashboard')

    context = {
        'module_title': 'Restore Database',
    }
    return render(request, 'database_management/restore.html', context)


@login_required
@permission_required('database_management.add_database', raise_exception=True)
def initialize_view(request):
    """Report database initialization on POST; otherwise render the page.

    No initialization work is performed yet: the POST branch only emits a
    success message and redirects to the dashboard.
    """
    if request.method == 'POST':
        try:
            # This would typically run migrations or initialization scripts.
            # For now, just show a message.
            messages.success(request, 'Database initialization completed successfully.')
            return redirect('database_management:db_dashboard')
        except Exception as e:
            logger.exception('Database initialization failed')
            messages.error(request, f'Error initializing database: {str(e)}')
            return redirect('database_management:db_dashboard')

    context = {
        'module_title': 'Initialize Database',
    }
    return render(request, 'database_management/initialize.html', context)


@login_required
@permission_required('database_management.view_database', raise_exception=True)
def backup_list_view(request):
    """Render the list of all backup records, newest first."""
    backups = DatabaseBackup.objects.all().order_by('-created_at')

    context = {
        'module_title': 'Database Backup List',
        'backups': backups,
    }
    return render(request, 'database_management/backup_list.html', context)


@login_required
@permission_required('database_management.view_database', raise_exception=True)
def download_backup_view(request, backup_id):
    """Stream a backup file to the client as an attachment.

    Raises:
        Http404: if no ``DatabaseBackup`` with *backup_id* exists.  The lookup
            is outside the ``try`` so the 404 is not turned into a redirect.
    """
    backup = get_object_or_404(DatabaseBackup, id=backup_id)

    file_path = backup.file_path
    if not file_path or not os.path.isfile(file_path):
        messages.error(request, 'Backup file not found on disk.')
        return redirect('database_management:backup_list')

    try:
        handle = open(file_path, 'rb')
    except OSError as e:
        logger.exception('Unable to open backup file %s', file_path)
        messages.error(request, f'Error downloading backup: {str(e)}')
        return redirect('database_management:backup_list')

    # FileResponse streams the file in chunks and closes the handle when the
    # response is finished, so large backups are never loaded into memory.
    return FileResponse(
        handle,
        as_attachment=True,
        filename=backup.filename,
        content_type='application/octet-stream',
    )


@login_required
@permission_required('database_management.delete_database', raise_exception=True)
def delete_backup_view(request, backup_id):
    """Delete a backup file and its record on POST; otherwise ask to confirm.

    Raises:
        Http404: if no ``DatabaseBackup`` with *backup_id* exists.
    """
    backup = get_object_or_404(DatabaseBackup, id=backup_id)

    if request.method == 'POST':
        try:
            # A file that is already gone is not an error; the record is
            # removed regardless.
            with suppress(FileNotFoundError):
                os.remove(backup.file_path)
            backup.delete()
        except Exception as e:
            logger.exception('Unable to delete backup %s', backup_id)
            messages.error(request, f'Error deleting backup: {str(e)}')
            return redirect('database_management:backup_list')

        messages.success(request, f'Backup {backup.filename} deleted successfully.')
        return redirect('database_management:backup_list')

    context = {
        'module_title': 'Delete Backup',
        'backup': backup,
    }
    return render(request, 'database_management/delete_backup.html', context)