import os import requests import socket import threading import logging import time, math import warnings from datetime import datetime, timezone, timedelta from typing import Optional, Tuple from urllib.parse import urlparse from flask import Flask, render_template, flash, redirect, url_for, request, make_response from flask_admin import Admin, AdminIndexView, BaseView, expose from flask_admin.contrib.sqla import ModelView from flask_login import LoginManager, UserMixin, login_user, logout_user, current_user, login_required from flask_admin.menu import MenuLink from flask_migrate import Migrate, upgrade from flask_sqlalchemy import SQLAlchemy from flask_wtf import FlaskForm from sqlalchemy import func, inspect from werkzeug.security import generate_password_hash, check_password_hash from wtforms import StringField, PasswordField, SubmitField, SelectField, TextAreaField from wtforms.validators import DataRequired, EqualTo, URL, Optional as VOptional # --- Konfiguration --- # Unterdrücke spezifische Warnungen von flask_admin mit neueren SQLAlchemy-Versionen warnings.filterwarnings("ignore", category=UserWarning, module='flask_admin.contrib') basedir = os.path.abspath(os.path.dirname(__file__)) class Config: """Bündelt die Konfiguration der Anwendung.""" # Für Produktion per ENV setzen! SECRET_KEY = os.environ.get('SECRET_KEY', 'ein-sehr-unsicherer-dev-key') SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \ 'sqlite:///' + os.path.join(basedir, 'uptime.db') SQLALCHEMY_TRACK_MODIFICATIONS = False # Konfiguration für den initialen Admin-Benutzer INITIAL_ADMIN_USER = os.environ.get('ADMIN_USER', 'admin') INITIAL_ADMIN_PASS = os.environ.get('ADMIN_PASS', 'admin123') # Checker-Intervall (Sekunden) CHECK_INTERVAL = int(os.environ.get('CHECK_INTERVAL', '300')) # HTTP Timeout (Sekunden) HTTP_TIMEOUT = int(os.environ.get('HTTP_TIMEOUT', '10')) # --- App-Initialisierung --- app = Flask(__name__) app.config.from_object(Config) # Sicherstellen, dass SECRET_KEY gesetzt ist (Warnung nur in Dev) if app.config['SECRET_KEY'] == 'ein-sehr-unsicherer-dev-key': app.logger.warning("Unsicherer SECRET_KEY aktiv – bitte in Produktion per Umgebungsvariable setzen!") db = SQLAlchemy(app) migrate = Migrate(app, db) login_manager = LoginManager(app) login_manager.login_view = 'login' login_manager.login_message = 'Bitte loggen Sie sich ein, um auf diese Seite zuzugreifen.' # --- Datenbankmodelle --- class User(UserMixin, db.Model): """Modell für Benutzer.""" id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(80), unique=True, nullable=False) password_hash = db.Column(db.String(200), nullable=False) def set_password(self, password: str): self.password_hash = generate_password_hash(password) def check_password(self, password: str) -> bool: return check_password_hash(self.password_hash, password) def __repr__(self): return f'' class Monitor(db.Model): """Modell für einen zu überwachenden Dienst.""" id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(100), nullable=False, unique=True) monitor_type = db.Column(db.String(20), nullable=False, default='HTTP') url = db.Column(db.String(200), nullable=False) keyword = db.Column(db.String(100), nullable=True) port = db.Column(db.Integer, nullable=True) position = db.Column(db.Integer, nullable=False, default=0, server_default='0') status_override = db.Column(db.String(50), nullable=True) status_override_message = db.Column(db.Text, nullable=True) logs = db.relationship('UptimeLog', backref='monitor', lazy=True, cascade="all, delete-orphan") def __repr__(self): return f'' class UptimeLog(db.Model): """Modell zum Protokollieren des Uptime-Status.""" id = db.Column(db.Integer, primary_key=True) monitor_id = db.Column(db.Integer, db.ForeignKey('monitor.id'), nullable=False) timestamp = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc), index=True) status_code = db.Column(db.Integer, nullable=True) is_up = db.Column(db.Boolean, nullable=False) def __repr__(self): return f'' # --- Login-Management --- @login_manager.user_loader def load_user(user_id: int) -> Optional[User]: return db.session.get(User, int(user_id)) # --- Formulare --- class LoginForm(FlaskForm): username = StringField('Benutzername', validators=[DataRequired()]) password = PasswordField('Passwort', validators=[DataRequired()]) submit = SubmitField('Einloggen') class ChangePasswordForm(FlaskForm): current_password = PasswordField('Aktuelles Passwort', validators=[DataRequired()]) new_password = PasswordField('Neues Passwort', validators=[DataRequired()]) confirm_password = PasswordField( 'Neues Passwort bestätigen', validators=[DataRequired(), EqualTo('new_password', 'Die Passwörter müssen übereinstimmen.')] ) submit = SubmitField('Passwort ändern') # --- Gesicherter Admin-Bereich --- class SecureView: """Mixin-Klasse zur Absicherung von Admin-Ansichten.""" def is_accessible(self) -> bool: return current_user.is_authenticated def inaccessible_callback(self, name, **kwargs): return redirect(url_for('login', next=request.url)) class SecureModelView(SecureView, ModelView): pass class SecureAdminIndexView(SecureView, AdminIndexView): pass class EmbedInfoView(SecureView, BaseView): """Eine gesicherte Admin-Ansicht, die den iFrame-Einbettungscode anzeigt.""" @expose('/') def index(self): # 'render' wird von BaseView bereitgestellt und injiziert den Admin-Kontext return self.render('admin/embed_info.html') class MonitorModelView(SecureModelView): """Angepasste Admin-Ansicht für Monitore.""" form_overrides = { 'monitor_type': SelectField, 'status_override': SelectField, 'status_override_message': TextAreaField, } form_args = { 'monitor_type': { 'label': 'Monitor-Typ', 'choices': [('HTTP', 'HTTP(s)'), ('KEYWORD', 'HTTP(s) mit Keyword'), ('TCP', 'TCP-Port')], 'coerce': str, }, 'status_override': { 'label': 'Manueller Status', 'choices': [('', 'Automatisch'), ('MAINTENANCE', 'Wartung'), ('DEGRADED', 'Leistungsprobleme')], 'coerce': str, 'validators': [VOptional()], }, 'url': { 'label': 'URL/Host', 'validators': [DataRequired()], }, 'port': { 'label': 'Port', 'validators': [VOptional()], }, 'keyword': { 'label': 'Keyword (optional)', 'validators': [VOptional()], }, } column_list = ('name', 'monitor_type', 'position', 'status_override', 'url', 'port') form_columns = ('name', 'monitor_type', 'position', 'url', 'port', 'keyword', 'status_override', 'status_override_message') column_labels = dict( name='Name', monitor_type='Typ', url='URL/Host', port='Port', keyword='Keyword', position='Position', status_override='Manueller Status', status_override_message='Status-Nachricht' ) def on_model_change(self, form, model, is_created): # Validierung basierend auf Typ if model.monitor_type in ('HTTP', 'KEYWORD'): # Schema ergänzen, falls nicht vorhanden parsed = urlparse(model.url) if not parsed.scheme: model.url = f"http://{model.url}" elif model.monitor_type == 'TCP': if not model.port: raise ValueError('Für TCP-Monitore muss ein Port angegeben werden.') super().on_model_change(form, model, is_created) if is_created: app.logger.info(f"Neuer Monitor erstellt: {model.name}. Starte initiale Prüfung.") t = threading.Thread(target=check_monitors, kwargs={'monitor_id': model.id}, daemon=True) t.start() admin = Admin(app, name='Uptime Stats Admin', template_mode='bootstrap4', index_view=SecureAdminIndexView()) admin.add_view(MonitorModelView(Monitor, db.session, name="Monitore")) admin.add_view(SecureModelView(User, db.session, name="Administratoren", endpoint='user_admin')) admin.add_view(EmbedInfoView(name='Einbettungs-Info', endpoint='embed-info')) # --- Web-Routen --- @app.context_processor def inject_now(): """Stellt das aktuelle UTC-Datum für alle Templates zur Verfügung.""" return {'now': datetime.now(timezone.utc)} @app.route('/') def index(): """Öffentliche Statusseite.""" latest_log_subquery = db.session.query( UptimeLog.monitor_id, func.max(UptimeLog.timestamp).label('max_timestamp') ).group_by(UptimeLog.monitor_id).subquery() monitors_with_status = db.session.query( Monitor, UptimeLog ).outerjoin( latest_log_subquery, Monitor.id == latest_log_subquery.c.monitor_id ).outerjoin( UptimeLog, (UptimeLog.monitor_id == latest_log_subquery.c.monitor_id) & (UptimeLog.timestamp == latest_log_subquery.c.max_timestamp) ).order_by(Monitor.position, Monitor.name).all() return render_template('index.html', monitors_with_status=monitors_with_status) @app.route('/status') def status(): """Erstellt eine für iFrames optimierte Status-Tabellenseite.""" latest_log_subquery = db.session.query( UptimeLog.monitor_id, func.max(UptimeLog.timestamp).label('max_timestamp') ).group_by(UptimeLog.monitor_id).subquery() monitors_with_status = db.session.query( Monitor, UptimeLog ).outerjoin( latest_log_subquery, Monitor.id == latest_log_subquery.c.monitor_id ).outerjoin( UptimeLog, (UptimeLog.monitor_id == latest_log_subquery.c.monitor_id) & (UptimeLog.timestamp == latest_log_subquery.c.max_timestamp) ).order_by(Monitor.position, Monitor.name).all() response = make_response(render_template('status.html', monitors_with_status=monitors_with_status)) response.headers.pop('X-Frame-Options', None) return response @app.route('/monitor/') def monitor_details(monitor_id: int): """Zeigt Detailinformationen und Statistiken für einen einzelnen Monitor.""" monitor = db.session.get(Monitor, monitor_id) if not monitor: return "Monitor nicht gefunden", 404 now = datetime.now(timezone.utc) # Statistik für 24 Stunden start_24h = now - timedelta(hours=24) logs_24h = db.session.scalars( db.select(UptimeLog).where( UptimeLog.monitor_id == monitor_id, UptimeLog.timestamp >= start_24h ) ).all() total_checks_24h = len(logs_24h) up_checks_24h = sum(1 for log in logs_24h if log.is_up) uptime_24h = (up_checks_24h / total_checks_24h * 100) if total_checks_24h > 0 else 100 # Statistik für 30 Tage start_30d = now - timedelta(days=30) logs_30d = db.session.scalars( db.select(UptimeLog).where( UptimeLog.monitor_id == monitor_id, UptimeLog.timestamp >= start_30d ) ).all() total_checks_30d = len(logs_30d) up_checks_30d = sum(1 for log in logs_30d if log.is_up) uptime_30d = (up_checks_30d / total_checks_30d * 100) if total_checks_30d > 0 else 100 return render_template('monitor_details.html', monitor=monitor, uptime_24h=uptime_24h, uptime_30d=uptime_30d, total_checks_24h=total_checks_24h, total_checks_30d=total_checks_30d) @app.route('/login', methods=['GET', 'POST']) def login(): if current_user.is_authenticated: return redirect(url_for('admin.index')) form = LoginForm() if form.validate_on_submit(): user = db.session.scalar(db.select(User).where(User.username == form.username.data)) if user and user.check_password(form.password.data): login_user(user) flash('Erfolgreich eingeloggt.', 'success') next_page = request.args.get('next') return redirect(next_page or url_for('admin.index')) flash('Ungültiger Benutzername oder Passwort.', 'danger') return render_template('login.html', form=form) @app.route('/logout') @login_required def logout(): logout_user() flash('Erfolgreich ausgeloggt.', 'info') return redirect(url_for('index')) @app.route('/change-password', methods=['GET', 'POST']) @login_required def change_password(): form = ChangePasswordForm() if form.validate_on_submit(): if not current_user.check_password(form.current_password.data): flash('Das aktuelle Passwort ist nicht korrekt.', 'danger') else: current_user.set_password(form.new_password.data) db.session.commit() flash('Ihr Passwort wurde erfolgreich geändert.', 'success') return redirect(url_for('admin.index')) return render_template('change_password.html', form=form) # --- Uptime-Checker (Hintergrundprozess) --- def _perform_check(monitor: Monitor) -> Tuple[bool, Optional[int]]: """Führt die eigentliche Prüfung für einen einzelnen Monitor durch.""" is_up, status_code = False, None try: if monitor.monitor_type in ['HTTP', 'KEYWORD']: # Sicherstellen, dass eine URL mit Schema verwendet wird url = monitor.url parsed = urlparse(url) if not parsed.scheme: url = f"http://{url}" response = requests.get(url, timeout=app.config['HTTP_TIMEOUT']) status_code = response.status_code if 200 <= status_code < 400: if monitor.monitor_type == 'KEYWORD' and monitor.keyword: # Case-insensitive Keyword-Suche is_up = monitor.keyword.lower() in response.text.lower() else: is_up = True elif monitor.monitor_type == 'TCP': # host kann evtl. eine URL sein – Hostname extrahieren host = monitor.url parsed = urlparse(host) if parsed.hostname: host = parsed.hostname port = monitor.port if host and port: with socket.create_connection((host, int(port)), timeout=app.config['HTTP_TIMEOUT']): is_up = True except requests.RequestException as e: app.logger.warning(f"Fehler bei HTTP-Prüfung für '{monitor.name}': {e}") except (socket.timeout, socket.error, OSError) as e: app.logger.warning(f"Fehler bei TCP-Prüfung für '{monitor.name}': {e}") except Exception as e: app.logger.exception(f"Unerwarteter Fehler bei Prüfung für '{monitor.name}': {e}") return is_up, status_code def check_monitors(monitor_id: Optional[int] = None): """Überprüft den Status. Wenn monitor_id angegeben ist, nur diesen, sonst alle.""" with app.app_context(): query = db.select(Monitor) if monitor_id: query = query.where(Monitor.id == monitor_id) app.logger.info(f"[{datetime.now(timezone.utc)}] Starte Prüfung für Monitor ID {monitor_id}...") else: app.logger.info(f"[{datetime.now(timezone.utc)}] Starte periodische Überprüfung...") monitors = db.session.scalars(query).all() if not monitors: if not monitor_id: app.logger.info(f"[{datetime.now(timezone.utc)}] Keine Monitore zur Überprüfung gefunden.") return for monitor in monitors: if monitor.status_override: app.logger.info(f" - '{monitor.name}' hat manuellen Status '{monitor.status_override}'. Überspringe.") continue is_up, status_code = _perform_check(monitor) log_entry = UptimeLog(monitor_id=monitor.id, status_code=status_code, is_up=is_up) db.session.add(log_entry) db.session.commit() app.logger.info(f"[{datetime.now(timezone.utc)}] Überprüfung abgeschlossen.") def run_checks_periodically(interval: int = None): """Führt die Überprüfungen in regelmäßigen Abständen aus.""" if interval is None: interval = app.config['CHECK_INTERVAL'] while True: try: check_monitors() except Exception: app.logger.exception("Fehler in run_checks_periodically – Schleife läuft weiter.") time.sleep(interval) # --- App-Start und Initialisierung --- def init_app(): """Führt einmalige Initialisierungsaufgaben aus: Migration und User-Erstellung.""" # Logging konfigurieren (zeigt auch Alembic) logging.basicConfig(level=logging.INFO) logging.getLogger('alembic').setLevel(logging.INFO) with app.app_context(): app.logger.info("Führe Datenbank-Migrationen aus...") try: # Wendet ausstehende Datenbank-Migrationen an. # Dies aktualisiert das Schema auf den neuesten Stand. upgrade() app.logger.info("Datenbank-Migrationen erfolgreich angewendet.") except Exception as e: app.logger.error(f"Anwenden der Datenbank-Migrationen fehlgeschlagen: {e}") app.logger.info("Versuche, Tabellen mit db.create_all() zu erstellen, falls sie nicht existieren.") # Sicherstellen, dass alle Tabellen existieren – Fallback für frische Setups ohne Migrations insp = inspect(db.engine) existing_tables = set(insp.get_table_names()) required_tables = {User.__table__.name, Monitor.__table__.name, UptimeLog.__table__.name} if not required_tables.issubset(existing_tables): app.logger.info("Lege fehlende Tabellen via create_all() an…") db.create_all() # Initialen Admin-Benutzer erstellen, falls keiner existiert. if not db.session.scalar(db.select(User).limit(1)): app.logger.info("Keine Benutzer gefunden. Erstelle initialen Admin-Benutzer…") admin_user = app.config['INITIAL_ADMIN_USER'] admin_pass = app.config['INITIAL_ADMIN_PASS'] initial_user = User(username=admin_user) initial_user.set_password(admin_pass) db.session.add(initial_user) db.session.commit() app.logger.info(f"Benutzer '{admin_user}' mit dem konfigurierten Passwort erstellt.") if __name__ == '__main__': init_app() # Hintergrund-Thread für periodische Prüfungen (Daemon, damit Prozess sauber beendet) checker_thread = threading.Thread(target=run_checks_periodically, daemon=True) checker_thread.start() # Flask-Entwicklungsserver starten – use_reloader=False verhindert doppeltes Starten des Threads # debug=True sollte in Produktion deaktiviert werden. app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False)