import warnings warnings.filterwarnings("ignore", category=UserWarning, module='flask_admin.contrib') import os import requests import threading import time import socket from datetime import datetime from flask import Flask, render_template, flash, redirect, url_for, request from flask_sqlalchemy import SQLAlchemy from flask_admin import Admin, AdminIndexView from flask_admin.contrib.sqla import ModelView from flask_login import LoginManager, UserMixin, login_user, logout_user, current_user, login_required from flask_migrate import Migrate, upgrade as upgrade_database from werkzeug.security import generate_password_hash, check_password_hash from flask_wtf import FlaskForm from wtforms import StringField, PasswordField, SubmitField, SelectField, TextAreaField from wtforms.validators import DataRequired, EqualTo # --- Initialisierung --- basedir = os.path.abspath(os.path.dirname(__file__)) app = Flask(__name__) app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + os.path.join(basedir, 'uptime.db') app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['SECRET_KEY'] = 'eine-viel-sicherere-geheime-zeichenkette' # In Produktion ändern! db = SQLAlchemy(app) login_manager = LoginManager(app) login_manager.login_view = 'login' login_manager.login_message = 'Bitte loggen Sie sich ein, um auf diese Seite zuzugreifen.' migrate = Migrate(app, db) # --- Datenbankmodelle --- class User(UserMixin, db.Model): """Modell für Benutzer.""" id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(80), unique=True, nullable=False) password_hash = db.Column(db.String(200), nullable=False) def set_password(self, password): self.password_hash = generate_password_hash(password) def check_password(self, password): return check_password_hash(self.password_hash, password) def __repr__(self): return f'' class Monitor(db.Model): """Modell für einen zu überwachenden Dienst.""" id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(100), nullable=False, unique=True) monitor_type = db.Column(db.String(20), nullable=False, default='HTTP') url = db.Column(db.String(200), nullable=False) keyword = db.Column(db.String(100), nullable=True) port = db.Column(db.Integer, nullable=True) status_override = db.Column(db.String(50), nullable=True) status_override_message = db.Column(db.Text, nullable=True) logs = db.relationship('UptimeLog', backref='monitor', lazy=True, cascade="all, delete-orphan") def __repr__(self): return f'' class UptimeLog(db.Model): """Modell zum Protokollieren des Uptime-Status.""" id = db.Column(db.Integer, primary_key=True) monitor_id = db.Column(db.Integer, db.ForeignKey('monitor.id'), nullable=False) timestamp = db.Column(db.DateTime, default=datetime.utcnow) status_code = db.Column(db.Integer, nullable=True) is_up = db.Column(db.Boolean, nullable=False) def __repr__(self): return f'' # --- Login-Management --- @login_manager.user_loader def load_user(user_id): return User.query.get(int(user_id)) # --- Formulare --- class LoginForm(FlaskForm): username = StringField('Benutzername', validators=[DataRequired()]) password = PasswordField('Passwort', validators=[DataRequired()]) submit = SubmitField('Einloggen') class ChangePasswordForm(FlaskForm): current_password = PasswordField('Aktuelles Passwort', validators=[DataRequired()]) new_password = PasswordField('Neues Passwort', validators=[DataRequired()]) confirm_password = PasswordField('Neues Passwort bestätigen', validators=[DataRequired(), EqualTo('new_password', 'Die Passwörter müssen übereinstimmen.')]) submit = SubmitField('Passwort ändern') # --- Gesicherter Admin-Bereich --- class SecureModelView(ModelView): def is_accessible(self): return current_user.is_authenticated def inaccessible_callback(self, name, **kwargs): return redirect(url_for('login', next=request.url)) class SecureAdminIndexView(AdminIndexView): def is_accessible(self): return current_user.is_authenticated def inaccessible_callback(self, name, **kwargs): return redirect(url_for('login', next=request.url)) class MonitorModelView(SecureModelView): # Dropdown für den Monitor-Typ und manuellen Status form_overrides = { 'monitor_type': SelectField, 'status_override': SelectField, 'status_override_message': TextAreaField } form_args = { 'monitor_type': { 'label': 'Monitor-Typ', 'choices': [ ('HTTP', 'HTTP(s)'), ('KEYWORD', 'HTTP(s) mit Keyword'), ('TCP', 'TCP-Port') ] }, 'status_override': { 'label': 'Manueller Status', 'choices': [ ('', 'Automatisch'), # Leerer String für None ('MAINTENANCE', 'Wartungsarbeiten'), ('DEGRADED', 'Leistungsprobleme'), ('OPERATIONAL', 'Funktionsfähig (Manuell)') ] } } column_list = ('name', 'monitor_type', 'status_override', 'url', 'port') form_columns = ('name', 'monitor_type', 'url', 'port', 'keyword', 'status_override', 'status_override_message') column_labels = dict(name='Name', monitor_type='Typ', url='URL/Host', port='Port', keyword='Keyword', status_override='Manueller Status', status_override_message='Status-Nachricht') def on_model_change(self, form, model, is_created): if is_created: print(f"Neuer Monitor erstellt: {model.name}. Starte initiale Prüfung.") checker_thread = threading.Thread(target=check_monitors, kwargs={'monitor_id': model.id}) checker_thread.start() super().on_model_change(form, model, is_created) admin = Admin(app, name='Uptime Stats Admin', template_mode='bootstrap4', index_view=SecureAdminIndexView()) admin.add_view(MonitorModelView(Monitor, db.session, name="Monitore verwalten")) admin.add_view(SecureModelView(User, db.session, name="Administratoren", endpoint='user_admin')) # --- Web-Routen --- @app.route('/') def index(): monitors_with_status = [] monitors = Monitor.query.order_by(Monitor.name).all() for monitor in monitors: last_log = UptimeLog.query.filter_by(monitor_id=monitor.id).order_by(UptimeLog.timestamp.desc()).first() monitors_with_status.append({ 'name': monitor.name, 'url': monitor.url, 'monitor_type': monitor.monitor_type, 'keyword': monitor.keyword, 'port': monitor.port, 'status_override': monitor.status_override, 'status_override_message': monitor.status_override_message, 'is_up': last_log.is_up if last_log else None, 'last_checked': last_log.timestamp if last_log else 'Nie' }) return render_template('index.html', monitors=monitors_with_status) @app.route('/login', methods=['GET', 'POST']) def login(): if current_user.is_authenticated: return redirect(url_for('index')) form = LoginForm() if form.validate_on_submit(): user = User.query.filter_by(username=form.username.data).first() if user is None or not user.check_password(form.password.data): flash('Ungültiger Benutzername oder Passwort.', 'danger') return redirect(url_for('login')) login_user(user) flash('Erfolgreich eingeloggt.', 'success') return redirect(url_for('admin.index')) return render_template('login.html', form=form) @app.route('/logout') @login_required def logout(): logout_user() flash('Erfolgreich ausgeloggt.', 'info') return redirect(url_for('index')) @app.route('/change-password', methods=['GET', 'POST']) @login_required def change_password(): form = ChangePasswordForm() if form.validate_on_submit(): if not current_user.check_password(form.current_password.data): flash('Das aktuelle Passwort ist nicht korrekt.', 'danger') else: current_user.set_password(form.new_password.data) db.session.commit() flash('Ihr Passwort wurde erfolgreich geändert.', 'success') return redirect(url_for('index')) return render_template('change_password.html', form=form) # --- Uptime-Checker (Hintergrundprozess) --- def check_monitors(monitor_id=None): """Überprüft den Status. Wenn monitor_id angegeben ist, nur diesen, sonst alle.""" with app.app_context(): if monitor_id: monitors = Monitor.query.filter_by(id=monitor_id).all() print(f"[{datetime.now()}] Starte initiale Prüfung für Monitor ID {monitor_id}...") else: monitors = Monitor.query.all() if not monitors: return print(f"[{datetime.now()}] Starte periodische Überprüfung für {len(monitors)} Monitor(en)...") if not monitors: return for monitor in monitors: # Wenn ein manueller Status gesetzt ist, überspringe die automatische Prüfung. if monitor.status_override: print(f" - Monitor '{monitor.name}' hat manuellen Status '{monitor.status_override}'. Überspringe Prüfung.") continue is_up, status_code = False, None if monitor.monitor_type in ['HTTP', 'KEYWORD']: try: response = requests.get(monitor.url, timeout=10) status_code = response.status_code if 200 <= status_code < 400: if monitor.monitor_type == 'KEYWORD' and monitor.keyword: if monitor.keyword in response.text: is_up = True else: # Standard HTTP is_up = True except requests.RequestException: pass elif monitor.monitor_type == 'TCP': try: # For TCP, the 'url' field holds the host host = monitor.url port = monitor.port if host and port: with socket.create_connection((host, port), timeout=10) as sock: is_up = True # status_code is not applicable for TCP checks except (socket.timeout, socket.error, OSError): pass log_entry = UptimeLog(monitor_id=monitor.id, status_code=status_code, is_up=is_up) db.session.add(log_entry) db.session.commit() print(f"[{datetime.now()}] Überprüfung abgeschlossen.") def run_checks_periodically(): """Führt die Überprüfungen in regelmäßigen Abständen aus.""" while True: check_monitors() # Warte 5 Minuten (300 Sekunden) bis zur nächsten Überprüfung time.sleep(300) # --- Initialisierung der App --- def create_initial_user(): with app.app_context(): # db.create_all() is now handled by migrations if User.query.first() is None: print("Erstelle initialen Admin-Benutzer...") initial_user = User(username='admin') initial_user.set_password('admin123') db.session.add(initial_user) db.session.commit() print("Benutzer 'admin' mit Passwort 'admin123' erstellt.") if __name__ == '__main__': # Apply database migrations automatically with app.app_context(): upgrade_database() create_initial_user() checker_thread = threading.Thread(target=run_checks_periodically, daemon=True) checker_thread.start() app.run(host='0.0.0.0', port=5000, debug=True)