Source code for pyCloudflareUpdater.preferences.daemon_preferences

#                             pyCloudflareUpdater
#                  Copyright (C) 2021 - Javinator9889
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#      the Free Software Foundation, either version 3 of the License, or
#                   (at your option) any later version.
#
#       This program is distributed in the hope that it will be useful,
#       but WITHOUT ANY WARRANTY; without even the implied warranty of
#        MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#               GNU General Public License for more details.
#
#     You should have received a copy of the GNU General Public License
#    along with this program. If not, see <http://www.gnu.org/licenses/>.
from .crypt import (
    save_to_kr, read_from_kr, gen_key, encrypt, decrypt,
    is_valid_token, init_crypto
)
from .. import (
    PRODUCTION_FILE_LOG_LEVEL, VALID_LOGGING_LEVELS, DEFAULT_SETTINGS,
    ensure_permissions, change_permissions, VALID_RECORD_TYPES
)
from configupdater import ConfigUpdater, Section, Option
from cryptography.fernet import InvalidToken
from ..utils.cache import cached, ucached
from ..utils.str import is_none_or_empty
from typing import Union, Optional, Any
from argparse import Namespace
from pathlib import Path
import os
import logging
import warnings


def get_or_error(config: Section,
                 option: str,
                 error_msg: str,
                 default: Optional[Any] = None):
    if option not in config:
        raise KeyError(error_msg)
    v = config.get(option, str(default))
    if v is not None and isinstance(v, Option) and is_none_or_empty(v.value):
        if default is None:
            raise ValueError(error_msg)
        v = default
    return v.value if isinstance(v, Option) else v


