123 lines
3.3 KiB
Python
123 lines
3.3 KiB
Python
import contextlib
|
|
import os
|
|
import subprocess
|
|
|
|
from django.conf import settings
|
|
from django.core.files import File
|
|
from django.core.files.storage import Storage
|
|
from django.utils import timezone
|
|
|
|
from dbbackup.db.base import get_connector
|
|
|
|
BASE_FILE = os.path.join(settings.BLOB_DIR, "test.txt")
|
|
ENCRYPTED_FILE = os.path.join(settings.BLOB_DIR, "test.txt.gpg")
|
|
COMPRESSED_FILE = os.path.join(settings.BLOB_DIR, "test.txt.gz")
|
|
TARED_FILE = os.path.join(settings.BLOB_DIR, "test.txt.tar")
|
|
ENCRYPTED_COMPRESSED_FILE = os.path.join(settings.BLOB_DIR, "test.txt.gz.gpg")
|
|
TEST_DATABASE = {
|
|
"ENGINE": "django.db.backends.sqlite3",
|
|
"NAME": "/tmp/foo.db",
|
|
"USER": "foo",
|
|
"PASSWORD": "bar",
|
|
"HOST": "foo",
|
|
"PORT": 122,
|
|
}
|
|
TEST_MONGODB = {
|
|
"ENGINE": "django_mongodb_engine",
|
|
"NAME": "mongo_test",
|
|
"USER": "foo",
|
|
"PASSWORD": "bar",
|
|
"HOST": "foo",
|
|
"PORT": 122,
|
|
}
|
|
TEST_DATABASE = settings.DATABASES["default"]
|
|
|
|
GPG_PRIVATE_PATH = os.path.join(settings.BLOB_DIR, "gpg/secring.gpg")
|
|
GPG_PUBLIC_PATH = os.path.join(settings.BLOB_DIR, "gpg/pubring.gpg")
|
|
GPG_FINGERPRINT = "7438 8D4E 02AF C011 4E2F 1E79 F7D1 BBF0 1F63 FDE9"
|
|
DEV_NULL = open(os.devnull, "w")
|
|
|
|
|
|
class handled_files(dict):
|
|
"""
|
|
Dict for gather information about fake storage and clean between tests.
|
|
You should use the constant instance ``HANDLED_FILES`` and clean it
|
|
before tests.
|
|
"""
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.clean()
|
|
|
|
def clean(self):
|
|
self["written_files"] = []
|
|
self["deleted_files"] = []
|
|
|
|
|
|
HANDLED_FILES = handled_files()
|
|
|
|
|
|
class FakeStorage(Storage):
|
|
name = "FakeStorage"
|
|
|
|
def exists(self, name):
|
|
return name in HANDLED_FILES["written_files"]
|
|
|
|
def get_available_name(self, name, max_length=None):
|
|
return name[:max_length]
|
|
|
|
def get_valid_name(self, name):
|
|
return name
|
|
|
|
def listdir(self, path):
|
|
return ([], [f[0] for f in HANDLED_FILES["written_files"]])
|
|
|
|
def accessed_time(self, name):
|
|
return timezone.now()
|
|
|
|
created_time = modified_time = accessed_time
|
|
|
|
def _open(self, name, mode="rb"):
|
|
file_ = [f[1] for f in HANDLED_FILES["written_files"] if f[0] == name][0]
|
|
file_.seek(0)
|
|
return file_
|
|
|
|
def _save(self, name, content):
|
|
HANDLED_FILES["written_files"].append((name, File(content)))
|
|
return name
|
|
|
|
def delete(self, name):
|
|
HANDLED_FILES["deleted_files"].append(name)
|
|
|
|
|
|
def clean_gpg_keys():
|
|
with contextlib.suppress(Exception):
|
|
cmd = "gpg --batch --yes --delete-key '%s'" % GPG_FINGERPRINT
|
|
subprocess.call(cmd, stdout=DEV_NULL, stderr=DEV_NULL)
|
|
with contextlib.suppress(Exception):
|
|
cmd = "gpg --batch --yes --delete-secrect-key '%s'" % GPG_FINGERPRINT
|
|
subprocess.call(cmd, stdout=DEV_NULL, stderr=DEV_NULL)
|
|
|
|
|
|
def add_private_gpg():
|
|
cmd = f"gpg --import {GPG_PRIVATE_PATH}".split()
|
|
subprocess.call(cmd, stdout=DEV_NULL, stderr=DEV_NULL)
|
|
|
|
|
|
def add_public_gpg():
|
|
cmd = f"gpg --import {GPG_PUBLIC_PATH}".split()
|
|
subprocess.call(cmd, stdout=DEV_NULL, stderr=DEV_NULL)
|
|
|
|
|
|
def callable_for_filename_template(datetime, **kwargs):
|
|
return f"{datetime}_foo"
|
|
|
|
|
|
def get_dump(database=TEST_DATABASE):
|
|
return get_connector().create_dump()
|
|
|
|
|
|
def get_dump_name(database=None):
|
|
database = database or TEST_DATABASE
|
|
return get_connector().generate_filename()
|