From 988ab003ad05fdd7e39d6daeefe7af551339fc78 Mon Sep 17 00:00:00 2001 From: Robin Date: Mon, 6 Oct 2025 00:31:54 +0200 Subject: [PATCH] + Fixxed the SQLLite Error Start and Basic works --- .gitignore | 1 + main.py | 377 +++++++++++++++++++++++++++++++++++------------------ 2 files changed, 248 insertions(+), 130 deletions(-) diff --git a/.gitignore b/.gitignore index 973e1fb..121fb76 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ env/ # IDEs .idea/ .vscode/ +uptime.db \ No newline at end of file diff --git a/main.py b/main.py index 8c8356d..7e86eef 100644 --- a/main.py +++ b/main.py @@ -1,39 +1,69 @@ - -import warnings -warnings.filterwarnings("ignore", category=UserWarning, module='flask_admin.contrib') - import os import requests -import threading -import time import socket +import threading +import logging +import time +import warnings from datetime import datetime +from typing import Optional, Tuple +from urllib.parse import urlparse + from flask import Flask, render_template, flash, redirect, url_for, request -from flask_sqlalchemy import SQLAlchemy from flask_admin import Admin, AdminIndexView from flask_admin.contrib.sqla import ModelView from flask_login import LoginManager, UserMixin, login_user, logout_user, current_user, login_required -from flask_migrate import Migrate, upgrade as upgrade_database -from werkzeug.security import generate_password_hash, check_password_hash +from flask_migrate import Migrate, upgrade +from flask_sqlalchemy import SQLAlchemy from flask_wtf import FlaskForm +from sqlalchemy import func, inspect +from werkzeug.security import generate_password_hash, check_password_hash from wtforms import StringField, PasswordField, SubmitField, SelectField, TextAreaField -from wtforms.validators import DataRequired, EqualTo +from wtforms.validators import DataRequired, EqualTo, URL, Optional as VOptional -# --- Initialisierung --- +# --- Konfiguration --- +# Unterdrücke spezifische Warnungen von flask_admin mit neueren SQLAlchemy-Versionen +warnings.filterwarnings("ignore", category=UserWarning, module='flask_admin.contrib') basedir = os.path.abspath(os.path.dirname(__file__)) + +class Config: + """Bündelt die Konfiguration der Anwendung.""" + # Für Produktion per ENV setzen! + SECRET_KEY = os.environ.get('SECRET_KEY', 'ein-sehr-unsicherer-dev-key') + + SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \ + 'sqlite:///' + os.path.join(basedir, 'uptime.db') + SQLALCHEMY_TRACK_MODIFICATIONS = False + + # Konfiguration für den initialen Admin-Benutzer + INITIAL_ADMIN_USER = os.environ.get('ADMIN_USER', 'admin') + INITIAL_ADMIN_PASS = os.environ.get('ADMIN_PASS', 'admin123') + + # Checker-Intervall (Sekunden) + CHECK_INTERVAL = int(os.environ.get('CHECK_INTERVAL', '300')) + + # HTTP Timeout (Sekunden) + HTTP_TIMEOUT = int(os.environ.get('HTTP_TIMEOUT', '10')) + + +# --- App-Initialisierung --- + app = Flask(__name__) -app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + os.path.join(basedir, 'uptime.db') -app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False -app.config['SECRET_KEY'] = 'eine-viel-sicherere-geheime-zeichenkette' # In Produktion ändern! +app.config.from_object(Config) + +# Sicherstellen, dass SECRET_KEY gesetzt ist (Warnung nur in Dev) +if app.config['SECRET_KEY'] == 'ein-sehr-unsicherer-dev-key': + app.logger.warning("Unsicherer SECRET_KEY aktiv – bitte in Produktion per Umgebungsvariable setzen!") + db = SQLAlchemy(app) +migrate = Migrate(app, db) login_manager = LoginManager(app) login_manager.login_view = 'login' login_manager.login_message = 'Bitte loggen Sie sich ein, um auf diese Seite zuzugreifen.' -migrate = Migrate(app, db) # --- Datenbankmodelle --- @@ -43,15 +73,16 @@ class User(UserMixin, db.Model): username = db.Column(db.String(80), unique=True, nullable=False) password_hash = db.Column(db.String(200), nullable=False) - def set_password(self, password): + def set_password(self, password: str): self.password_hash = generate_password_hash(password) - def check_password(self, password): + def check_password(self, password: str) -> bool: return check_password_hash(self.password_hash, password) def __repr__(self): return f'' + class Monitor(db.Model): """Modell für einen zu überwachenden Dienst.""" id = db.Column(db.Integer, primary_key=True) @@ -62,28 +93,31 @@ class Monitor(db.Model): port = db.Column(db.Integer, nullable=True) status_override = db.Column(db.String(50), nullable=True) status_override_message = db.Column(db.Text, nullable=True) - + logs = db.relationship('UptimeLog', backref='monitor', lazy=True, cascade="all, delete-orphan") def __repr__(self): return f'' + class UptimeLog(db.Model): """Modell zum Protokollieren des Uptime-Status.""" id = db.Column(db.Integer, primary_key=True) monitor_id = db.Column(db.Integer, db.ForeignKey('monitor.id'), nullable=False) - timestamp = db.Column(db.DateTime, default=datetime.utcnow) + timestamp = db.Column(db.DateTime, default=datetime.utcnow, index=True) status_code = db.Column(db.Integer, nullable=True) is_up = db.Column(db.Boolean, nullable=False) def __repr__(self): return f'' + # --- Login-Management --- @login_manager.user_loader -def load_user(user_id): - return User.query.get(int(user_id)) +def load_user(user_id: int) -> Optional[User]: + return db.session.get(User, int(user_id)) + # --- Formulare --- @@ -92,68 +126,95 @@ class LoginForm(FlaskForm): password = PasswordField('Passwort', validators=[DataRequired()]) submit = SubmitField('Einloggen') + class ChangePasswordForm(FlaskForm): current_password = PasswordField('Aktuelles Passwort', validators=[DataRequired()]) new_password = PasswordField('Neues Passwort', validators=[DataRequired()]) - confirm_password = PasswordField('Neues Passwort bestätigen', validators=[DataRequired(), EqualTo('new_password', 'Die Passwörter müssen übereinstimmen.')]) + confirm_password = PasswordField( + 'Neues Passwort bestätigen', + validators=[DataRequired(), EqualTo('new_password', 'Die Passwörter müssen übereinstimmen.')] + ) submit = SubmitField('Passwort ändern') + # --- Gesicherter Admin-Bereich --- -class SecureModelView(ModelView): - def is_accessible(self): +class SecureView: + """Mixin-Klasse zur Absicherung von Admin-Ansichten.""" + def is_accessible(self) -> bool: return current_user.is_authenticated def inaccessible_callback(self, name, **kwargs): return redirect(url_for('login', next=request.url)) -class SecureAdminIndexView(AdminIndexView): - def is_accessible(self): - return current_user.is_authenticated - def inaccessible_callback(self, name, **kwargs): - return redirect(url_for('login', next=request.url)) +class SecureModelView(SecureView, ModelView): + pass + + +class SecureAdminIndexView(SecureView, AdminIndexView): + pass + class MonitorModelView(SecureModelView): - # Dropdown für den Monitor-Typ und manuellen Status + """Angepasste Admin-Ansicht für Monitore.""" form_overrides = { 'monitor_type': SelectField, 'status_override': SelectField, - 'status_override_message': TextAreaField + 'status_override_message': TextAreaField, } form_args = { 'monitor_type': { 'label': 'Monitor-Typ', - 'choices': [ - ('HTTP', 'HTTP(s)'), - ('KEYWORD', 'HTTP(s) mit Keyword'), - ('TCP', 'TCP-Port') - ] + 'choices': [('HTTP', 'HTTP(s)'), ('KEYWORD', 'HTTP(s) mit Keyword'), ('TCP', 'TCP-Port')], + 'coerce': str, }, 'status_override': { 'label': 'Manueller Status', - 'choices': [ - ('', 'Automatisch'), # Leerer String für None - ('MAINTENANCE', 'Wartungsarbeiten'), - ('DEGRADED', 'Leistungsprobleme'), - ('OPERATIONAL', 'Funktionsfähig (Manuell)') - ] - } + 'choices': [('', 'Automatisch'), ('MAINTENANCE', 'Wartung'), ('DEGRADED', 'Leistungsprobleme')], + 'coerce': str, + 'validators': [VOptional()], + }, + 'url': { + 'label': 'URL/Host', + 'validators': [DataRequired()], + }, + 'port': { + 'label': 'Port', + 'validators': [VOptional()], + }, + 'keyword': { + 'label': 'Keyword (optional)', + 'validators': [VOptional()], + }, } column_list = ('name', 'monitor_type', 'status_override', 'url', 'port') form_columns = ('name', 'monitor_type', 'url', 'port', 'keyword', 'status_override', 'status_override_message') - column_labels = dict(name='Name', monitor_type='Typ', url='URL/Host', port='Port', keyword='Keyword', status_override='Manueller Status', status_override_message='Status-Nachricht') + column_labels = dict( + name='Name', monitor_type='Typ', url='URL/Host', port='Port', keyword='Keyword', + status_override='Manueller Status', status_override_message='Status-Nachricht' + ) def on_model_change(self, form, model, is_created): - if is_created: - print(f"Neuer Monitor erstellt: {model.name}. Starte initiale Prüfung.") - checker_thread = threading.Thread(target=check_monitors, kwargs={'monitor_id': model.id}) - checker_thread.start() + # Validierung basierend auf Typ + if model.monitor_type in ('HTTP', 'KEYWORD'): + # Schema ergänzen, falls nicht vorhanden + parsed = urlparse(model.url) + if not parsed.scheme: + model.url = f"http://{model.url}" + elif model.monitor_type == 'TCP': + if not model.port: + raise ValueError('Für TCP-Monitore muss ein Port angegeben werden.') super().on_model_change(form, model, is_created) + if is_created: + app.logger.info(f"Neuer Monitor erstellt: {model.name}. Starte initiale Prüfung.") + t = threading.Thread(target=check_monitors, kwargs={'monitor_id': model.id}, daemon=True) + t.start() + admin = Admin(app, name='Uptime Stats Admin', template_mode='bootstrap4', index_view=SecureAdminIndexView()) -admin.add_view(MonitorModelView(Monitor, db.session, name="Monitore verwalten")) +admin.add_view(MonitorModelView(Monitor, db.session, name="Monitore")) admin.add_view(SecureModelView(User, db.session, name="Administratoren", endpoint='user_admin')) @@ -161,38 +222,42 @@ admin.add_view(SecureModelView(User, db.session, name="Administratoren", endpoin @app.route('/') def index(): - monitors_with_status = [] - monitors = Monitor.query.order_by(Monitor.name).all() - for monitor in monitors: - last_log = UptimeLog.query.filter_by(monitor_id=monitor.id).order_by(UptimeLog.timestamp.desc()).first() - monitors_with_status.append({ - 'name': monitor.name, - 'url': monitor.url, - 'monitor_type': monitor.monitor_type, - 'keyword': monitor.keyword, - 'port': monitor.port, - 'status_override': monitor.status_override, - 'status_override_message': monitor.status_override_message, - 'is_up': last_log.is_up if last_log else None, - 'last_checked': last_log.timestamp if last_log else 'Nie' - }) - return render_template('index.html', monitors=monitors_with_status) + """Öffentliche Statusseite.""" + latest_log_subquery = db.session.query( + UptimeLog.monitor_id, + func.max(UptimeLog.timestamp).label('max_timestamp') + ).group_by(UptimeLog.monitor_id).subquery() + + monitors_with_status = db.session.query( + Monitor, + UptimeLog + ).outerjoin( + latest_log_subquery, Monitor.id == latest_log_subquery.c.monitor_id + ).outerjoin( + UptimeLog, + (UptimeLog.monitor_id == latest_log_subquery.c.monitor_id) & + (UptimeLog.timestamp == latest_log_subquery.c.max_timestamp) + ).order_by(Monitor.name).all() + + return render_template('index.html', monitors_with_status=monitors_with_status) + @app.route('/login', methods=['GET', 'POST']) def login(): if current_user.is_authenticated: - return redirect(url_for('index')) + return redirect(url_for('admin.index')) form = LoginForm() if form.validate_on_submit(): - user = User.query.filter_by(username=form.username.data).first() - if user is None or not user.check_password(form.password.data): - flash('Ungültiger Benutzername oder Passwort.', 'danger') - return redirect(url_for('login')) - login_user(user) - flash('Erfolgreich eingeloggt.', 'success') - return redirect(url_for('admin.index')) + user = db.session.scalar(db.select(User).where(User.username == form.username.data)) + if user and user.check_password(form.password.data): + login_user(user) + flash('Erfolgreich eingeloggt.', 'success') + next_page = request.args.get('next') + return redirect(next_page or url_for('admin.index')) + flash('Ungültiger Benutzername oder Passwort.', 'danger') return render_template('login.html', form=form) + @app.route('/logout') @login_required def logout(): @@ -200,6 +265,7 @@ def logout(): flash('Erfolgreich ausgeloggt.', 'info') return redirect(url_for('index')) + @app.route('/change-password', methods=['GET', 'POST']) @login_required def change_password(): @@ -211,92 +277,143 @@ def change_password(): current_user.set_password(form.new_password.data) db.session.commit() flash('Ihr Passwort wurde erfolgreich geändert.', 'success') - return redirect(url_for('index')) + return redirect(url_for('admin.index')) return render_template('change_password.html', form=form) + # --- Uptime-Checker (Hintergrundprozess) --- -def check_monitors(monitor_id=None): +def _perform_check(monitor: Monitor) -> Tuple[bool, Optional[int]]: + """Führt die eigentliche Prüfung für einen einzelnen Monitor durch.""" + is_up, status_code = False, None + try: + if monitor.monitor_type in ['HTTP', 'KEYWORD']: + # Sicherstellen, dass eine URL mit Schema verwendet wird + url = monitor.url + parsed = urlparse(url) + if not parsed.scheme: + url = f"http://{url}" + + response = requests.get(url, timeout=app.config['HTTP_TIMEOUT']) + status_code = response.status_code + if 200 <= status_code < 400: + if monitor.monitor_type == 'KEYWORD' and monitor.keyword: + # Case-insensitive Keyword-Suche + is_up = monitor.keyword.lower() in response.text.lower() + else: + is_up = True + elif monitor.monitor_type == 'TCP': + # host kann evtl. eine URL sein – Hostname extrahieren + host = monitor.url + parsed = urlparse(host) + if parsed.hostname: + host = parsed.hostname + port = monitor.port + if host and port: + with socket.create_connection((host, int(port)), timeout=app.config['HTTP_TIMEOUT']): + is_up = True + except requests.RequestException as e: + app.logger.warning(f"Fehler bei HTTP-Prüfung für '{monitor.name}': {e}") + except (socket.timeout, socket.error, OSError) as e: + app.logger.warning(f"Fehler bei TCP-Prüfung für '{monitor.name}': {e}") + except Exception as e: + app.logger.exception(f"Unerwarteter Fehler bei Prüfung für '{monitor.name}': {e}") + + return is_up, status_code + + +def check_monitors(monitor_id: Optional[int] = None): """Überprüft den Status. Wenn monitor_id angegeben ist, nur diesen, sonst alle.""" with app.app_context(): + query = db.select(Monitor) if monitor_id: - monitors = Monitor.query.filter_by(id=monitor_id).all() - print(f"[{datetime.now()}] Starte initiale Prüfung für Monitor ID {monitor_id}...") + query = query.where(Monitor.id == monitor_id) + app.logger.info(f"[{datetime.now()}] Starte Prüfung für Monitor ID {monitor_id}...") else: - monitors = Monitor.query.all() - if not monitors: - return - print(f"[{datetime.now()}] Starte periodische Überprüfung für {len(monitors)} Monitor(en)...") + app.logger.info(f"[{datetime.now()}] Starte periodische Überprüfung...") + monitors = db.session.scalars(query).all() if not monitors: + if not monitor_id: + app.logger.info(f"[{datetime.now()}] Keine Monitore zur Überprüfung gefunden.") return + for monitor in monitors: - # Wenn ein manueller Status gesetzt ist, überspringe die automatische Prüfung. if monitor.status_override: - print(f" - Monitor '{monitor.name}' hat manuellen Status '{monitor.status_override}'. Überspringe Prüfung.") + app.logger.info(f" - '{monitor.name}' hat manuellen Status '{monitor.status_override}'. Überspringe.") continue - is_up, status_code = False, None - - if monitor.monitor_type in ['HTTP', 'KEYWORD']: - try: - response = requests.get(monitor.url, timeout=10) - status_code = response.status_code - if 200 <= status_code < 400: - if monitor.monitor_type == 'KEYWORD' and monitor.keyword: - if monitor.keyword in response.text: - is_up = True - else: # Standard HTTP - is_up = True - except requests.RequestException: - pass - - elif monitor.monitor_type == 'TCP': - try: - # For TCP, the 'url' field holds the host - host = monitor.url - port = monitor.port - if host and port: - with socket.create_connection((host, port), timeout=10) as sock: - is_up = True - # status_code is not applicable for TCP checks - except (socket.timeout, socket.error, OSError): - pass + is_up, status_code = _perform_check(monitor) log_entry = UptimeLog(monitor_id=monitor.id, status_code=status_code, is_up=is_up) db.session.add(log_entry) - + db.session.commit() - print(f"[{datetime.now()}] Überprüfung abgeschlossen.") + app.logger.info(f"[{datetime.now()}] Überprüfung abgeschlossen.") -def run_checks_periodically(): +def run_checks_periodically(interval: int = None): """Führt die Überprüfungen in regelmäßigen Abständen aus.""" + if interval is None: + interval = app.config['CHECK_INTERVAL'] + while True: - check_monitors() - # Warte 5 Minuten (300 Sekunden) bis zur nächsten Überprüfung - time.sleep(300) + try: + check_monitors() + except Exception: + app.logger.exception("Fehler in run_checks_periodically – Schleife läuft weiter.") + time.sleep(interval) -# --- Initialisierung der App --- -def create_initial_user(): +# --- App-Start und Initialisierung --- + +def init_app(): + """Führt einmalige Initialisierungsaufgaben aus: Migration und User-Erstellung.""" + # Logging konfigurieren (zeigt auch Alembic) + logging.basicConfig(level=logging.INFO) + logging.getLogger('alembic').setLevel(logging.INFO) + with app.app_context(): - # db.create_all() is now handled by migrations - if User.query.first() is None: - print("Erstelle initialen Admin-Benutzer...") - initial_user = User(username='admin') - initial_user.set_password('admin123') + app.logger.info("Führe Datenbank-Migrationen aus...") + migrations_dir = os.path.join(basedir, 'migrations') + ran_migrations = False + try: + if os.path.isdir(migrations_dir): + upgrade() + ran_migrations = True + app.logger.info("Migrationen abgeschlossen.") + else: + app.logger.warning("Kein migrations/-Ordner gefunden – überspringe Alembic upgrade.") + except Exception as e: + app.logger.warning(f"Alembic upgrade fehlgeschlagen: {e}") + + # Sicherstellen, dass alle Tabellen existieren – Fallback für frische Setups ohne Migrations + insp = inspect(db.engine) + existing_tables = set(insp.get_table_names()) + required_tables = {User.__table__.name, Monitor.__table__.name, UptimeLog.__table__.name} + if not required_tables.issubset(existing_tables): + app.logger.info("Lege fehlende Tabellen via create_all() an…") + db.create_all() + + # Initialen Admin-Benutzer erstellen, falls keiner existiert. + if not db.session.scalar(db.select(User).limit(1)): + app.logger.info("Keine Benutzer gefunden. Erstelle initialen Admin-Benutzer…") + admin_user = app.config['INITIAL_ADMIN_USER'] + admin_pass = app.config['INITIAL_ADMIN_PASS'] + + initial_user = User(username=admin_user) + initial_user.set_password(admin_pass) db.session.add(initial_user) db.session.commit() - print("Benutzer 'admin' mit Passwort 'admin123' erstellt.") + app.logger.info(f"Benutzer '{admin_user}' mit dem konfigurierten Passwort erstellt.") + if __name__ == '__main__': - # Apply database migrations automatically - with app.app_context(): - upgrade_database() + init_app() - create_initial_user() - + # Hintergrund-Thread für periodische Prüfungen (Daemon, damit Prozess sauber beendet) checker_thread = threading.Thread(target=run_checks_periodically, daemon=True) checker_thread.start() - - app.run(host='0.0.0.0', port=5000, debug=True) + + # Flask-Entwicklungsserver starten – use_reloader=False verhindert doppeltes Starten des Threads + # debug=True sollte in Produktion deaktiviert werden. + app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False)