+ Fixxed the SQLLite Error Start and Basic works

This commit is contained in:
Robin
2025-10-06 00:31:54 +02:00
parent 7841b808d6
commit 988ab003ad
2 changed files with 248 additions and 130 deletions

1
.gitignore vendored
View File

@@ -12,3 +12,4 @@ env/
# IDEs
.idea/
.vscode/
uptime.db

371
main.py
View File

@@ -1,39 +1,69 @@
import warnings
warnings.filterwarnings("ignore", category=UserWarning, module='flask_admin.contrib')
import os
import requests
import threading
import time
import socket
import threading
import logging
import time
import warnings
from datetime import datetime
from typing import Optional, Tuple
from urllib.parse import urlparse
from flask import Flask, render_template, flash, redirect, url_for, request
from flask_sqlalchemy import SQLAlchemy
from flask_admin import Admin, AdminIndexView
from flask_admin.contrib.sqla import ModelView
from flask_login import LoginManager, UserMixin, login_user, logout_user, current_user, login_required
from flask_migrate import Migrate, upgrade as upgrade_database
from werkzeug.security import generate_password_hash, check_password_hash
from flask_migrate import Migrate, upgrade
from flask_sqlalchemy import SQLAlchemy
from flask_wtf import FlaskForm
from sqlalchemy import func, inspect
from werkzeug.security import generate_password_hash, check_password_hash
from wtforms import StringField, PasswordField, SubmitField, SelectField, TextAreaField
from wtforms.validators import DataRequired, EqualTo
from wtforms.validators import DataRequired, EqualTo, URL, Optional as VOptional
# --- Initialisierung ---
# --- Konfiguration ---
# Unterdrücke spezifische Warnungen von flask_admin mit neueren SQLAlchemy-Versionen
warnings.filterwarnings("ignore", category=UserWarning, module='flask_admin.contrib')
basedir = os.path.abspath(os.path.dirname(__file__))
class Config:
"""Bündelt die Konfiguration der Anwendung."""
# Für Produktion per ENV setzen!
SECRET_KEY = os.environ.get('SECRET_KEY', 'ein-sehr-unsicherer-dev-key')
SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \
'sqlite:///' + os.path.join(basedir, 'uptime.db')
SQLALCHEMY_TRACK_MODIFICATIONS = False
# Konfiguration für den initialen Admin-Benutzer
INITIAL_ADMIN_USER = os.environ.get('ADMIN_USER', 'admin')
INITIAL_ADMIN_PASS = os.environ.get('ADMIN_PASS', 'admin123')
# Checker-Intervall (Sekunden)
CHECK_INTERVAL = int(os.environ.get('CHECK_INTERVAL', '300'))
# HTTP Timeout (Sekunden)
HTTP_TIMEOUT = int(os.environ.get('HTTP_TIMEOUT', '10'))
# --- App-Initialisierung ---
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + os.path.join(basedir, 'uptime.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SECRET_KEY'] = 'eine-viel-sicherere-geheime-zeichenkette' # In Produktion ändern!
app.config.from_object(Config)
# Sicherstellen, dass SECRET_KEY gesetzt ist (Warnung nur in Dev)
if app.config['SECRET_KEY'] == 'ein-sehr-unsicherer-dev-key':
app.logger.warning("Unsicherer SECRET_KEY aktiv bitte in Produktion per Umgebungsvariable setzen!")
db = SQLAlchemy(app)
migrate = Migrate(app, db)
login_manager = LoginManager(app)
login_manager.login_view = 'login'
login_manager.login_message = 'Bitte loggen Sie sich ein, um auf diese Seite zuzugreifen.'
migrate = Migrate(app, db)
# --- Datenbankmodelle ---
@@ -43,15 +73,16 @@ class User(UserMixin, db.Model):
username = db.Column(db.String(80), unique=True, nullable=False)
password_hash = db.Column(db.String(200), nullable=False)
def set_password(self, password):
def set_password(self, password: str):
self.password_hash = generate_password_hash(password)
def check_password(self, password):
def check_password(self, password: str) -> bool:
return check_password_hash(self.password_hash, password)
def __repr__(self):
return f'<User {self.username}>'
class Monitor(db.Model):
"""Modell für einen zu überwachenden Dienst."""
id = db.Column(db.Integer, primary_key=True)
@@ -68,22 +99,25 @@ class Monitor(db.Model):
def __repr__(self):
return f'<Monitor {self.name}>'
class UptimeLog(db.Model):
"""Modell zum Protokollieren des Uptime-Status."""
id = db.Column(db.Integer, primary_key=True)
monitor_id = db.Column(db.Integer, db.ForeignKey('monitor.id'), nullable=False)
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
timestamp = db.Column(db.DateTime, default=datetime.utcnow, index=True)
status_code = db.Column(db.Integer, nullable=True)
is_up = db.Column(db.Boolean, nullable=False)
def __repr__(self):
return f'<UptimeLog {self.monitor.name} - {self.timestamp} - {"Up" if self.is_up else "Down"}>'
# --- Login-Management ---
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
def load_user(user_id: int) -> Optional[User]:
return db.session.get(User, int(user_id))
# --- Formulare ---
@@ -92,68 +126,95 @@ class LoginForm(FlaskForm):
password = PasswordField('Passwort', validators=[DataRequired()])
submit = SubmitField('Einloggen')
class ChangePasswordForm(FlaskForm):
current_password = PasswordField('Aktuelles Passwort', validators=[DataRequired()])
new_password = PasswordField('Neues Passwort', validators=[DataRequired()])
confirm_password = PasswordField('Neues Passwort bestätigen', validators=[DataRequired(), EqualTo('new_password', 'Die Passwörter müssen übereinstimmen.')])
confirm_password = PasswordField(
'Neues Passwort bestätigen',
validators=[DataRequired(), EqualTo('new_password', 'Die Passwörter müssen übereinstimmen.')]
)
submit = SubmitField('Passwort ändern')
# --- Gesicherter Admin-Bereich ---
class SecureModelView(ModelView):
def is_accessible(self):
class SecureView:
"""Mixin-Klasse zur Absicherung von Admin-Ansichten."""
def is_accessible(self) -> bool:
return current_user.is_authenticated
def inaccessible_callback(self, name, **kwargs):
return redirect(url_for('login', next=request.url))
class SecureAdminIndexView(AdminIndexView):
def is_accessible(self):
return current_user.is_authenticated
def inaccessible_callback(self, name, **kwargs):
return redirect(url_for('login', next=request.url))
class SecureModelView(SecureView, ModelView):
pass
class SecureAdminIndexView(SecureView, AdminIndexView):
pass
class MonitorModelView(SecureModelView):
# Dropdown für den Monitor-Typ und manuellen Status
"""Angepasste Admin-Ansicht für Monitore."""
form_overrides = {
'monitor_type': SelectField,
'status_override': SelectField,
'status_override_message': TextAreaField
'status_override_message': TextAreaField,
}
form_args = {
'monitor_type': {
'label': 'Monitor-Typ',
'choices': [
('HTTP', 'HTTP(s)'),
('KEYWORD', 'HTTP(s) mit Keyword'),
('TCP', 'TCP-Port')
]
'choices': [('HTTP', 'HTTP(s)'), ('KEYWORD', 'HTTP(s) mit Keyword'), ('TCP', 'TCP-Port')],
'coerce': str,
},
'status_override': {
'label': 'Manueller Status',
'choices': [
('', 'Automatisch'), # Leerer String für None
('MAINTENANCE', 'Wartungsarbeiten'),
('DEGRADED', 'Leistungsprobleme'),
('OPERATIONAL', 'Funktionsfähig (Manuell)')
]
}
'choices': [('', 'Automatisch'), ('MAINTENANCE', 'Wartung'), ('DEGRADED', 'Leistungsprobleme')],
'coerce': str,
'validators': [VOptional()],
},
'url': {
'label': 'URL/Host',
'validators': [DataRequired()],
},
'port': {
'label': 'Port',
'validators': [VOptional()],
},
'keyword': {
'label': 'Keyword (optional)',
'validators': [VOptional()],
},
}
column_list = ('name', 'monitor_type', 'status_override', 'url', 'port')
form_columns = ('name', 'monitor_type', 'url', 'port', 'keyword', 'status_override', 'status_override_message')
column_labels = dict(name='Name', monitor_type='Typ', url='URL/Host', port='Port', keyword='Keyword', status_override='Manueller Status', status_override_message='Status-Nachricht')
column_labels = dict(
name='Name', monitor_type='Typ', url='URL/Host', port='Port', keyword='Keyword',
status_override='Manueller Status', status_override_message='Status-Nachricht'
)
def on_model_change(self, form, model, is_created):
if is_created:
print(f"Neuer Monitor erstellt: {model.name}. Starte initiale Prüfung.")
checker_thread = threading.Thread(target=check_monitors, kwargs={'monitor_id': model.id})
checker_thread.start()
# Validierung basierend auf Typ
if model.monitor_type in ('HTTP', 'KEYWORD'):
# Schema ergänzen, falls nicht vorhanden
parsed = urlparse(model.url)
if not parsed.scheme:
model.url = f"http://{model.url}"
elif model.monitor_type == 'TCP':
if not model.port:
raise ValueError('Für TCP-Monitore muss ein Port angegeben werden.')
super().on_model_change(form, model, is_created)
if is_created:
app.logger.info(f"Neuer Monitor erstellt: {model.name}. Starte initiale Prüfung.")
t = threading.Thread(target=check_monitors, kwargs={'monitor_id': model.id}, daemon=True)
t.start()
admin = Admin(app, name='Uptime Stats Admin', template_mode='bootstrap4', index_view=SecureAdminIndexView())
admin.add_view(MonitorModelView(Monitor, db.session, name="Monitore verwalten"))
admin.add_view(MonitorModelView(Monitor, db.session, name="Monitore"))
admin.add_view(SecureModelView(User, db.session, name="Administratoren", endpoint='user_admin'))
@@ -161,38 +222,42 @@ admin.add_view(SecureModelView(User, db.session, name="Administratoren", endpoin
@app.route('/')
def index():
monitors_with_status = []
monitors = Monitor.query.order_by(Monitor.name).all()
for monitor in monitors:
last_log = UptimeLog.query.filter_by(monitor_id=monitor.id).order_by(UptimeLog.timestamp.desc()).first()
monitors_with_status.append({
'name': monitor.name,
'url': monitor.url,
'monitor_type': monitor.monitor_type,
'keyword': monitor.keyword,
'port': monitor.port,
'status_override': monitor.status_override,
'status_override_message': monitor.status_override_message,
'is_up': last_log.is_up if last_log else None,
'last_checked': last_log.timestamp if last_log else 'Nie'
})
return render_template('index.html', monitors=monitors_with_status)
"""Öffentliche Statusseite."""
latest_log_subquery = db.session.query(
UptimeLog.monitor_id,
func.max(UptimeLog.timestamp).label('max_timestamp')
).group_by(UptimeLog.monitor_id).subquery()
monitors_with_status = db.session.query(
Monitor,
UptimeLog
).outerjoin(
latest_log_subquery, Monitor.id == latest_log_subquery.c.monitor_id
).outerjoin(
UptimeLog,
(UptimeLog.monitor_id == latest_log_subquery.c.monitor_id) &
(UptimeLog.timestamp == latest_log_subquery.c.max_timestamp)
).order_by(Monitor.name).all()
return render_template('index.html', monitors_with_status=monitors_with_status)
@app.route('/login', methods=['GET', 'POST'])
def login():
if current_user.is_authenticated:
return redirect(url_for('index'))
return redirect(url_for('admin.index'))
form = LoginForm()
if form.validate_on_submit():
user = User.query.filter_by(username=form.username.data).first()
if user is None or not user.check_password(form.password.data):
flash('Ungültiger Benutzername oder Passwort.', 'danger')
return redirect(url_for('login'))
login_user(user)
flash('Erfolgreich eingeloggt.', 'success')
return redirect(url_for('admin.index'))
user = db.session.scalar(db.select(User).where(User.username == form.username.data))
if user and user.check_password(form.password.data):
login_user(user)
flash('Erfolgreich eingeloggt.', 'success')
next_page = request.args.get('next')
return redirect(next_page or url_for('admin.index'))
flash('Ungültiger Benutzername oder Passwort.', 'danger')
return render_template('login.html', form=form)
@app.route('/logout')
@login_required
def logout():
@@ -200,6 +265,7 @@ def logout():
flash('Erfolgreich ausgeloggt.', 'info')
return redirect(url_for('index'))
@app.route('/change-password', methods=['GET', 'POST'])
@login_required
def change_password():
@@ -211,92 +277,143 @@ def change_password():
current_user.set_password(form.new_password.data)
db.session.commit()
flash('Ihr Passwort wurde erfolgreich geändert.', 'success')
return redirect(url_for('index'))
return redirect(url_for('admin.index'))
return render_template('change_password.html', form=form)
# --- Uptime-Checker (Hintergrundprozess) ---
def check_monitors(monitor_id=None):
def _perform_check(monitor: Monitor) -> Tuple[bool, Optional[int]]:
"""Führt die eigentliche Prüfung für einen einzelnen Monitor durch."""
is_up, status_code = False, None
try:
if monitor.monitor_type in ['HTTP', 'KEYWORD']:
# Sicherstellen, dass eine URL mit Schema verwendet wird
url = monitor.url
parsed = urlparse(url)
if not parsed.scheme:
url = f"http://{url}"
response = requests.get(url, timeout=app.config['HTTP_TIMEOUT'])
status_code = response.status_code
if 200 <= status_code < 400:
if monitor.monitor_type == 'KEYWORD' and monitor.keyword:
# Case-insensitive Keyword-Suche
is_up = monitor.keyword.lower() in response.text.lower()
else:
is_up = True
elif monitor.monitor_type == 'TCP':
# host kann evtl. eine URL sein Hostname extrahieren
host = monitor.url
parsed = urlparse(host)
if parsed.hostname:
host = parsed.hostname
port = monitor.port
if host and port:
with socket.create_connection((host, int(port)), timeout=app.config['HTTP_TIMEOUT']):
is_up = True
except requests.RequestException as e:
app.logger.warning(f"Fehler bei HTTP-Prüfung für '{monitor.name}': {e}")
except (socket.timeout, socket.error, OSError) as e:
app.logger.warning(f"Fehler bei TCP-Prüfung für '{monitor.name}': {e}")
except Exception as e:
app.logger.exception(f"Unerwarteter Fehler bei Prüfung für '{monitor.name}': {e}")
return is_up, status_code
def check_monitors(monitor_id: Optional[int] = None):
"""Überprüft den Status. Wenn monitor_id angegeben ist, nur diesen, sonst alle."""
with app.app_context():
query = db.select(Monitor)
if monitor_id:
monitors = Monitor.query.filter_by(id=monitor_id).all()
print(f"[{datetime.now()}] Starte initiale Prüfung für Monitor ID {monitor_id}...")
query = query.where(Monitor.id == monitor_id)
app.logger.info(f"[{datetime.now()}] Starte Prüfung für Monitor ID {monitor_id}...")
else:
monitors = Monitor.query.all()
if not monitors:
return
print(f"[{datetime.now()}] Starte periodische Überprüfung für {len(monitors)} Monitor(en)...")
app.logger.info(f"[{datetime.now()}] Starte periodische Überprüfung...")
monitors = db.session.scalars(query).all()
if not monitors:
if not monitor_id:
app.logger.info(f"[{datetime.now()}] Keine Monitore zur Überprüfung gefunden.")
return
for monitor in monitors:
# Wenn ein manueller Status gesetzt ist, überspringe die automatische Prüfung.
if monitor.status_override:
print(f" - Monitor '{monitor.name}' hat manuellen Status '{monitor.status_override}'. Überspringe Prüfung.")
app.logger.info(f" - '{monitor.name}' hat manuellen Status '{monitor.status_override}'. Überspringe.")
continue
is_up, status_code = False, None
if monitor.monitor_type in ['HTTP', 'KEYWORD']:
try:
response = requests.get(monitor.url, timeout=10)
status_code = response.status_code
if 200 <= status_code < 400:
if monitor.monitor_type == 'KEYWORD' and monitor.keyword:
if monitor.keyword in response.text:
is_up = True
else: # Standard HTTP
is_up = True
except requests.RequestException:
pass
elif monitor.monitor_type == 'TCP':
try:
# For TCP, the 'url' field holds the host
host = monitor.url
port = monitor.port
if host and port:
with socket.create_connection((host, port), timeout=10) as sock:
is_up = True
# status_code is not applicable for TCP checks
except (socket.timeout, socket.error, OSError):
pass
is_up, status_code = _perform_check(monitor)
log_entry = UptimeLog(monitor_id=monitor.id, status_code=status_code, is_up=is_up)
db.session.add(log_entry)
db.session.commit()
print(f"[{datetime.now()}] Überprüfung abgeschlossen.")
app.logger.info(f"[{datetime.now()}] Überprüfung abgeschlossen.")
def run_checks_periodically():
def run_checks_periodically(interval: int = None):
"""Führt die Überprüfungen in regelmäßigen Abständen aus."""
if interval is None:
interval = app.config['CHECK_INTERVAL']
while True:
check_monitors()
# Warte 5 Minuten (300 Sekunden) bis zur nächsten Überprüfung
time.sleep(300)
try:
check_monitors()
except Exception:
app.logger.exception("Fehler in run_checks_periodically Schleife läuft weiter.")
time.sleep(interval)
# --- Initialisierung der App ---
def create_initial_user():
# --- App-Start und Initialisierung ---
def init_app():
"""Führt einmalige Initialisierungsaufgaben aus: Migration und User-Erstellung."""
# Logging konfigurieren (zeigt auch Alembic)
logging.basicConfig(level=logging.INFO)
logging.getLogger('alembic').setLevel(logging.INFO)
with app.app_context():
# db.create_all() is now handled by migrations
if User.query.first() is None:
print("Erstelle initialen Admin-Benutzer...")
initial_user = User(username='admin')
initial_user.set_password('admin123')
app.logger.info("Führe Datenbank-Migrationen aus...")
migrations_dir = os.path.join(basedir, 'migrations')
ran_migrations = False
try:
if os.path.isdir(migrations_dir):
upgrade()
ran_migrations = True
app.logger.info("Migrationen abgeschlossen.")
else:
app.logger.warning("Kein migrations/-Ordner gefunden überspringe Alembic upgrade.")
except Exception as e:
app.logger.warning(f"Alembic upgrade fehlgeschlagen: {e}")
# Sicherstellen, dass alle Tabellen existieren Fallback für frische Setups ohne Migrations
insp = inspect(db.engine)
existing_tables = set(insp.get_table_names())
required_tables = {User.__table__.name, Monitor.__table__.name, UptimeLog.__table__.name}
if not required_tables.issubset(existing_tables):
app.logger.info("Lege fehlende Tabellen via create_all() an…")
db.create_all()
# Initialen Admin-Benutzer erstellen, falls keiner existiert.
if not db.session.scalar(db.select(User).limit(1)):
app.logger.info("Keine Benutzer gefunden. Erstelle initialen Admin-Benutzer…")
admin_user = app.config['INITIAL_ADMIN_USER']
admin_pass = app.config['INITIAL_ADMIN_PASS']
initial_user = User(username=admin_user)
initial_user.set_password(admin_pass)
db.session.add(initial_user)
db.session.commit()
print("Benutzer 'admin' mit Passwort 'admin123' erstellt.")
app.logger.info(f"Benutzer '{admin_user}' mit dem konfigurierten Passwort erstellt.")
if __name__ == '__main__':
# Apply database migrations automatically
with app.app_context():
upgrade_database()
create_initial_user()
init_app()
# Hintergrund-Thread für periodische Prüfungen (Daemon, damit Prozess sauber beendet)
checker_thread = threading.Thread(target=run_checks_periodically, daemon=True)
checker_thread.start()
app.run(host='0.0.0.0', port=5000, debug=True)
# Flask-Entwicklungsserver starten use_reloader=False verhindert doppeltes Starten des Threads
# debug=True sollte in Produktion deaktiviert werden.
app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False)