Files
Uptime-Stats/main.py

495 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import requests
import socket
import threading
import logging
import time, math
import warnings
from datetime import datetime, timezone, timedelta
from typing import Optional, Tuple
from urllib.parse import urlparse
from flask import Flask, render_template, flash, redirect, url_for, request, make_response
from flask_admin import Admin, AdminIndexView, BaseView, expose
from flask_admin.contrib.sqla import ModelView
from flask_login import LoginManager, UserMixin, login_user, logout_user, current_user, login_required
from flask_admin.menu import MenuLink
from flask_migrate import Migrate, upgrade
from flask_sqlalchemy import SQLAlchemy
from flask_wtf import FlaskForm
from sqlalchemy import func, inspect
from werkzeug.security import generate_password_hash, check_password_hash
from wtforms import StringField, PasswordField, SubmitField, SelectField, TextAreaField
from wtforms.validators import DataRequired, EqualTo, URL, Optional as VOptional
# --- Konfiguration ---
# Unterdrücke spezifische Warnungen von flask_admin mit neueren SQLAlchemy-Versionen
warnings.filterwarnings("ignore", category=UserWarning, module='flask_admin.contrib')
basedir = os.path.abspath(os.path.dirname(__file__))
class Config:
"""Bündelt die Konfiguration der Anwendung."""
# Für Produktion per ENV setzen!
SECRET_KEY = os.environ.get('SECRET_KEY', 'ein-sehr-unsicherer-dev-key')
SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \
'sqlite:///' + os.path.join(basedir, 'uptime.db')
SQLALCHEMY_TRACK_MODIFICATIONS = False
# Konfiguration für den initialen Admin-Benutzer
INITIAL_ADMIN_USER = os.environ.get('ADMIN_USER', 'admin')
INITIAL_ADMIN_PASS = os.environ.get('ADMIN_PASS', 'admin123')
# Checker-Intervall (Sekunden)
CHECK_INTERVAL = int(os.environ.get('CHECK_INTERVAL', '300'))
# HTTP Timeout (Sekunden)
HTTP_TIMEOUT = int(os.environ.get('HTTP_TIMEOUT', '10'))
# --- App-Initialisierung ---
app = Flask(__name__)
app.config.from_object(Config)
# Sicherstellen, dass SECRET_KEY gesetzt ist (Warnung nur in Dev)
if app.config['SECRET_KEY'] == 'ein-sehr-unsicherer-dev-key':
app.logger.warning("Unsicherer SECRET_KEY aktiv bitte in Produktion per Umgebungsvariable setzen!")
db = SQLAlchemy(app)
migrate = Migrate(app, db)
login_manager = LoginManager(app)
login_manager.login_view = 'login'
login_manager.login_message = 'Bitte loggen Sie sich ein, um auf diese Seite zuzugreifen.'
# --- Datenbankmodelle ---
class User(UserMixin, db.Model):
"""Modell für Benutzer."""
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
password_hash = db.Column(db.String(200), nullable=False)
def set_password(self, password: str):
self.password_hash = generate_password_hash(password)
def check_password(self, password: str) -> bool:
return check_password_hash(self.password_hash, password)
def __repr__(self):
return f'<User {self.username}>'
class Monitor(db.Model):
"""Modell für einen zu überwachenden Dienst."""
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100), nullable=False, unique=True)
monitor_type = db.Column(db.String(20), nullable=False, default='HTTP')
url = db.Column(db.String(200), nullable=False)
keyword = db.Column(db.String(100), nullable=True)
port = db.Column(db.Integer, nullable=True)
position = db.Column(db.Integer, nullable=False, default=0, server_default='0')
status_override = db.Column(db.String(50), nullable=True)
status_override_message = db.Column(db.Text, nullable=True)
logs = db.relationship('UptimeLog', backref='monitor', lazy=True, cascade="all, delete-orphan")
def __repr__(self):
return f'<Monitor {self.name}>'
class UptimeLog(db.Model):
"""Modell zum Protokollieren des Uptime-Status."""
id = db.Column(db.Integer, primary_key=True)
monitor_id = db.Column(db.Integer, db.ForeignKey('monitor.id'), nullable=False)
timestamp = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc), index=True)
status_code = db.Column(db.Integer, nullable=True)
is_up = db.Column(db.Boolean, nullable=False)
def __repr__(self):
return f'<UptimeLog {self.monitor.name} - {self.timestamp} - {"Up" if self.is_up else "Down"}>'
# --- Login-Management ---
@login_manager.user_loader
def load_user(user_id: int) -> Optional[User]:
return db.session.get(User, int(user_id))
# --- Formulare ---
class LoginForm(FlaskForm):
username = StringField('Benutzername', validators=[DataRequired()])
password = PasswordField('Passwort', validators=[DataRequired()])
submit = SubmitField('Einloggen')
class ChangePasswordForm(FlaskForm):
current_password = PasswordField('Aktuelles Passwort', validators=[DataRequired()])
new_password = PasswordField('Neues Passwort', validators=[DataRequired()])
confirm_password = PasswordField(
'Neues Passwort bestätigen',
validators=[DataRequired(), EqualTo('new_password', 'Die Passwörter müssen übereinstimmen.')]
)
submit = SubmitField('Passwort ändern')
# --- Gesicherter Admin-Bereich ---
class SecureView:
"""Mixin-Klasse zur Absicherung von Admin-Ansichten."""
def is_accessible(self) -> bool:
return current_user.is_authenticated
def inaccessible_callback(self, name, **kwargs):
return redirect(url_for('login', next=request.url))
class SecureModelView(SecureView, ModelView):
pass
class SecureAdminIndexView(SecureView, AdminIndexView):
pass
class EmbedInfoView(SecureView, BaseView):
"""Eine gesicherte Admin-Ansicht, die den iFrame-Einbettungscode anzeigt."""
@expose('/')
def index(self):
# 'render' wird von BaseView bereitgestellt und injiziert den Admin-Kontext
return self.render('admin/embed_info.html')
class MonitorModelView(SecureModelView):
"""Angepasste Admin-Ansicht für Monitore."""
form_overrides = {
'monitor_type': SelectField,
'status_override': SelectField,
'status_override_message': TextAreaField,
}
form_args = {
'monitor_type': {
'label': 'Monitor-Typ',
'choices': [('HTTP', 'HTTP(s)'), ('KEYWORD', 'HTTP(s) mit Keyword'), ('TCP', 'TCP-Port')],
'coerce': str,
},
'status_override': {
'label': 'Manueller Status',
'choices': [('', 'Automatisch'), ('MAINTENANCE', 'Wartung'), ('DEGRADED', 'Leistungsprobleme')],
'coerce': str,
'validators': [VOptional()],
},
'url': {
'label': 'URL/Host',
'validators': [DataRequired()],
},
'port': {
'label': 'Port',
'validators': [VOptional()],
},
'keyword': {
'label': 'Keyword (optional)',
'validators': [VOptional()],
},
}
column_list = ('name', 'monitor_type', 'position', 'status_override', 'url', 'port')
form_columns = ('name', 'monitor_type', 'position', 'url', 'port', 'keyword', 'status_override', 'status_override_message')
column_labels = dict(
name='Name', monitor_type='Typ', url='URL/Host', port='Port', keyword='Keyword',
position='Position', status_override='Manueller Status', status_override_message='Status-Nachricht'
)
def on_model_change(self, form, model, is_created):
# Validierung basierend auf Typ
if model.monitor_type in ('HTTP', 'KEYWORD'):
# Schema ergänzen, falls nicht vorhanden
parsed = urlparse(model.url)
if not parsed.scheme:
model.url = f"http://{model.url}"
elif model.monitor_type == 'TCP':
if not model.port:
raise ValueError('Für TCP-Monitore muss ein Port angegeben werden.')
super().on_model_change(form, model, is_created)
if is_created:
app.logger.info(f"Neuer Monitor erstellt: {model.name}. Starte initiale Prüfung.")
t = threading.Thread(target=check_monitors, kwargs={'monitor_id': model.id}, daemon=True)
t.start()
admin = Admin(app, name='Uptime Stats Admin', template_mode='bootstrap4', index_view=SecureAdminIndexView())
admin.add_view(MonitorModelView(Monitor, db.session, name="Monitore"))
admin.add_view(SecureModelView(User, db.session, name="Administratoren", endpoint='user_admin'))
admin.add_view(EmbedInfoView(name='Einbettungs-Info', endpoint='embed-info'))
# --- Web-Routen ---
@app.context_processor
def inject_now():
"""Stellt das aktuelle UTC-Datum für alle Templates zur Verfügung."""
return {'now': datetime.now(timezone.utc)}
@app.route('/')
def index():
"""Öffentliche Statusseite."""
latest_log_subquery = db.session.query(
UptimeLog.monitor_id,
func.max(UptimeLog.timestamp).label('max_timestamp')
).group_by(UptimeLog.monitor_id).subquery()
monitors_with_status = db.session.query(
Monitor,
UptimeLog
).outerjoin(
latest_log_subquery, Monitor.id == latest_log_subquery.c.monitor_id
).outerjoin(
UptimeLog,
(UptimeLog.monitor_id == latest_log_subquery.c.monitor_id) &
(UptimeLog.timestamp == latest_log_subquery.c.max_timestamp)
).order_by(Monitor.position, Monitor.name).all()
return render_template('index.html', monitors_with_status=monitors_with_status)
@app.route('/status')
def status():
"""Erstellt eine für iFrames optimierte Status-Tabellenseite."""
latest_log_subquery = db.session.query(
UptimeLog.monitor_id,
func.max(UptimeLog.timestamp).label('max_timestamp')
).group_by(UptimeLog.monitor_id).subquery()
monitors_with_status = db.session.query(
Monitor,
UptimeLog
).outerjoin(
latest_log_subquery, Monitor.id == latest_log_subquery.c.monitor_id
).outerjoin(
UptimeLog,
(UptimeLog.monitor_id == latest_log_subquery.c.monitor_id) &
(UptimeLog.timestamp == latest_log_subquery.c.max_timestamp)
).order_by(Monitor.position, Monitor.name).all()
response = make_response(render_template('status.html', monitors_with_status=monitors_with_status))
response.headers.pop('X-Frame-Options', None)
return response
@app.route('/monitor/<int:monitor_id>')
def monitor_details(monitor_id: int):
"""Zeigt Detailinformationen und Statistiken für einen einzelnen Monitor."""
monitor = db.session.get(Monitor, monitor_id)
if not monitor:
return "Monitor nicht gefunden", 404
now = datetime.now(timezone.utc)
# Statistik für 24 Stunden
start_24h = now - timedelta(hours=24)
logs_24h = db.session.scalars(
db.select(UptimeLog).where(
UptimeLog.monitor_id == monitor_id,
UptimeLog.timestamp >= start_24h
)
).all()
total_checks_24h = len(logs_24h)
up_checks_24h = sum(1 for log in logs_24h if log.is_up)
uptime_24h = (up_checks_24h / total_checks_24h * 100) if total_checks_24h > 0 else 100
# Statistik für 30 Tage
start_30d = now - timedelta(days=30)
logs_30d = db.session.scalars(
db.select(UptimeLog).where(
UptimeLog.monitor_id == monitor_id,
UptimeLog.timestamp >= start_30d
)
).all()
total_checks_30d = len(logs_30d)
up_checks_30d = sum(1 for log in logs_30d if log.is_up)
uptime_30d = (up_checks_30d / total_checks_30d * 100) if total_checks_30d > 0 else 100
return render_template('monitor_details.html', monitor=monitor,
uptime_24h=uptime_24h, uptime_30d=uptime_30d,
total_checks_24h=total_checks_24h, total_checks_30d=total_checks_30d)
@app.route('/login', methods=['GET', 'POST'])
def login():
if current_user.is_authenticated:
return redirect(url_for('admin.index'))
form = LoginForm()
if form.validate_on_submit():
user = db.session.scalar(db.select(User).where(User.username == form.username.data))
if user and user.check_password(form.password.data):
login_user(user)
flash('Erfolgreich eingeloggt.', 'success')
next_page = request.args.get('next')
return redirect(next_page or url_for('admin.index'))
flash('Ungültiger Benutzername oder Passwort.', 'danger')
return render_template('login.html', form=form)
@app.route('/logout')
@login_required
def logout():
logout_user()
flash('Erfolgreich ausgeloggt.', 'info')
return redirect(url_for('index'))
@app.route('/change-password', methods=['GET', 'POST'])
@login_required
def change_password():
form = ChangePasswordForm()
if form.validate_on_submit():
if not current_user.check_password(form.current_password.data):
flash('Das aktuelle Passwort ist nicht korrekt.', 'danger')
else:
current_user.set_password(form.new_password.data)
db.session.commit()
flash('Ihr Passwort wurde erfolgreich geändert.', 'success')
return redirect(url_for('admin.index'))
return render_template('change_password.html', form=form)
# --- Uptime-Checker (Hintergrundprozess) ---
def _perform_check(monitor: Monitor) -> Tuple[bool, Optional[int]]:
"""Führt die eigentliche Prüfung für einen einzelnen Monitor durch."""
is_up, status_code = False, None
try:
if monitor.monitor_type in ['HTTP', 'KEYWORD']:
# Sicherstellen, dass eine URL mit Schema verwendet wird
url = monitor.url
parsed = urlparse(url)
if not parsed.scheme:
url = f"http://{url}"
response = requests.get(url, timeout=app.config['HTTP_TIMEOUT'])
status_code = response.status_code
if 200 <= status_code < 400:
if monitor.monitor_type == 'KEYWORD' and monitor.keyword:
# Case-insensitive Keyword-Suche
is_up = monitor.keyword.lower() in response.text.lower()
else:
is_up = True
elif monitor.monitor_type == 'TCP':
# host kann evtl. eine URL sein Hostname extrahieren
host = monitor.url
parsed = urlparse(host)
if parsed.hostname:
host = parsed.hostname
port = monitor.port
if host and port:
with socket.create_connection((host, int(port)), timeout=app.config['HTTP_TIMEOUT']):
is_up = True
except requests.RequestException as e:
app.logger.warning(f"Fehler bei HTTP-Prüfung für '{monitor.name}': {e}")
except (socket.timeout, socket.error, OSError) as e:
app.logger.warning(f"Fehler bei TCP-Prüfung für '{monitor.name}': {e}")
except Exception as e:
app.logger.exception(f"Unerwarteter Fehler bei Prüfung für '{monitor.name}': {e}")
return is_up, status_code
def check_monitors(monitor_id: Optional[int] = None):
"""Überprüft den Status. Wenn monitor_id angegeben ist, nur diesen, sonst alle."""
with app.app_context():
query = db.select(Monitor)
if monitor_id:
query = query.where(Monitor.id == monitor_id)
app.logger.info(f"[{datetime.now(timezone.utc)}] Starte Prüfung für Monitor ID {monitor_id}...")
else:
app.logger.info(f"[{datetime.now(timezone.utc)}] Starte periodische Überprüfung...")
monitors = db.session.scalars(query).all()
if not monitors:
if not monitor_id:
app.logger.info(f"[{datetime.now(timezone.utc)}] Keine Monitore zur Überprüfung gefunden.")
return
for monitor in monitors:
if monitor.status_override:
app.logger.info(f" - '{monitor.name}' hat manuellen Status '{monitor.status_override}'. Überspringe.")
continue
is_up, status_code = _perform_check(monitor)
log_entry = UptimeLog(monitor_id=monitor.id, status_code=status_code, is_up=is_up)
db.session.add(log_entry)
db.session.commit()
app.logger.info(f"[{datetime.now(timezone.utc)}] Überprüfung abgeschlossen.")
def run_checks_periodically(interval: int = None):
"""Führt die Überprüfungen in regelmäßigen Abständen aus."""
if interval is None:
interval = app.config['CHECK_INTERVAL']
while True:
try:
check_monitors()
except Exception:
app.logger.exception("Fehler in run_checks_periodically Schleife läuft weiter.")
time.sleep(interval)
# --- App-Start und Initialisierung ---
def init_app():
"""Führt einmalige Initialisierungsaufgaben aus: Migration und User-Erstellung."""
# Logging konfigurieren (zeigt auch Alembic)
logging.basicConfig(level=logging.INFO)
logging.getLogger('alembic').setLevel(logging.INFO)
with app.app_context():
app.logger.info("Führe Datenbank-Migrationen aus...")
try:
# Wendet ausstehende Datenbank-Migrationen an.
# Dies aktualisiert das Schema auf den neuesten Stand.
upgrade()
app.logger.info("Datenbank-Migrationen erfolgreich angewendet.")
except Exception as e:
app.logger.error(f"Anwenden der Datenbank-Migrationen fehlgeschlagen: {e}")
app.logger.info("Versuche, Tabellen mit db.create_all() zu erstellen, falls sie nicht existieren.")
# Sicherstellen, dass alle Tabellen existieren Fallback für frische Setups ohne Migrations
insp = inspect(db.engine)
existing_tables = set(insp.get_table_names())
required_tables = {User.__table__.name, Monitor.__table__.name, UptimeLog.__table__.name}
if not required_tables.issubset(existing_tables):
app.logger.info("Lege fehlende Tabellen via create_all() an…")
db.create_all()
# Initialen Admin-Benutzer erstellen, falls keiner existiert.
if not db.session.scalar(db.select(User).limit(1)):
app.logger.info("Keine Benutzer gefunden. Erstelle initialen Admin-Benutzer…")
admin_user = app.config['INITIAL_ADMIN_USER']
admin_pass = app.config['INITIAL_ADMIN_PASS']
initial_user = User(username=admin_user)
initial_user.set_password(admin_pass)
db.session.add(initial_user)
db.session.commit()
app.logger.info(f"Benutzer '{admin_user}' mit dem konfigurierten Passwort erstellt.")
if __name__ == '__main__':
init_app()
# Hintergrund-Thread für periodische Prüfungen (Daemon, damit Prozess sauber beendet)
checker_thread = threading.Thread(target=run_checks_periodically, daemon=True)
checker_thread.start()
# Flask-Entwicklungsserver starten use_reloader=False verhindert doppeltes Starten des Threads
# debug=True sollte in Produktion deaktiviert werden.
app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False)