[docs]class Preferences: __instance__ = None file = "%s/.config/cloudflare-ddns.ini" % Path.home() def __new__(cls, *args, **kwargs): if Preferences.__instance__ is None: instance = object.__new__(cls) instance.__must_init = True Preferences.__instance__ = instance return Preferences.__instance__ def __init__(self): if self.__must_init: self.config = ConfigUpdater() self._ck: Optional[bytes] = None self.__cache__ = {} self.__must_init = False
[docs] @classmethod async def create_from_args(cls, p_args: Namespace) -> 'Preferences': self = Preferences() await init_crypto() await self._init(domain=p_args.domain, name=p_args.name, rtype=p_args.type, ttl=p_args.ttl, update_time=p_args.time, key=p_args.key, mail=p_args.mail, use_proxy=p_args.proxied, pid_file=p_args.pid_file, log_file=p_args.log_file, log_level=p_args.log_level) return self
async def _init(self, domain: str = None, name: str = None, rtype: str = None, ttl: int = None, update_time: int = None, key: str = None, mail: str = None, use_proxy: bool = False, pid_file: str = None, log_file: str = None, log_level: int = PRODUCTION_FILE_LOG_LEVEL): self.config = ConfigUpdater() if not os.path.exists(self.file): await self.create_empty_file() self.config.read(self.file) ck = read_from_kr('cloudflare-key') home = Path.home() uid = os.geteuid() log_file = \ log_file or "/var/log/cloudflare-ddns.log" if uid == 0 \ else f"{home}/log/cloudflare-ddns.log" pid_file = \ pid_file or "/run/cloudflare-ddns.pid" if uid == 0 \ else f"{home}/.cache/cloudflare-ddns.pid" log_file_dir = os.path.dirname(log_file) if not os.path.exists(log_file_dir): os.makedirs(log_file_dir, mode=0o750, exist_ok=True) pid_file_dir = os.path.dirname(pid_file) if not os.path.exists(pid_file_dir): os.makedirs(pid_file_dir, mode=0o700, exist_ok=True) if 'Logging' not in self.config: self.config['Logging'] = {} if 'Cloudflare' not in self.config: self.config['Cloudflare'] = {} if 'Service' not in self.config: self.config['Service'] = {} cloudflare = self.config['Cloudflare'] log = self.config['Logging'] service = self.config['Service'] save_task = None ck = await ck if ck is None: ck = await gen_key() save_task = save_to_kr('cloudflare-key', ck) self._ck = ck error_base = '"%s" must be defined and cannot be empty!' self.domain = get_or_error(cloudflare, 'domain', error_base % 'Domain', default=domain) self.name = get_or_error(cloudflare, 'name', error_base % 'Record (name)', default=name) self.type = get_or_error(cloudflare, 'type', error_base % 'Record (type)', default=rtype) self.ttl = int(get_or_error(cloudflare, 'ttl', error_base % 'TTL (Time To Live)', default=ttl)) self.frequency = int(get_or_error(cloudflare, 'frequency-minutes', error_base % 'Frequency', default=update_time)) self.key = get_or_error(cloudflare, 'api-key', error_base % 'API Key', default=key) self.mail = get_or_error(cloudflare, 'mail', error_base % 'Mail', default=mail) self.use_proxy = bool(get_or_error(cloudflare, 'use-proxy', error_base % 'Proxied', default=str(use_proxy))) self.logging_file = get_or_error(log, 'file', error_base % 'Log file', default=log_file) self.logging_level = get_or_error(log, 'level', error_base % 'Logging level', logging.getLevelName(log_level)) self.pid_file = get_or_error(service, 'pid-file', error_base % 'PID file', default=pid_file) self.save() if save_task is not None: await save_task def _check_perms(self): if not ensure_permissions(self.file, 0o700): warnings.warn("Insecure permissions detected! Changing...") change_permissions(self.file, 0o700) @property @cached('__cache__') def domain(self) -> str: return self.config['Cloudflare']['domain'].value @domain.setter @ucached('__cache__') def domain(self, new_domain: str): if new_domain is None: raise ValueError("Domain must be provided!") self.config['Cloudflare']['domain'].value = new_domain @property @cached('__cache__') def name(self) -> str: return self.config['Cloudflare']['name'].value @name.setter @ucached('__cache__') def name(self, new_name: str): if new_name is None: raise ValueError("Record's name must be provided!") self.config['Cloudflare']['name'].value = new_name @property @cached('__cache__') def type(self) -> str: return self.config['Cloudflare']['type'].value @type.setter @ucached('__cache__') def type(self, new_type: str): if new_type is None: raise ValueError("Record's type must be provided!") if new_type not in VALID_RECORD_TYPES: raise ValueError(f"Record type '{new_type}' is not a valid value! " "Available options are: " f"{sorted(VALID_RECORD_TYPES)}") self.config['Cloudflare']['type'].value = new_type @property @cached('__cache__') def ttl(self) -> int: return int(self.config['Cloudflare']['ttl'].value) @ttl.setter @ucached('__cache__') def ttl(self, new_ttl: int): if new_ttl is None: raise ValueError('TTL value must be provided!') if new_ttl < 1: raise ValueError("TTL must be, at least, '1' (automatic) or bigger") self.config['Cloudflare']['ttl'].value = str(new_ttl) @property @cached('__cache__') def frequency(self) -> int: return int(self.config['Cloudflare']['frequency-minutes'].value) @frequency.setter @ucached('__cache__') def frequency(self, new_freq: int): if new_freq is None: raise ValueError("Update frequency must be provided") if new_freq <= 0: raise ValueError("Frequency must be higher than 0") self.config['Cloudflare']['frequency-minutes'].value = str(new_freq) @property @cached('__cache__') def key(self) -> str: apikey = self.config['Cloudflare']['api-key'].value key = self._ck if key is None: raise AttributeError('Key was not created! Unexpected failure') try: return decrypt(apikey.encode(), key).decode() except InvalidToken: self.key = apikey return apikey @key.setter def key(self, new_key: str): if new_key is None: raise ValueError("API key must be provided!") key = self._ck if key is None: raise AttributeError('Key was not created! Unexpected failure') if not is_valid_token(new_key.encode(), key): new_key = encrypt(new_key.encode(), key).decode() self.config['Cloudflare']['api-key'].value = new_key @property @cached('__cache__') def mail(self) -> str: return self.config['Cloudflare']['mail'].value @mail.setter @ucached('__cache__') def mail(self, new_mail: str): if new_mail is None: raise ValueError("Mail must be provided!") self.config['Cloudflare']['mail'].value = new_mail @property @cached('__cache__') def use_proxy(self) -> bool: return bool(self.config['Cloudflare']['use-proxy'].value) @use_proxy.setter @ucached('__cache__') def use_proxy(self, use: bool): if use is None: raise ValueError("Whether to use a proxy or not must be specified!") self.config['Cloudflare']['use-proxy'].value = str(use) @property @cached('__cache__') def logging_file(self) -> str: return self.config['Logging']['file'].value @logging_file.setter @ucached('__cache__') def logging_file(self, file: str): if file is None: warnings.warn("No data will be logged to any file!") self.config['Logging']['file'].value = file @property @cached('__cache__') def logging_level(self) -> int: return logging.getLevelName(self.config['Logging']['level'].value) @logging_level.setter def logging_level(self, level: Union[int, str]): if isinstance(level, str): level = logging.getLevelName(level) if level not in VALID_LOGGING_LEVELS: raise ValueError("Logging level is not valid!") self.config['Logging']['level'].value = logging.getLevelName(level) @property @cached('__cache__') def pid_file(self) -> str: return self.config['Service']['pid-file'].value @pid_file.setter @ucached('__cache__') def pid_file(self, file: str): if file is None: raise ValueError("PID file must be provided!") self.config['Service']['pid-file'].value = file
[docs] async def reload(self): self.__cache__ = {} self._check_perms() self.config.read(self.file)
[docs] async def save_async(self): self.save()
[docs] def save(self): self._check_perms() with open(self.file, 'w') as configfile: self.config.write(configfile)
[docs] @staticmethod async def create_empty_file() -> bool: if not os.path.exists(Preferences.file): config_dir = os.path.dirname(Preferences.file) if not os.path.exists(config_dir): os.makedirs(config_dir, mode=0o700, exist_ok=True) with open(Preferences.file, 'w') as configfile: configfile.write(DEFAULT_SETTINGS) if not ensure_permissions(Preferences.file, 0o700): change_permissions(Preferences.file, 0o700) return True return False