# Database Management Module Implementation Plan ## Overview This document outlines the implementation plan for the database management module, including backup, restore, and initialization functionality for the manufacturing application. ## Database Management Models ### 1. Database Backup Model ```python # database_management/models.py from django.db import models from django.contrib.auth.models import User import os class DatabaseBackup(models.Model): BACKUP_STATUS = [ ('pending', 'Pending'), ('in_progress', 'In Progress'), ('completed', 'Completed'), ('failed', 'Failed'), ] BACKUP_TYPES = [ ('full', 'Full Backup'), ('incremental', 'Incremental Backup'), ('schema_only', 'Schema Only'), ('data_only', 'Data Only'), ] name = models.CharField(max_length=200) description = models.TextField(blank=True) filename = models.CharField(max_length=255, unique=True) file_path = models.CharField(max_length=500) file_size = models.BigIntegerField(default=0) backup_type = models.CharField(max_length=20, choices=BACKUP_TYPES, default='full') status = models.CharField(max_length=20, choices=BACKUP_STATUS, default='pending') created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True) created_at = models.DateTimeField(auto_now_add=True) completed_at = models.DateTimeField(null=True, blank=True) compression_type = models.CharField( max_length=10, choices=[('gzip', 'GZIP'), ('none', 'None')], default='gzip' ) class Meta: ordering = ['-created_at'] def __str__(self): return self.name def get_file_size_mb(self): """Return file size in MB""" return round(self.file_size / (1024 * 1024), 2) def delete_file(self): """Delete the backup file from filesystem""" if os.path.exists(self.file_path): os.remove(self.file_path) class DatabaseRestore(models.Model): RESTORE_STATUS = [ ('pending', 'Pending'), ('in_progress', 'In Progress'), ('completed', 'Completed'), ('failed', 'Failed'), ] backup = models.ForeignKey(DatabaseBackup, on_delete=models.CASCADE) status = models.CharField(max_length=20, choices=RESTORE_STATUS, default='pending') started_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True) started_at = models.DateTimeField(auto_now_add=True) completed_at = models.DateTimeField(null=True, blank=True) log = models.TextField(blank=True) def __str__(self): return f"Restore of {self.backup.name}" class DatabaseInitialization(models.Model): INIT_STATUS = [ ('pending', 'Pending'), ('in_progress', 'In Progress'), ('completed', 'Completed'), ('failed', 'Failed'), ] name = models.CharField(max_length=200) description = models.TextField(blank=True) script_file = models.FileField(upload_to='init_scripts/') status = models.CharField(max_length=20, choices=INIT_STATUS, default='pending') created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True) created_at = models.DateTimeField(auto_now_add=True) executed_at = models.DateTimeField(null=True, blank=True) log = models.TextField(blank=True) def __str__(self): return self.name ``` ## Database Management Services ### 2. Database Management Service Classes ```python # database_management/services.py import os import subprocess import gzip import shutil from datetime import datetime from django.conf import settings from django.core.management import execute_from_command_line from django.db import connection from .models import DatabaseBackup, DatabaseRestore, DatabaseInitialization class DatabaseBackupService: """Service for database backup operations""" def __init__(self): self.backup_dir = os.path.join(settings.MEDIA_ROOT, 'backups') os.makedirs(self.backup_dir, exist_ok=True) def create_backup(self, name, description="", backup_type='full', compression='gzip', created_by=None): """Create a database backup""" # Create backup record timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') filename = f"backup_{timestamp}.sql" if compression == 'gzip': filename += '.gz' file_path = os.path.join(self.backup_dir, filename) backup = DatabaseBackup.objects.create( name=name, description=description, filename=filename, file_path=file_path, backup_type=backup_type, compression_type=compression, status='in_progress', created_by=created_by ) try: # Perform backup based on database engine if settings.DATABASES['default']['ENGINE'] == 'django.db.backends.sqlite3': self._backup_sqlite(backup) elif settings.DATABASES['default']['ENGINE'] == 'django.db.backends.postgresql': self._backup_postgresql(backup) else: raise NotImplementedError("Database engine not supported for backup") # Update file size if os.path.exists(file_path): backup.file_size = os.path.getsize(file_path) backup.status = 'completed' backup.completed_at = datetime.now() backup.save() return backup except Exception as e: backup.status = 'failed' backup.completed_at = datetime.now() backup.save() raise e def _backup_sqlite(self, backup): """Backup SQLite database""" db_path = settings.DATABASES['default']['NAME'] if backup.compression_type == 'gzip': with open(db_path, 'rb') as f_in: with gzip.open(backup.file_path, 'wb') as f_out: shutil.copyfileobj(f_in, f_out) else: shutil.copy2(db_path, backup.file_path) def _backup_postgresql(self, backup): """Backup PostgreSQL database""" db_settings = settings.DATABASES['default'] env = os.environ.copy() env['PGPASSWORD'] = db_settings.get('PASSWORD', '') cmd = [ 'pg_dump', '-h', db_settings.get('HOST', 'localhost'), '-p', str(db_settings.get('PORT', '5432')), '-U', db_settings.get('USER', 'postgres'), '-d', db_settings.get('NAME', ''), '-f', backup.file_path ] if backup.compression_type == 'gzip': cmd.append('-Z') cmd.append('1') # Compression level subprocess.run(cmd, env=env, check=True) def delete_backup(self, backup_id): """Delete a backup""" backup = DatabaseBackup.objects.get(id=backup_id) backup.delete_file() backup.delete() class DatabaseRestoreService: """Service for database restore operations""" def __init__(self): pass def restore_backup(self, backup_id, started_by=None): """Restore a database backup""" backup = DatabaseBackup.objects.get(id=backup_id) # Create restore record restore = DatabaseRestore.objects.create( backup=backup, status='in_progress', started_by=started_by ) try: # Perform restore based on database engine if settings.DATABASES['default']['ENGINE'] == 'django.db.backends.sqlite3': self._restore_sqlite(backup) elif settings.DATABASES['default']['ENGINE'] == 'django.db.backends.postgresql': self._restore_postgresql(backup) else: raise NotImplementedError("Database engine not supported for restore") restore.status = 'completed' restore.completed_at = datetime.now() restore.save() return restore except Exception as e: restore.status = 'failed' restore.completed_at = datetime.now() restore.log = str(e) restore.save() raise e def _restore_sqlite(self, backup): """Restore SQLite database""" db_path = settings.DATABASES['default']['NAME'] # Backup current database first backup_path = f"{db_path}.backup" shutil.copy2(db_path, backup_path) try: if backup.compression_type == 'gzip': with gzip.open(backup.file_path, 'rb') as f_in: with open(db_path, 'wb') as f_out: shutil.copyfileobj(f_in, f_out) else: shutil.copy2(backup.file_path, db_path) except Exception as e: # Restore from backup if something goes wrong shutil.copy2(backup_path, db_path) raise e finally: # Clean up backup if os.path.exists(backup_path): os.remove(backup_path) def _restore_postgresql(self, backup): """Restore PostgreSQL database""" db_settings = settings.DATABASES['default'] env = os.environ.copy() env['PGPASSWORD'] = db_settings.get('PASSWORD', '') cmd = [ 'psql', '-h', db_settings.get('HOST', 'localhost'), '-p', str(db_settings.get('PORT', '5432')), '-U', db_settings.get('USER', 'postgres'), '-d', db_settings.get('NAME', ''), '-f', backup.file_path ] subprocess.run(cmd, env=env, check=True) class DatabaseInitializationService: """Service for database initialization operations""" def __init__(self): pass def initialize_database(self, init_id, executed_by=None): """Initialize database with script""" init = DatabaseInitialization.objects.get(id=init_id) # Update status init.status = 'in_progress' init.executed_at = datetime.now() init.save() try: # Execute initialization script log = self._execute_init_script(init) init.status = 'completed' init.log = log init.save() return init except Exception as e: init.status = 'failed' init.log = str(e) init.save() raise e def _execute_init_script(self, init): """Execute initialization script""" log = [] # Read and execute script with open(init.script_file.path, 'r') as f: script_content = f.read() # Execute SQL commands with connection.cursor() as cursor: statements = script_content.split(';') for statement in statements: statement = statement.strip() if statement: try: cursor.execute(statement) log.append(f"Executed: {statement[:50]}...") except Exception as e: log.append(f"Error executing: {statement[:50]}... - {str(e)}") raise e return '\n'.join(log) ``` ## Database Management Views ### 3. Database Management Views Implementation ```python # database_management/views.py from django.shortcuts import render, get_object_or_404, redirect from django.contrib.auth.decorators import login_required, user_passes_test from django.http import HttpResponse, JsonResponse from django.contrib import messages from django.conf import settings from django.db import connection from .models import DatabaseBackup, DatabaseRestore, DatabaseInitialization from .services import DatabaseBackupService, DatabaseRestoreService, DatabaseInitializationService from .forms import BackupForm, InitializationForm import os def is_superuser(user): return user.is_superuser @login_required @user_passes_test(is_superuser) def db_dashboard(request): """Database management dashboard""" # Get recent backups recent_backups = DatabaseBackup.objects.all()[:10] # Get recent restores recent_restores = DatabaseRestore.objects.select_related('backup', 'started_by').all()[:10] # Get initializations initializations = DatabaseInitialization.objects.all() # Get database info db_info = { 'engine': settings.DATABASES['default']['ENGINE'], 'name': settings.DATABASES['default']['NAME'], 'host': settings.DATABASES['default'].get('HOST', 'localhost'), 'port': settings.DATABASES['default'].get('PORT', '5432'), } # Get table count with connection.cursor() as cursor: cursor.execute("SELECT COUNT(*) FROM sqlite_master WHERE type='table';") table_count = cursor.fetchone()[0] context = { 'recent_backups': recent_backups, 'recent_restores': recent_restores, 'initializations': initializations, 'db_info': db_info, 'table_count': table_count, } return render(request, 'database_management/dashboard.html', context) @login_required @user_passes_test(is_superuser) def backup_view(request): """Create database backup""" if request.method == 'POST': form = BackupForm(request.POST) if form.is_valid(): try: # Create backup backup_service = DatabaseBackupService() backup = backup_service.create_backup( name=form.cleaned_data['name'], description=form.cleaned_data['description'], backup_type=form.cleaned_data['backup_type'], compression=form.cleaned_data['compression_type'], created_by=request.user ) messages.success(request, f"Backup '{backup.name}' created successfully.") return redirect('database_management:backup_list') except Exception as e: messages.error(request, f"Error creating backup: {str(e)}") else: form = BackupForm() context = { 'form': form, } return render(request, 'database_management/backup.html', context) @login_required @user_passes_test(is_superuser) def restore_view(request): """Restore database from backup""" if request.method == 'POST': backup_id = request.POST.get('backup_id') backup = get_object_or_404(DatabaseBackup, id=backup_id) try: # Perform restore restore_service = DatabaseRestoreService() restore = restore_service.restore_backup( backup_id=backup_id, started_by=request.user ) messages.success(request, f"Database restored from '{backup.name}' successfully.") return redirect('database_management:dashboard') except Exception as e: messages.error(request, f"Error restoring database: {str(e)}") # Get available backups backups = DatabaseBackup.objects.filter(status='completed').order_by('-created_at') context = { 'backups': backups, } return render(request, 'database_management/restore.html', context) @login_required @user_passes_test(is_superuser) def initialize_view(request): """Initialize database""" if request.method == 'POST': form = InitializationForm(request.POST, request.FILES) if form.is_valid(): init = form.save(commit=False) init.created_by = request.user init.save() try: # Perform initialization init_service = DatabaseInitializationService() init_service.initialize_database( init_id=init.id, executed_by=request.user ) messages.success(request, f"Database initialized with '{init.name}' successfully.") return redirect('database_management:dashboard') except Exception as e: messages.error(request, f"Error initializing database: {str(e)}") else: form = InitializationForm() context = { 'form': form, } return render(request, 'database_management/initialize.html', context) @login_required @user_passes_test(is_superuser) def backup_list_view(request): """List all backups""" backups = DatabaseBackup.objects.all().order_by('-created_at') context = { 'backups': backups, } return render(request, 'database_management/backup_list.html', context) @login_required @user_passes_test(is_superuser) def download_backup_view(request, backup_id): """Download backup file""" backup = get_object_or_404(DatabaseBackup, id=backup_id) if os.path.exists(backup.file_path): with open(backup.file_path, 'rb') as f: response = HttpResponse(f.read(), content_type='application/octet-stream') response['Content-Disposition'] = f'attachment; filename="{backup.filename}"' return response else: messages.error(request, "Backup file not found.") return redirect('database_management:backup_list') @login_required @user_passes_test(is_superuser) def delete_backup_view(request, backup_id): """Delete backup""" backup = get_object_or_404(DatabaseBackup, id=backup_id) if request.method == 'POST': try: # Delete backup file and record backup_service = DatabaseBackupService() backup_service.delete_backup(backup_id) messages.success(request, f"Backup '{backup.name}' deleted successfully.") except Exception as e: messages.error(request, f"Error deleting backup: {str(e)}") return redirect('database_management:backup_list') context = { 'backup': backup, } return render(request, 'database_management/delete_backup.html', context) ``` ## Database Management Forms ### 4. Database Management Forms ```python # database_management/forms.py from django import forms from .models import DatabaseBackup, DatabaseInitialization class BackupForm(forms.ModelForm): """Form for creating database backups""" class Meta: model = DatabaseBackup fields = ['name', 'description', 'backup_type', 'compression_type'] widgets = { 'name': forms.TextInput(attrs={'class': 'form-control'}), 'description': forms.Textarea(attrs={'class': 'form-control', 'rows': 3}), 'backup_type': forms.Select(attrs={'class': 'form-control'}), 'compression_type': forms.Select(attrs={'class': 'form-control'}), } def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Set default name with timestamp if not self.instance.pk and not self.initial.get('name'): from datetime import datetime timestamp = datetime.now().strftime('%Y-%m-%d %H:%M') self.initial['name'] = f"Backup {timestamp}" class InitializationForm(forms.ModelForm): """Form for database initialization""" class Meta: model = DatabaseInitialization fields = ['name', 'description', 'script_file'] widgets = { 'name': forms.TextInput(attrs={'class': 'form-control'}), 'description': forms.Textarea(attrs={'class': 'form-control', 'rows': 3}), 'script_file': forms.FileInput(attrs={'class': 'form-control'}), } ``` ## Database Management Templates ### 5. Database Management Dashboard Template ```html {% extends 'base.html' %} {% block title %}Database Management - Manufacturing App{% endblock %} {% block content %}
Engine: {{ db_info.engine }}
Name: {{ db_info.name }}
Host: {{ db_info.host }}:{{ db_info.port }}
Tables: {{ table_count }}
No backups created yet.
{% endif %}No restores performed yet.
{% endif %}