diff --git a/.flake8 b/.flake8
new file mode 100644
index 0000000..2b21f65
--- /dev/null
+++ b/.flake8
@@ -0,0 +1,2 @@
+[flake8]
+max-line-length = 140
\ No newline at end of file
diff --git a/.gitea/workflows/ci-cd.yml b/.gitea/workflows/ci-cd.yml
index d023cde..e555e4f 100644
--- a/.gitea/workflows/ci-cd.yml
+++ b/.gitea/workflows/ci-cd.yml
@@ -113,7 +113,7 @@ jobs:
       - name: Run flake8
         run: |
           flake8 pterodisbot.py server_metrics_graphs.py \
-            --max-line-length=120 \
+            --max-line-length=140 \
             --ignore=E501,W503,E203 \
             --exclude=venv,__pycache__,build,dist \
             --statistics \
@@ -124,19 +124,19 @@ jobs:
         run: |
           pylint pterodisbot.py server_metrics_graphs.py \
             --disable=C0111,C0103,R0913,R0914,R0915,W0718 \
-            --max-line-length=120 \
+            --max-line-length=140 \
             --output-format=text \
             --reports=y > pylint-report.txt || true
         continue-on-error: true

       - name: Check code formatting with black
         run: |
-          black --check --line-length=120 --diff pterodisbot.py server_metrics_graphs.py | tee black-report.txt
+          black --check --line-length=140 --diff pterodisbot.py server_metrics_graphs.py | tee black-report.txt
         continue-on-error: true

       - name: Check import ordering
         run: |
-          isort --check-only --profile black --line-length=120 pterodisbot.py server_metrics_graphs.py
+          isort --check-only --profile black --line-length=140 pterodisbot.py server_metrics_graphs.py
         continue-on-error: true

       - name: Type checking with mypy
diff --git a/pterodisbot.py b/pterodisbot.py
index aeb8c7a..16d2ac3 100644
--- a/pterodisbot.py
+++ b/pterodisbot.py
@@ -15,7 +15,7 @@ Features:
 
 import discord
 from discord.ext import commands, tasks
-from discord import app_commands
+from server_metrics_graphs import ServerMetricsManager
 import os
 import sys
 import signal
@@ -23,7 +23,6 @@ import types
 import aiohttp
 import asyncio
 import json
-import traceback
 import logging
 from logging.handlers import RotatingFileHandler
 import configparser
@@ -31,37 +30,35 @@ from datetime import datetime
 from typing import Dict, List, Optional, Tuple
 from pathlib import Path
 import matplotlib
-matplotlib.use('Agg') # Use non-interactive backend for server environments
-import matplotlib.pyplot as plt
-import matplotlib.dates as mdates
-from collections import deque
-import io
-from server_metrics_graphs import ServerMetricsGraphs, ServerMetricsManager
+
+matplotlib.use("Agg")  # Use non-interactive backend for server environments
 
 # ==============================================
 # LOGGING SETUP
 # ==============================================
-logs_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs')
+logs_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs")
 os.makedirs(logs_dir, exist_ok=True)
-logger = logging.getLogger('pterodisbot')
+logger = logging.getLogger("pterodisbot")
 logger.setLevel(logging.DEBUG)
 
 # File handler for logs (rotates when reaching 5MB, keeps 3 backups)
 handler = RotatingFileHandler(
-    filename=os.path.join(logs_dir, 'pterodisbot.log'),
-    maxBytes=5*1024*1024, # 5 MiB max log file size
-    backupCount=3, # Rotate through 3 files
-    encoding='utf-8'
+    filename=os.path.join(logs_dir, "pterodisbot.log"),
+    maxBytes=5 * 1024 * 1024,  # 5 MiB max log file size
+    backupCount=3,  # Rotate through 3 files
+    encoding="utf-8",
 )
-handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
+handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
 logger.addHandler(handler)
 
 # Console handler for real-time output
 console_handler = logging.StreamHandler()
 console_handler.setLevel(logging.INFO)
-console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
+console_handler.setFormatter(
+    logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
+)
 logger.addHandler(console_handler)
 
 logger.info("Initialized logging system with file and console output")
@@ -74,86 +71,92 @@ logger.info("Initialized logging system with file and console output")
 # logger.debug("Gennerated config.ini file using values from .env")
 
 config = configparser.ConfigParser()
-config.read('config.ini')
+config.read("config.ini")
 
 # ==============================================
 # CONFIGURATION VALIDATION
 # ==============================================
 
+
 class ConfigValidationError(Exception):
     """Custom exception for configuration validation errors."""
+
     pass
 
+
 def validate_config():
     """
     Validate all required configuration values at startup.
     Raises ConfigValidationError if any required values are missing or invalid.
     """
     errors = []
-
+
     # Validate Pterodactyl section
-    if not config.has_section('Pterodactyl'):
+    if not config.has_section("Pterodactyl"):
         errors.append("Missing [Pterodactyl] section in config.ini")
     else:
-        required_ptero = ['PanelURL', 'ClientAPIKey', 'ApplicationAPIKey']
+        required_ptero = ["PanelURL", "ClientAPIKey", "ApplicationAPIKey"]
         for key in required_ptero:
-            if not config.get('Pterodactyl', key, fallback=None):
+            if not config.get("Pterodactyl", key, fallback=None):
                 errors.append(f"Missing required Pterodactyl config value: {key}")
-
+
     # Validate Discord section
-    if not config.has_section('Discord'):
+    if not config.has_section("Discord"):
         errors.append("Missing [Discord] section in config.ini")
     else:
-        required_discord = ['Token', 'AllowedGuildID']
+        required_discord = ["Token", "AllowedGuildID"]
        for key in required_discord:
-            if not config.get('Discord', key, fallback=None):
+            if not config.get("Discord", key, fallback=None):
                 errors.append(f"Missing required Discord config value: {key}")
-
+
     # Validate AllowedGuildID is a valid integer
     try:
-        guild_id = config.getint('Discord', 'AllowedGuildID', fallback=0)
+        guild_id = config.getint("Discord", "AllowedGuildID", fallback=0)
         if guild_id <= 0:
             errors.append("AllowedGuildID must be a positive integer")
     except ValueError:
         errors.append("AllowedGuildID must be a valid integer")
-
+
     # Validate API keys have correct prefixes
-    client_key = config.get('Pterodactyl', 'ClientAPIKey', fallback='')
-    if client_key and not client_key.startswith('ptlc_'):
+    client_key = config.get("Pterodactyl", "ClientAPIKey", fallback="")
+    if client_key and not client_key.startswith("ptlc_"):
         errors.append("ClientAPIKey should start with 'ptlc_'")
-
-    app_key = config.get('Pterodactyl', 'ApplicationAPIKey', fallback='')
-    if app_key and not app_key.startswith('ptla_'):
+
+    app_key = config.get("Pterodactyl", "ApplicationAPIKey", fallback="")
+    if app_key and not app_key.startswith("ptla_"):
         errors.append("ApplicationAPIKey should start with 'ptla_'")
-
+
     # Validate PanelURL is a valid URL
-    panel_url = config.get('Pterodactyl', 'PanelURL', fallback='')
-    if panel_url and not (panel_url.startswith('http://') or panel_url.startswith('https://')):
+    panel_url = config.get("Pterodactyl", "PanelURL", fallback="")
+    if panel_url and not (
+        panel_url.startswith("http://") or panel_url.startswith("https://")
+    ):
         errors.append("PanelURL must start with http:// or https://")
-
+
     if errors:
         error_msg = "Configuration validation failed:\n- " + "\n- ".join(errors)
         logger.error(error_msg)
         raise ConfigValidationError(error_msg)
-
+
     logger.info("Configuration validation passed")
+
 
 # ==============================================
 # CONSTANTS (Updated with validation)
 # ==============================================
 try:
     validate_config()
-
-    PTERODACTYL_URL = config.get('Pterodactyl', 'PanelURL')
-    PTERODACTYL_CLIENT_API_KEY = config.get('Pterodactyl', 'ClientAPIKey')
-    PTERODACTYL_APPLICATION_API_KEY = config.get('Pterodactyl', 'ApplicationAPIKey')
-    DISCORD_TOKEN = config.get('Discord', 'Token')
-    ALLOWED_GUILD_ID = config.getint('Discord', 'AllowedGuildID')
+
+    PTERODACTYL_URL = config.get("Pterodactyl", "PanelURL")
+    PTERODACTYL_CLIENT_API_KEY = config.get("Pterodactyl", "ClientAPIKey")
+    PTERODACTYL_APPLICATION_API_KEY = config.get("Pterodactyl", "ApplicationAPIKey")
+    DISCORD_TOKEN = config.get("Discord", "Token")
+    ALLOWED_GUILD_ID = config.getint("Discord", "AllowedGuildID")
     REQUIRED_ROLE = "Game Server User"
     UPDATE_INTERVAL = 10
     EMBED_LOCATIONS_FILE = "./embed/embed_locations.json"
-
+
     logger.debug("Loaded and validated configuration values from config.ini")
 except ConfigValidationError as e:
@@ -167,204 +170,249 @@ except Exception as e:
 # PTERODACTYL API CLASS
 # ==============================================
 
+
 class PterodactylAPI:
     """
     Handles all interactions with the Pterodactyl Panel API.
     Uses client API key for client endpoints and application API key for admin endpoints.
     Provides methods for server management and monitoring.
     """
-
+
     def __init__(self, panel_url: str, client_api_key: str, application_api_key: str):
         """
         Initialize the Pterodactyl API client with both API keys.
-
+
         Args:
             panel_url: URL of the Pterodactyl panel (must include protocol)
            client_api_key: API key for client endpoints (starts with ptlc_)
             application_api_key: API key for application endpoints (starts with ptla_)
         """
-        self.panel_url = panel_url.rstrip('/')
+        self.panel_url = panel_url.rstrip("/")
         self.client_api_key = client_api_key
         self.application_api_key = application_api_key
         self.session = None
         self.lock = asyncio.Lock()  # Prevents concurrent API access
         logger.info("Initialized PterodactylAPI client with provided credentials")
-
+
     async def initialize(self):
         """Initialize the aiohttp client session for API requests."""
         self.session = aiohttp.ClientSession()
         logger.debug("Created new aiohttp ClientSession")
-
+
     async def close(self):
         """Cleanly close the aiohttp session when shutting down."""
         if self.session and not self.session.closed:
             await self.session.close()
         logger.debug("Closed aiohttp ClientSession")
-
-    async def _request(self, method: str, endpoint: str, data: Optional[dict] = None, use_application_key: bool = False) -> dict:
+
+    async def _request(
+        self,
+        method: str,
+        endpoint: str,
+        data: Optional[dict] = None,
+        use_application_key: bool = False,
+    ) -> dict:
         """
         Make an authenticated request to the Pterodactyl API.
-
+
         Args:
             method: HTTP method (GET, POST, PUT, DELETE, etc.)
endpoint: API endpoint (e.g., 'application/servers') data: Optional JSON payload for POST/PUT requests use_application_key: Whether to use the application API key (admin endpoints) - + Returns: Dictionary containing API response or error information - + Raises: aiohttp.ClientError: For network-related issues json.JSONDecodeError: If response cannot be parsed as JSON """ url = f"{self.panel_url}/api/{endpoint}" api_key_type = "Application" if use_application_key else "Client" - logger.debug(f"Preparing {method} request to {endpoint} using {api_key_type} API key") - + logger.debug( + f"Preparing {method} request to {endpoint} using {api_key_type} API key" + ) + # Choose the appropriate API key - api_key = self.application_api_key if use_application_key else self.client_api_key + api_key = ( + self.application_api_key if use_application_key else self.client_api_key + ) headers = { "Authorization": f"Bearer {api_key}", "Accept": "application/json", - "Content-Type": "application/json" + "Content-Type": "application/json", } - + try: async with self.lock: logger.debug(f"Acquired lock for API request to {endpoint}") async with self.session.request( - method, - url, - headers=headers, - json=data + method, url, headers=headers, json=data ) as response: if response.status == 204: # No content - logger.debug(f"Received 204 No Content response from {endpoint}") + logger.debug( + f"Received 204 No Content response from {endpoint}" + ) return {"status": "success"} - + response_data = await response.json() - logger.debug(f"Received response from {endpoint} with status {response.status}") - + logger.debug( + f"Received response from {endpoint} with status {response.status}" + ) + if response.status >= 400: - error_msg = response_data.get('errors', [{}])[0].get('detail', 'Unknown error') - logger.error(f"API request to {endpoint} failed with status {response.status}: {error_msg}") + error_msg = response_data.get("errors", [{}])[0].get( + "detail", "Unknown error" + ) + logger.error( + f"API request to {endpoint} failed with status {response.status}: {error_msg}" + ) return {"status": "error", "message": error_msg} - + return response_data except Exception as e: logger.error(f"Exception during API request to {endpoint}: {str(e)}") return {"status": "error", "message": str(e)} - + async def get_servers(self) -> List[dict]: """ Get a list of all servers from the Pterodactyl panel. Uses application API key as this is an admin endpoint. - + Returns: List of server dictionaries containing all server attributes """ logger.info("Fetching list of all servers from Pterodactyl panel") - response = await self._request("GET", "application/servers", use_application_key=True) - servers = response.get('data', []) + response = await self._request( + "GET", "application/servers", use_application_key=True + ) + servers = response.get("data", []) logger.info(f"Retrieved {len(servers)} servers from Pterodactyl panel") return servers - + async def get_server_resources(self, server_id: str) -> dict: """ Get resource usage for a specific server. Uses client API key as this is a client endpoint. 
- + Args: server_id: The Pterodactyl server identifier - + Returns: Dictionary containing server resource usage and current state """ logger.debug(f"Fetching resource usage for server {server_id}") try: - response = await self._request("GET", f"client/servers/{server_id}/resources") - if response.get('status') == 'error': - error_msg = response.get('message', 'Unknown error') - logger.error(f"Failed to get resources for server {server_id}: {error_msg}") - return {'attributes': {'current_state': 'offline'}} - - state = response.get('attributes', {}).get('current_state', 'unknown') + response = await self._request( + "GET", f"client/servers/{server_id}/resources" + ) + if response.get("status") == "error": + error_msg = response.get("message", "Unknown error") + logger.error( + f"Failed to get resources for server {server_id}: {error_msg}" + ) + return {"attributes": {"current_state": "offline"}} + + state = response.get("attributes", {}).get("current_state", "unknown") logger.debug(f"Server {server_id} current state: {state}") return response except Exception as e: - logger.error(f"Exception getting resources for server {server_id}: {str(e)}") - return {'attributes': {'current_state': 'offline'}} - + logger.error( + f"Exception getting resources for server {server_id}: {str(e)}" + ) + return {"attributes": {"current_state": "offline"}} + async def send_power_action(self, server_id: str, action: str) -> dict: """ Send a power action to a server (start/stop/restart). Uses client API key as this is a client endpoint. - + Args: server_id: The Pterodactyl server identifier action: Power action to send (start/stop/restart) - + Returns: Dictionary containing API response status """ - valid_actions = ['start', 'stop', 'restart'] + valid_actions = ["start", "stop", "restart"] if action not in valid_actions: logger.warning(f"Invalid power action attempted: {action}") - return {"status": "error", "message": f"Invalid action. Must be one of: {', '.join(valid_actions)}"} - + return { + "status": "error", + "message": f"Invalid action. Must be one of: {', '.join(valid_actions)}", + } + logger.info(f"Sending {action} command to server {server_id}") - result = await self._request("POST", f"client/servers/{server_id}/power", {"signal": action}) - - if result.get('status') == 'success': + result = await self._request( + "POST", f"client/servers/{server_id}/power", {"signal": action} + ) + + if result.get("status") == "success": logger.info(f"Successfully executed {action} on server {server_id}") else: - logger.error(f"Failed to execute {action} on server {server_id}: {result.get('message', 'Unknown error')}") - + logger.error( + f"Failed to execute {action} on server {server_id}: {result.get('message', 'Unknown error')}" + ) + return result - + async def get_server_details(self, server_id: str) -> dict: """ Get detailed server information including allocations. Uses application API key as this is an admin endpoint. - + Args: server_id: The Pterodactyl server identifier - + Returns: Dictionary containing detailed server information """ logger.debug(f"Fetching detailed information for server {server_id}") - return await self._request("GET", f"application/servers/{server_id}", use_application_key=True) - + return await self._request( + "GET", f"application/servers/{server_id}", use_application_key=True + ) + async def get_server_allocations(self, server_id: str) -> dict: """ Get allocation information for a server (IP addresses and ports). Uses application API key as this is an admin endpoint. 
- + Args: server_id: The Pterodactyl server identifier - + Returns: Dictionary containing server allocation information """ logger.debug(f"Fetching allocation information for server {server_id}") - return await self._request("GET", f"application/servers/{server_id}/allocations", use_application_key=True) + return await self._request( + "GET", + f"application/servers/{server_id}/allocations", + use_application_key=True, + ) + # ============================================== # SERVER STATUS VIEW CLASS (Buttons and UI) # ============================================== + class ServerStatusView(discord.ui.View): """ Interactive Discord view containing server control buttons. Provides persistent controls for server management with role-based access. """ - - def __init__(self, server_id: str, server_name: str, pterodactyl_api: PterodactylAPI, server_data: dict): + + def __init__( + self, + server_id: str, + server_name: str, + pterodactyl_api: PterodactylAPI, + server_data: dict, + ): """ Initialize the server status view with control buttons. - + Args: server_id: The server's Pterodactyl identifier server_name: Human-readable server name @@ -377,174 +425,219 @@ class ServerStatusView(discord.ui.View): self.api = pterodactyl_api self.server_data = server_data logger.debug(f"Created ServerStatusView for {server_name} ({server_id})") - + async def interaction_check(self, interaction: discord.Interaction) -> bool: """ Verify the interacting user has the required role and is in the allowed guild. - + Args: interaction: Discord interaction object - + Returns: bool: True if authorized, False otherwise """ # First check if interaction is from the allowed guild if interaction.guild_id != ALLOWED_GUILD_ID: - logger.warning(f"Unauthorized interaction attempt from guild {interaction.guild_id}") + logger.warning( + f"Unauthorized interaction attempt from guild {interaction.guild_id}" + ) await interaction.response.send_message( - "This bot is only available in a specific server.", - ephemeral=True + "This bot is only available in a specific server.", ephemeral=True ) return False - + # Then check for required role - logger.debug(f"Checking permissions for {interaction.user.name} on server {self.server_name}") + logger.debug( + f"Checking permissions for {interaction.user.name} on server {self.server_name}" + ) has_role = any(role.name == REQUIRED_ROLE for role in interaction.user.roles) if not has_role: - logger.warning(f"Permission denied for {interaction.user.name} - missing '{REQUIRED_ROLE}' role") + logger.warning( + f"Permission denied for {interaction.user.name} - missing '{REQUIRED_ROLE}' role" + ) await interaction.response.send_message( f"You don't have permission to control servers. You need the '{REQUIRED_ROLE}' role.", - ephemeral=True + ephemeral=True, ) return False - + logger.debug(f"Permission granted for {interaction.user.name}") return True - - async def on_error(self, interaction: discord.Interaction, error: Exception, item: discord.ui.Item): + + async def on_error( + self, interaction: discord.Interaction, error: Exception, item: discord.ui.Item + ): """ Handle errors in button interactions. 
- + Args: interaction: Discord interaction object error: Exception that occurred item: The UI item that triggered the error """ - logger.error(f"View error in {self.server_name} by {interaction.user.name}: {str(error)}") - await interaction.response.send_message( - "An error occurred while processing your request.", - ephemeral=True + logger.error( + f"View error in {self.server_name} by {interaction.user.name}: {str(error)}" ) - - @discord.ui.button(label="Start", style=discord.ButtonStyle.green, custom_id="start_button") - async def start_button(self, interaction: discord.Interaction, button: discord.ui.Button): + await interaction.response.send_message( + "An error occurred while processing your request.", ephemeral=True + ) + + @discord.ui.button( + label="Start", style=discord.ButtonStyle.green, custom_id="start_button" + ) + async def start_button( + self, interaction: discord.Interaction, button: discord.ui.Button + ): """Send a start command to the server.""" - logger.info(f"Start button pressed for {self.server_name} by {interaction.user.name}") + logger.info( + f"Start button pressed for {self.server_name} by {interaction.user.name}" + ) await interaction.response.defer(ephemeral=True) result = await self.api.send_power_action(self.server_id, "start") - - if result.get('status') == 'success': + + if result.get("status") == "success": message = f"Server '{self.server_name}' is starting..." logger.info(f"Successfully started server {self.server_name}") else: - message = f"Failed to start server: {result.get('message', 'Unknown error')}" + message = ( + f"Failed to start server: {result.get('message', 'Unknown error')}" + ) logger.error(f"Failed to start server {self.server_name}: {message}") - + await interaction.followup.send(message, ephemeral=True) - - @discord.ui.button(label="Stop", style=discord.ButtonStyle.red, custom_id="stop_button") - async def stop_button(self, interaction: discord.Interaction, button: discord.ui.Button): + + @discord.ui.button( + label="Stop", style=discord.ButtonStyle.red, custom_id="stop_button" + ) + async def stop_button( + self, interaction: discord.Interaction, button: discord.ui.Button + ): """Send a stop command to the server.""" - logger.info(f"Stop button pressed for {self.server_name} by {interaction.user.name}") + logger.info( + f"Stop button pressed for {self.server_name} by {interaction.user.name}" + ) await interaction.response.defer(ephemeral=True) result = await self.api.send_power_action(self.server_id, "stop") - - if result.get('status') == 'success': + + if result.get("status") == "success": message = f"Server '{self.server_name}' is stopping..." 
logger.info(f"Successfully stopped server {self.server_name}") else: message = f"Failed to stop server: {result.get('message', 'Unknown error')}" logger.error(f"Failed to stop server {self.server_name}: {message}") - + await interaction.followup.send(message, ephemeral=True) - - @discord.ui.button(label="Restart", style=discord.ButtonStyle.blurple, custom_id="restart_button") - async def restart_button(self, interaction: discord.Interaction, button: discord.ui.Button): + + @discord.ui.button( + label="Restart", style=discord.ButtonStyle.blurple, custom_id="restart_button" + ) + async def restart_button( + self, interaction: discord.Interaction, button: discord.ui.Button + ): """Send a restart command to the server.""" - logger.info(f"Restart button pressed for {self.server_name} by {interaction.user.name}") + logger.info( + f"Restart button pressed for {self.server_name} by {interaction.user.name}" + ) await interaction.response.defer(ephemeral=True) result = await self.api.send_power_action(self.server_id, "restart") - - if result.get('status') == 'success': + + if result.get("status") == "success": message = f"Server '{self.server_name}' is restarting..." logger.info(f"Successfully restarted server {self.server_name}") else: - message = f"Failed to restart server: {result.get('message', 'Unknown error')}" + message = ( + f"Failed to restart server: {result.get('message', 'Unknown error')}" + ) logger.error(f"Failed to restart server {self.server_name}: {message}") - + await interaction.followup.send(message, ephemeral=True) - - @discord.ui.button(label="Show Address", style=discord.ButtonStyle.grey, custom_id="show_address_button") - async def show_address_button(self, interaction: discord.Interaction, button: discord.ui.Button): + + @discord.ui.button( + label="Show Address", + style=discord.ButtonStyle.grey, + custom_id="show_address_button", + ) + async def show_address_button( + self, interaction: discord.Interaction, button: discord.ui.Button + ): """Show server's default allocation IP and port using client API.""" - logger.info(f"Show Address button pressed for {self.server_name} by {interaction.user.name}") + logger.info( + f"Show Address button pressed for {self.server_name} by {interaction.user.name}" + ) try: await interaction.response.defer(ephemeral=True) logger.debug(f"Fetching server details for {self.server_id}") - + # Get server details using client API server_details = await self.api._request( - "GET", - f"client/servers/{self.server_id}", - use_application_key=False + "GET", f"client/servers/{self.server_id}", use_application_key=False ) - - if server_details.get('status') == 'error': - error_msg = server_details.get('message', 'Unknown error') - logger.error(f"Failed to get server details for {self.server_id}: {error_msg}") + + if server_details.get("status") == "error": + error_msg = server_details.get("message", "Unknown error") + logger.error( + f"Failed to get server details for {self.server_id}: {error_msg}" + ) raise ValueError(error_msg) - - attributes = server_details.get('attributes', {}) - relationships = attributes.get('relationships', {}) - allocations = relationships.get('allocations', {}).get('data', []) - + + attributes = server_details.get("attributes", {}) + relationships = attributes.get("relationships", {}) + allocations = relationships.get("allocations", {}).get("data", []) + if not allocations: logger.warning(f"No allocations found for server {self.server_id}") raise ValueError("No allocations found for this server") - + # Find the default 
allocation (is_default=True) default_allocation = next( - (alloc for alloc in allocations - if alloc.get('attributes', {}).get('is_default', False)), - allocations[0] # Fallback to first allocation if no default found + ( + alloc + for alloc in allocations + if alloc.get("attributes", {}).get("is_default", False) + ), + allocations[0], # Fallback to first allocation if no default found ) - - allocation_attrs = default_allocation.get('attributes', {}) - ip_alias = allocation_attrs.get('ip_alias', 'Unknown') - port = str(allocation_attrs.get('port', 'Unknown')) - - logger.debug(f"Retrieved connection info for {self.server_id}: {ip_alias}:{port}") - + + allocation_attrs = default_allocation.get("attributes", {}) + ip_alias = allocation_attrs.get("ip_alias", "Unknown") + port = str(allocation_attrs.get("port", "Unknown")) + + logger.debug( + f"Retrieved connection info for {self.server_id}: {ip_alias}:{port}" + ) + # Create and send embed embed = discord.Embed( title=f"{self.server_name} Connection Info", color=discord.Color.blue(), - description=f"Server ID: `{self.server_id}`" + description=f"Server ID: `{self.server_id}`", ) embed.add_field(name="Address", value=f"`{ip_alias}`", inline=True) embed.add_field(name="Port", value=f"`{port}`", inline=True) - + await interaction.followup.send(embed=embed, ephemeral=True) logger.info(f"Displayed connection info for {self.server_name}") - + except Exception as e: logger.error(f"Failed to show address for {self.server_name}: {str(e)}") await interaction.followup.send( "âš ī¸ Failed to get connection info. The server may not have any ports allocated.", - ephemeral=True + ephemeral=True, ) + # ============================================== # MAIN BOT CLASS # ============================================== + class PterodactylBot(commands.Bot): """ Main bot class for Pterodactyl server management. Handles Discord interactions, embed management, and background tasks. Manages server status embeds and user commands. """ - + def __init__(self, *args, **kwargs): """ Initialize the Pterodactyl bot instance. @@ -553,67 +646,71 @@ class PterodactylBot(commands.Bot): super().__init__(*args, **kwargs) self.pterodactyl_api = None # Pterodactyl API client self.server_cache: Dict[str, dict] = {} # Cache of server data from Pterodactyl - self.embed_locations: Dict[str, Dict[str, int]] = {} # Tracks where embeds are posted + self.embed_locations: Dict[str, Dict[str, int]] = ( + {} + ) # Tracks where embeds are posted self.update_lock = asyncio.Lock() # Prevents concurrent updates - self.embed_storage_path = Path(EMBED_LOCATIONS_FILE) # File to store embed - self.metrics_manager = ServerMetricsManager() # Data manager for metrics graphing system + self.embed_storage_path = Path(EMBED_LOCATIONS_FILE) # File to store embed + self.metrics_manager = ( + ServerMetricsManager() + ) # Data manager for metrics graphing system # Track previous server states and CPU usage to detect changes # Format: {server_id: (state, cpu_usage, last_force_update)} self.previous_states: Dict[str, Tuple[str, float, Optional[float]]] = {} logger.info("Initialized PterodactylBot instance with state tracking") - + async def setup_hook(self): """ Bot setup routine called when the bot is starting. Initializes API client, loads saved data, and starts background tasks. 
""" logger.info("Running bot setup hook") - + # Initialize API client self.pterodactyl_api = PterodactylAPI( - PTERODACTYL_URL, - PTERODACTYL_CLIENT_API_KEY, - PTERODACTYL_APPLICATION_API_KEY + PTERODACTYL_URL, PTERODACTYL_CLIENT_API_KEY, PTERODACTYL_APPLICATION_API_KEY ) await self.pterodactyl_api.initialize() logger.info("Initialized Pterodactyl API client") - + # Load saved embed locations await self.load_embed_locations() - + # Start background update task self.update_status.start() logger.info("Started background status update task") - + async def load_embed_locations(self): """Load saved embed locations from JSON storage file.""" logger.debug("Attempting to load embed locations from storage") if not self.embed_storage_path.exists(): logger.info("No existing embed locations file found") return - + try: - with open(self.embed_storage_path, 'r') as f: + with open(self.embed_storage_path, "r") as f: self.embed_locations = json.load(f) - logger.info(f"Loaded {len(self.embed_locations)} embed locations from storage") + logger.info( + f"Loaded {len(self.embed_locations)} embed locations from storage" + ) except Exception as e: logger.error(f"Failed to load embed locations: {str(e)}") - + async def save_embed_locations(self): """Save current embed locations to JSON storage file.""" logger.debug("Attempting to save embed locations to storage") try: - with open(self.embed_storage_path, 'w') as f: + with open(self.embed_storage_path, "w") as f: json.dump(self.embed_locations, f, indent=2) logger.debug("Successfully saved embed locations to disk") except Exception as e: logger.error(f"Failed to save embed locations: {str(e)}") - + async def refresh_all_embeds(self) -> Tuple[int, int]: """ Perform a complete refresh of all server status embeds. Creates new embeds and deletes old ones to prevent duplication. 
- + Returns: Tuple of (deleted_count, created_count) - number of embeds processed """ @@ -621,7 +718,7 @@ class PterodactylBot(commands.Bot): async with self.update_lock: try: await asyncio.sleep(1) # Initial delay - + # Get current server list if cache is empty if not self.server_cache: logger.debug("Server cache empty, fetching fresh server list") @@ -629,155 +726,207 @@ class PterodactylBot(commands.Bot): if not servers: logger.warning("No servers found in Pterodactyl panel") return 0, 0 - - self.server_cache = {server['attributes']['identifier']: server for server in servers} + + self.server_cache = { + server["attributes"]["identifier"]: server for server in servers + } logger.info(f"Populated server cache with {len(servers)} servers") - + # Create new embeds in temporary storage new_embeds = {} created_count = 0 skipped_count = 0 - + for server_id, server_data in self.server_cache.items(): # Skip if we don't have an existing location to recreate in if server_id not in self.embed_locations: skipped_count += 1 continue - - channel_id = self.embed_locations[server_id]['channel_id'] + + channel_id = self.embed_locations[server_id]["channel_id"] channel = self.get_channel(int(channel_id)) if not channel: - logger.warning(f"Channel {channel_id} not found for server {server_id}") + logger.warning( + f"Channel {channel_id} not found for server {server_id}" + ) continue - + try: logger.debug(f"Creating new embed for server {server_id}") # Get current server status - resources = await self.pterodactyl_api.get_server_resources(server_id) - + resources = await self.pterodactyl_api.get_server_resources( + server_id + ) + # Create new embed - embed, view = await self.get_server_status_embed(server_data, resources) + embed, view = await self.get_server_status_embed( + server_data, resources + ) message = await channel.send(embed=embed, view=view) - + # Store in temporary location new_embeds[server_id] = { - 'channel_id': str(channel.id), - 'message_id': str(message.id) + "channel_id": str(channel.id), + "message_id": str(message.id), } created_count += 1 - logger.info(f"Created new embed for server {server_data['attributes']['name']}") - + logger.info( + f"Created new embed for server {server_data['attributes']['name']}" + ) + await asyncio.sleep(1) # Rate limit protection except Exception as e: - logger.error(f"Failed to create new embed for server {server_id}: {str(e)}") - - logger.info(f"Created {created_count} new embeds, skipped {skipped_count} servers") - + logger.error( + f"Failed to create new embed for server {server_id}: {str(e)}" + ) + + logger.info( + f"Created {created_count} new embeds, skipped {skipped_count} servers" + ) + # Only proceed if we created at least one new embed if not new_embeds: logger.warning("No new embeds created during refresh") return 0, 0 - + # Now delete old embeds deleted_count = 0 not_found_count = 0 - + for server_id, location in list(self.embed_locations.items()): try: - channel = self.get_channel(int(location['channel_id'])) + channel = self.get_channel(int(location["channel_id"])) if channel: try: - message = await channel.fetch_message(int(location['message_id'])) + message = await channel.fetch_message( + int(location["message_id"]) + ) await message.delete() deleted_count += 1 - logger.debug(f"Deleted old embed for server {server_id}") + logger.debug( + f"Deleted old embed for server {server_id}" + ) await asyncio.sleep(0.5) # Rate limit protection except discord.NotFound: not_found_count += 1 - logger.debug(f"Old embed for server {server_id} already 
deleted") + logger.debug( + f"Old embed for server {server_id} already deleted" + ) except Exception as e: - logger.error(f"Failed to delete old embed for server {server_id}: {str(e)}") + logger.error( + f"Failed to delete old embed for server {server_id}: {str(e)}" + ) except Exception as e: - logger.error(f"Error processing old embed for server {server_id}: {str(e)}") - - logger.info(f"Deleted {deleted_count} old embeds, {not_found_count} already missing") - + logger.error( + f"Error processing old embed for server {server_id}: {str(e)}" + ) + + logger.info( + f"Deleted {deleted_count} old embeds, {not_found_count} already missing" + ) + # Update storage with new embed locations self.embed_locations = new_embeds await self.save_embed_locations() - + return deleted_count, created_count - + except Exception as e: logger.error(f"Critical error during embed refresh: {str(e)}") raise - + async def track_new_embed(self, server_id: str, message: discord.Message): """ Track a newly created embed in storage. - + Args: server_id: The server's Pterodactyl identifier message: Discord message containing the embed """ - logger.debug(f"Tracking new embed for server {server_id} in channel {message.channel.id}") + logger.debug( + f"Tracking new embed for server {server_id} in channel {message.channel.id}" + ) self.embed_locations[server_id] = { - 'channel_id': str(message.channel.id), - 'message_id': str(message.id) + "channel_id": str(message.channel.id), + "message_id": str(message.id), } await self.save_embed_locations() - - async def get_server_status_embed(self, server_data: dict, resources: dict) -> Tuple[discord.Embed, ServerStatusView]: + + async def get_server_status_embed( + self, server_data: dict, resources: dict + ) -> Tuple[discord.Embed, ServerStatusView]: """ Create a status embed and view for a server. 
- + Args: server_data: Server information from Pterodactyl resources: Current resource usage data - + Returns: Tuple of (embed, view) objects ready for display """ - attributes = server_data.get('attributes', {}) - identifier = attributes.get('identifier', 'unknown') - name = attributes.get('name', 'Unknown Server') - description = attributes.get('description', 'No description available') + attributes = server_data.get("attributes", {}) + identifier = attributes.get("identifier", "unknown") + name = attributes.get("name", "Unknown Server") + description = attributes.get("description", "No description available") logger.debug(f"Building status embed for server {name} ({identifier})") - + # Parse resource data - resource_attributes = resources.get('attributes', {}) - current_state = resource_attributes.get('current_state', 'offline').title() - is_suspended = attributes.get('suspended', False) - + resource_attributes = resources.get("attributes", {}) + current_state = resource_attributes.get("current_state", "offline").title() + is_suspended = attributes.get("suspended", False) + # Create embed with appropriate color based on status embed = discord.Embed( title=f"{name} - {current_state}", description=description, - color=discord.Color.blue() if current_state.lower() == "running" else discord.Color.red(), - timestamp=datetime.now() + color=( + discord.Color.blue() + if current_state.lower() == "running" + else discord.Color.red() + ), + timestamp=datetime.now(), ) - + embed.add_field(name="🆔 Server ID", value=f"`{identifier}`", inline=True) - + if is_suspended: embed.add_field(name="â„šī¸ Status", value="⛔ `Suspended`", inline=True) else: embed.add_field(name="â„šī¸ Status", value="✅ `Active`", inline=True) - + # Add resource usage if server is running if current_state.lower() != "offline": # Current usage - cpu_usage = round(resource_attributes.get('resources', {}).get('cpu_absolute', 0), 2) - memory_usage = round(resource_attributes.get('resources', {}).get('memory_bytes', 0) / (1024 ** 2), 2) - disk_usage = round(resource_attributes.get('resources', {}).get('disk_bytes', 0) / (1024 ** 2), 2) - network_rx = round(resource_attributes.get('resources', {}).get('network_rx_bytes', 0) / (1024 ** 2), 2) - network_tx = round(resource_attributes.get('resources', {}).get('network_tx_bytes', 0) / (1024 ** 2), 2) - + cpu_usage = round( + resource_attributes.get("resources", {}).get("cpu_absolute", 0), 2 + ) + memory_usage = round( + resource_attributes.get("resources", {}).get("memory_bytes", 0) + / (1024**2), + 2, + ) + disk_usage = round( + resource_attributes.get("resources", {}).get("disk_bytes", 0) + / (1024**2), + 2, + ) + network_rx = round( + resource_attributes.get("resources", {}).get("network_rx_bytes", 0) + / (1024**2), + 2, + ) + network_tx = round( + resource_attributes.get("resources", {}).get("network_tx_bytes", 0) + / (1024**2), + 2, + ) + # Maximum allocated resources from server data - limits = attributes.get('limits', {}) - cpu_limit = limits.get('cpu', 0) - memory_limit = limits.get('memory', 0) - disk_limit = limits.get('disk', 0) + limits = attributes.get("limits", {}) + cpu_limit = limits.get("cpu", 0) + memory_limit = limits.get("memory", 0) + disk_limit = limits.get("disk", 0) # Format limit values - display ∞ for unlimited (0 limit) def format_limit(value, unit=""): @@ -785,10 +934,10 @@ class PterodactylBot(commands.Bot): return f"{'∞':<8}]{unit}" # Lemniscate symbol for infinity else: return f"{value:<8}]{unit}" - + # Get uptime from Pterodactyl API (in milliseconds) - 
uptime_ms = resource_attributes.get('resources', {}).get('uptime', 0) - + uptime_ms = resource_attributes.get("resources", {}).get("uptime", 0) + # Format uptime for display if uptime_ms > 0: uptime_seconds = uptime_ms // 1000 # Convert ms to seconds @@ -806,9 +955,9 @@ class PterodactylBot(commands.Bot): uptime_text = f"`{days}d {hours}h`" else: uptime_text = "`Just started`" - + embed.add_field(name="âąī¸ Uptime", value=uptime_text, inline=True) - + # Create dedicated usage text box with current usage and limits in monospace font usage_text = ( f"```properties\n" @@ -817,25 +966,17 @@ class PterodactylBot(commands.Bot): f"Disk : [{disk_usage:>8} / {format_limit(disk_limit, ' MiB')}\n" f"```" ) - + + embed.add_field(name="📊 Resource Usage", value=usage_text, inline=False) + embed.add_field( - name="📊 Resource Usage", - value=usage_text, - inline=False - ) - - embed.add_field( - name="Network In", - value=f"đŸ“Ĩ `{network_rx} MiB`", - inline=True + name="Network In", value=f"đŸ“Ĩ `{network_rx} MiB`", inline=True ) embed.add_field( - name="Network Out", - value=f"📤 `{network_tx} MiB`", - inline=True + name="Network Out", value=f"📤 `{network_tx} MiB`", inline=True ) - + # Add graph images if available server_graphs = self.metrics_manager.get_server_graphs(identifier) if server_graphs and server_graphs.has_sufficient_data: @@ -845,30 +986,30 @@ class PterodactylBot(commands.Bot): f">>> `Data points: {summary['point_count']}/6`\n" f"`CPU trend: {summary['cpu_trend']} â€ĸ Memory trend: {summary['memory_trend']}`" ) - + # Add a field explaining the graphs embed.add_field( name="📈 Usage Trends (Last Minute)", value=graph_description, - inline=False + inline=False, ) - + # Set graph images (these will be attached as files in the update_status method) embed.set_image(url=f"attachment://metrics_graph_{identifier}.png") - + embed.set_footer(text="Last updated") - + # Create interactive view with control buttons view = ServerStatusView( server_id=identifier, server_name=name, pterodactyl_api=self.pterodactyl_api, - server_data=server_data + server_data=server_data, ) - + logger.debug(f"Successfully built status components for {name}") return embed, view - + @tasks.loop(seconds=UPDATE_INTERVAL) async def update_status(self): """ @@ -877,7 +1018,7 @@ class PterodactylBot(commands.Bot): 2. Significant CPU usage change (>50% difference) 3. First time seeing the server 4. Server has been running for 10 minutes (force update for uptime) - + This minimizes API calls to Discord and updates while maintaining real-time awareness of important server changes. 
""" @@ -887,140 +1028,228 @@ class PterodactylBot(commands.Bot): # Fetch current server list from Pterodactyl servers = await self.pterodactyl_api.get_servers() if not servers: - logger.warning("No servers found in Pterodactyl panel during update") + logger.warning( + "No servers found in Pterodactyl panel during update" + ) return - + # Update our local cache with fresh server data - self.server_cache = {server['attributes']['identifier']: server for server in servers} + self.server_cache = { + server["attributes"]["identifier"]: server for server in servers + } logger.debug(f"Updated server cache with {len(servers)} servers") # Clean up metrics for servers that no longer exist active_server_ids = list(self.server_cache.keys()) self.metrics_manager.cleanup_old_servers(active_server_ids) - + # Variables to track our update statistics update_count = 0 # Successful updates error_count = 0 # Failed updates missing_count = 0 # Missing embeds skipped_count = 0 # Servers that didn't need updates current_time = datetime.now().timestamp() - + # Process each server we're tracking embeds for for server_id, location in list(self.embed_locations.items()): # Skip if server no longer exists in Pterodactyl if server_id not in self.server_cache: - logger.warning(f"Server {server_id} not found in cache, skipping update") + logger.warning( + f"Server {server_id} not found in cache, skipping update" + ) continue - + server_data = self.server_cache[server_id] - server_name = server_data['attributes']['name'] - + server_name = server_data["attributes"]["name"] + try: - logger.debug(f"Checking status for server {server_name} ({server_id})") - + logger.debug( + f"Checking status for server {server_name} ({server_id})" + ) + # Get current server resource usage - resources = await self.pterodactyl_api.get_server_resources(server_id) - current_state = resources.get('attributes', {}).get('current_state', 'offline') - cpu_usage = round(resources.get('attributes', {}).get('resources', {}).get('cpu_absolute', 0), 2) + resources = await self.pterodactyl_api.get_server_resources( + server_id + ) + current_state = resources.get("attributes", {}).get( + "current_state", "offline" + ) + cpu_usage = round( + resources.get("attributes", {}) + .get("resources", {}) + .get("cpu_absolute", 0), + 2, + ) # Collect metrics data for running servers - if current_state == 'running': - memory_usage = round(resources.get('attributes', {}).get('resources', {}).get('memory_bytes', 0) / (1024 ** 2), 2) - self.metrics_manager.add_server_data(server_id, server_name, cpu_usage, memory_usage) - logger.debug(f"Added metrics data for {server_name}: CPU={cpu_usage}%, Memory={memory_usage}MB") - + if current_state == "running": + memory_usage = round( + resources.get("attributes", {}) + .get("resources", {}) + .get("memory_bytes", 0) + / (1024**2), + 2, + ) + self.metrics_manager.add_server_data( + server_id, server_name, cpu_usage, memory_usage + ) + logger.debug( + f"Added metrics data for {server_name}: CPU={cpu_usage}%, Memory={memory_usage}MB" + ) + # Retrieve previous recorded state, CPU usage, and last force update time - prev_state, prev_cpu, last_force_update = self.previous_states.get(server_id, (None, 0, None)) - + prev_state, prev_cpu, last_force_update = ( + self.previous_states.get(server_id, (None, 0, None)) + ) + # DECISION LOGIC: Should we update the embed? needs_update = False - + # 1. 
Check if power state changed (most important) if current_state != prev_state: - logger.debug(f"Power state changed for {server_name}: {prev_state} -> {current_state}") + logger.debug( + f"Power state changed for {server_name}: {prev_state} -> {current_state}" + ) needs_update = True - + # 2. Check for significant CPU change (only if server is running) - elif current_state == 'running' and abs(cpu_usage - prev_cpu) > 50: - logger.debug(f"Significant CPU change for {server_name}: {prev_cpu}% -> {cpu_usage}%") + elif ( + current_state == "running" + and abs(cpu_usage - prev_cpu) > 50 + ): + logger.debug( + f"Significant CPU change for {server_name}: {prev_cpu}% -> {cpu_usage}%" + ) needs_update = True - + # 3. First time we're seeing this server (initial update) elif prev_state is None: - logger.debug(f"First check for {server_name}, performing initial update") + logger.debug( + f"First check for {server_name}, performing initial update" + ) needs_update = True - + # 4. Force update every 10 minutes for running servers (for uptime counter) - elif (current_state == 'running' and - (last_force_update is None or - current_time - last_force_update >= 600)): # 10 minutes = 600 seconds - logger.debug(f"Executing 10-minute force update for running server {server_name}") + elif current_state == "running" and ( + last_force_update is None + or current_time - last_force_update >= 600 + ): # 10 minutes = 600 seconds + logger.debug( + f"Executing 10-minute force update for running server {server_name}" + ) needs_update = True # Update the last force update time last_force_update = current_time - + # PERFORM UPDATE IF NEEDED if needs_update: # Generate fresh embed and view components - embed, view = await self.get_server_status_embed(server_data, resources) - + embed, view = await self.get_server_status_embed( + server_data, resources + ) + # Get the channel where this server's embed lives - channel = self.get_channel(int(location['channel_id'])) + channel = self.get_channel(int(location["channel_id"])) if not channel: - logger.warning(f"Channel {location['channel_id']} not found for server {server_id}") + logger.warning( + f"Channel {location['channel_id']} not found for server {server_id}" + ) continue - + # Fetch and update the existing message - message = await channel.fetch_message(int(location['message_id'])) - + message = await channel.fetch_message( + int(location["message_id"]) + ) + # Check if server is transitioning to offline/stopping state # and remove image attachment if present files = [] - server_graphs = self.metrics_manager.get_server_graphs(server_id) - + server_graphs = self.metrics_manager.get_server_graphs( + server_id + ) + # Only include graph images if server is running AND has sufficient data - if (current_state == 'running' and - server_graphs and - server_graphs.has_sufficient_data): + if ( + current_state == "running" + and server_graphs + and server_graphs.has_sufficient_data + ): # Generate metrics graph combined_graph = server_graphs.generate_combined_graph() if combined_graph: - files.append(discord.File(combined_graph, filename=f"metrics_graph_{server_id}.png")) - logger.debug(f"Including metrics graph for running server {server_name}") + files.append( + discord.File( + combined_graph, + filename=f"metrics_graph_{server_id}.png", + ) + ) + logger.debug( + f"Including metrics graph for running server {server_name}" + ) else: # Server is offline/stopping - ensure no image is attached - logger.debug(f"Server {server_name} is {current_state}, removing image attachment if 
present") + logger.debug( + f"Server {server_name} is {current_state}, removing image attachment if present" + ) # We'll update without files to remove any existing attachments - + # Update message with embed, view, and files (empty files list removes attachments) - await message.edit(embed=embed, view=view, attachments=files) + await message.edit( + embed=embed, view=view, attachments=files + ) update_count += 1 logger.debug(f"Updated status for {server_name}") - + # Update our state tracking with new values # Only update last_force_update if this was a force update - new_last_force_update = last_force_update if needs_update and current_state == 'running' and current_time - (last_force_update or 0) >= 600 else (last_force_update if last_force_update is not None else None) - self.previous_states[server_id] = (current_state, cpu_usage, new_last_force_update) + new_last_force_update = ( + last_force_update + if needs_update + and current_state == "running" + and current_time - (last_force_update or 0) >= 600 + else ( + last_force_update + if last_force_update is not None + else None + ) + ) + self.previous_states[server_id] = ( + current_state, + cpu_usage, + new_last_force_update, + ) else: # No significant changes detected, but update tracking with current state - self.previous_states[server_id] = (current_state, cpu_usage, last_force_update) + self.previous_states[server_id] = ( + current_state, + cpu_usage, + last_force_update, + ) skipped_count += 1 - logger.debug(f"No changes detected for {server_name}, skipping update") - + logger.debug( + f"No changes detected for {server_name}, skipping update" + ) + except discord.NotFound: # Embed message was deleted - clean up our tracking - logger.warning(f"Embed for server {server_id} not found, removing from tracking") + logger.warning( + f"Embed for server {server_id} not found, removing from tracking" + ) self.embed_locations.pop(server_id, None) - self.previous_states.pop(server_id, None) # Also clean up state tracking + self.previous_states.pop( + server_id, None + ) # Also clean up state tracking missing_count += 1 await self.save_embed_locations() except Exception as e: - logger.error(f"Failed to update status for server {server_id}: {str(e)}") + logger.error( + f"Failed to update status for server {server_id}: {str(e)}" + ) error_count += 1 - + # Small delay between servers to avoid rate limits await asyncio.sleep(0.5) - + # Log summary of this update cycle logger.info( f"Update cycle complete: " @@ -1029,19 +1258,19 @@ class PterodactylBot(commands.Bot): f"{missing_count} missing, " f"{error_count} errors" ) - + except Exception as e: logger.error(f"Error in update_status task: {str(e)}") # If something went wrong, wait before retrying await asyncio.sleep(5) - + @update_status.before_loop async def before_update_status(self): """Wait for bot to be ready before starting update task.""" logger.debug("Waiting for bot readiness before starting update task") await self.wait_until_ready() await self.refresh_all_embeds() - + @update_status.after_loop async def after_update_status(self): """Handle update task stopping.""" @@ -1049,7 +1278,7 @@ class PterodactylBot(commands.Bot): logger.info("Server status update task was cancelled") elif self.update_status.failed(): logger.error("Server status update task failed") - + async def close(self): """Cleanup when bot is shutting down.""" logger.info("Bot shutdown initiated - performing cleanup") @@ -1058,6 +1287,7 @@ class PterodactylBot(commands.Bot): await self.pterodactyl_api.close() await 
super().close() + # ============================================== # DISCORD COMMANDS # ============================================== @@ -1067,37 +1297,42 @@ intents.message_content = True bot = PterodactylBot(command_prefix="!", intents=intents) + async def check_allowed_guild(interaction: discord.Interaction) -> bool: """ Verify that an interaction is coming from the allowed guild. - + Args: interaction: Discord interaction object - + Returns: bool: True if interaction is allowed, False otherwise """ if interaction.guild_id != ALLOWED_GUILD_ID: - logger.warning(f"Command attempted from unauthorized guild {interaction.guild_id} by {interaction.user.name}") + logger.warning( + f"Command attempted from unauthorized guild {interaction.guild_id} by {interaction.user.name}" + ) await interaction.response.send_message( - "This bot is only available in a specific server.", - ephemeral=True + "This bot is only available in a specific server.", ephemeral=True ) return False return True -@bot.tree.command(name="server_status", description="Get a list of available game servers to control") + +@bot.tree.command( + name="server_status", description="Get a list of available game servers to control" +) async def server_status(interaction: discord.Interaction): """ Slash command to display server status dashboard with interactive dropdown selection. - + This command provides a comprehensive server management interface by: 1. Fetching current server list from Pterodactyl panel 2. Generating real-time statistics (online/offline counts) 3. Displaying an informational embed with server statistics 4. Presenting an ephemeral dropdown menu with all available servers 5. Handling server selection to create permanent status embeds in the channel - + Workflow: - Validates guild permissions and defers ephemeral response - Refreshes server cache from Pterodactyl API @@ -1107,110 +1342,120 @@ async def server_status(interaction: discord.Interaction): - Handles user selection via ephemeral dropdown interaction - Creates permanent status embed in channel upon selection - Manages embed tracking and cleanup of previous embeds - + Ephemeral Design: - Initial dashboard and dropdown are ephemeral (visible only to user) - Automatically disappears after use or timeout (3 minutes) - No manual cleanup required for dropdown interface - Only final server status embed is posted publicly - + Error Handling: - Handles API failures during server enumeration - Manages missing servers between selection and execution - Provides user-friendly error messages for all failure scenarios - Maintains comprehensive logging for troubleshooting - + Args: interaction: Discord interaction object representing the command invocation - + Returns: None: Sends ephemeral dashboard with dropdown, then public status embed on selection """ # Check if interaction is from allowed guild if not await check_allowed_guild(interaction): return - + logger.info(f"Server status command invoked by {interaction.user.name}") await interaction.response.defer(ephemeral=True) - + try: # Refresh server cache with current data from Pterodactyl panel servers = await bot.pterodactyl_api.get_servers() if not servers: logger.warning("No servers found in Pterodactyl panel") - await interaction.followup.send("No servers found in the Pterodactyl panel.", ephemeral=True) + await interaction.followup.send( + "No servers found in the Pterodactyl panel.", ephemeral=True + ) return - - bot.server_cache = {server['attributes']['identifier']: server for server in servers} + + 
bot.server_cache = { + server["attributes"]["identifier"]: server for server in servers + } logger.debug(f"Refreshed server cache with {len(servers)} servers") - + # Count online/offline servers by checking each server's current state online_count = 0 offline_count = 0 - + # Check status for each server to generate accurate statistics for server_id, server_data in bot.server_cache.items(): resources = await bot.pterodactyl_api.get_server_resources(server_id) - current_state = resources.get('attributes', {}).get('current_state', 'offline') - - if current_state == 'running': + current_state = resources.get("attributes", {}).get( + "current_state", "offline" + ) + + if current_state == "running": online_count += 1 else: offline_count += 1 - + # Create statistics embed with visual server status breakdown stats_embed = discord.Embed( title="đŸ—ī¸ Server Status Dashboard", description="Select a server from the dropdown below to view its detailed status and controls.", color=discord.Color.blue(), - timestamp=datetime.now() + timestamp=datetime.now(), ) - + stats_embed.add_field( name="📊 Server Statistics", value=f"**Total Servers:** {len(servers)}\n" - f"✅ **Online:** {online_count}\n" - f"❌ **Offline:** {offline_count}", - inline=False + f"✅ **Online:** {online_count}\n" + f"❌ **Offline:** {offline_count}", + inline=False, ) - + stats_embed.add_field( name="â„šī¸ How to Use", value="Use the dropdown menu below to select a server. The server's status embed will be posted in this channel.", - inline=False + inline=False, ) - + stats_embed.set_footer(text="Server status will update automatically") - + # Create dropdown menu options from available servers server_options = [] for server_id, server_data in bot.server_cache.items(): - server_name = server_data['attributes']['name'] - server_description = server_data['attributes'].get('description', 'No description') - + server_name = server_data["attributes"]["name"] + server_description = server_data["attributes"].get( + "description", "No description" + ) + # Truncate description if too long for dropdown constraints if len(server_description) > 50: server_description = server_description[:47] + "..." - + server_options.append( discord.SelectOption( - label=server_name, - value=server_id, - description=server_description + label=server_name, value=server_id, description=server_description ) ) - + # Create dropdown view with timeout for automatic cleanup class ServerDropdownView(discord.ui.View): - def __init__(self, server_options, timeout=180): # 3 minute timeout for ephemeral cleanup + def __init__( + self, server_options, timeout=180 + ): # 3 minute timeout for ephemeral cleanup super().__init__(timeout=timeout) self.server_options = server_options self.add_item(ServerDropdown(server_options)) - + async def on_timeout(self): # Clean up when dropdown times out (ephemeral auto-removal) - logger.debug("Server dropdown timed out and was automatically cleaned up") - + logger.debug( + "Server dropdown timed out and was automatically cleaned up" + ) + # Dropdown selection handler for server choice class ServerDropdown(discord.ui.Select): def __init__(self, server_options): @@ -1218,129 +1463,151 @@ async def server_status(interaction: discord.Interaction): placeholder="Select a server to display...", options=server_options, min_values=1, - max_values=1 + max_values=1, ) - + async def callback(self, interaction: discord.Interaction): """ Handle server selection from dropdown menu. Creates permanent status embed in the channel for the selected server. 
""" await interaction.response.defer(ephemeral=True) - + selected_server_id = self.values[0] server_data = bot.server_cache.get(selected_server_id) - + if not server_data: await interaction.followup.send( "❌ Selected server no longer available. Please try again.", - ephemeral=True + ephemeral=True, ) return - - server_name = server_data['attributes']['name'] - logger.info(f"User {interaction.user.name} selected server: {server_name}") - + + server_name = server_data["attributes"]["name"] + logger.info( + f"User {interaction.user.name} selected server: {server_name}" + ) + try: # Get current server status for embed creation - resources = await bot.pterodactyl_api.get_server_resources(selected_server_id) - + resources = await bot.pterodactyl_api.get_server_resources( + selected_server_id + ) + # Delete old embed if it exists to prevent duplication if selected_server_id in bot.embed_locations: - logger.debug(f"Found existing embed for {selected_server_id}, attempting to delete") + logger.debug( + f"Found existing embed for {selected_server_id}, attempting to delete" + ) try: old_location = bot.embed_locations[selected_server_id] - old_channel = bot.get_channel(int(old_location['channel_id'])) + old_channel = bot.get_channel( + int(old_location["channel_id"]) + ) if old_channel: try: - old_message = await old_channel.fetch_message(int(old_location['message_id'])) + old_message = await old_channel.fetch_message( + int(old_location["message_id"]) + ) await old_message.delete() - logger.debug(f"Deleted old embed for {selected_server_id}") + logger.debug( + f"Deleted old embed for {selected_server_id}" + ) except discord.NotFound: - logger.debug(f"Old embed for {selected_server_id} already deleted") + logger.debug( + f"Old embed for {selected_server_id} already deleted" + ) except Exception as e: logger.error(f"Failed to delete old embed: {str(e)}") - + # Create and send new permanent status embed in channel - embed, view = await bot.get_server_status_embed(server_data, resources) + embed, view = await bot.get_server_status_embed( + server_data, resources + ) message = await interaction.channel.send(embed=embed, view=view) await bot.track_new_embed(selected_server_id, message) - + await interaction.followup.send( f"✅ **{server_name}** status has been posted in {interaction.channel.mention}", - ephemeral=True + ephemeral=True, ) logger.info(f"Successfully posted status for {server_name}") - + except Exception as e: - logger.error(f"Failed to create status embed for {server_name}: {str(e)}") + logger.error( + f"Failed to create status embed for {server_name}: {str(e)}" + ) await interaction.followup.send( f"❌ Failed to create status embed for **{server_name}**: {str(e)}", - ephemeral=True + ephemeral=True, ) - + # Send the initial dashboard embed with dropdown (ephemeral - auto-cleaned) await interaction.followup.send( - embed=stats_embed, - view=ServerDropdownView(server_options), - ephemeral=True + embed=stats_embed, view=ServerDropdownView(server_options), ephemeral=True ) - logger.info(f"Sent server status dashboard to {interaction.user.name} with {len(server_options)} servers") - + logger.info( + f"Sent server status dashboard to {interaction.user.name} with {len(server_options)} servers" + ) + except Exception as e: logger.error(f"Server status command failed: {str(e)}") await interaction.followup.send( - f"❌ Failed to load server status: {str(e)}", - ephemeral=True + f"❌ Failed to load server status: {str(e)}", ephemeral=True ) -@bot.tree.command(name="refresh_embeds", description="Refresh 
all server status embeds (admin only)") + +@bot.tree.command( + name="refresh_embeds", description="Refresh all server status embeds (admin only)" +) async def refresh_embeds(interaction: discord.Interaction): """Slash command to refresh all server embeds.""" if not await check_allowed_guild(interaction): return - + logger.info(f"Refresh embeds command invoked by {interaction.user.name}") await interaction.response.defer(ephemeral=True) - + # Require administrator permissions if not interaction.user.guild_permissions.administrator: logger.warning(f"Unauthorized refresh attempt by {interaction.user.name}") await interaction.followup.send( - "You need administrator permissions to refresh all embeds.", - ephemeral=True + "You need administrator permissions to refresh all embeds.", ephemeral=True ) return - + try: logger.info("Starting full embed refresh per admin request") deleted, created = await bot.refresh_all_embeds() await interaction.followup.send( f"Refreshed all embeds. Deleted {deleted} old embeds, created {created} new ones.", - ephemeral=True + ephemeral=True, ) logger.info(f"Embed refresh completed: {deleted} deleted, {created} created") except Exception as e: logger.error(f"Embed refresh failed: {str(e)}") await interaction.followup.send( - f"Failed to refresh embeds: {str(e)}", - ephemeral=True + f"Failed to refresh embeds: {str(e)}", ephemeral=True ) -@bot.tree.command(name="purge_embeds", description="Permanently delete all server status embeds (admin only)") + +@bot.tree.command( + name="purge_embeds", + description="Permanently delete all server status embeds (admin only)", +) async def purge_embeds(interaction: discord.Interaction): """ Slash command to permanently purge all server status embeds from Discord channels. - + This command performs a complete cleanup of all tracked server status embeds by: 1. Iterating through all tracked embed locations in embed_locations.json 2. Attempting to delete each embed message from its respective Discord channel 3. Clearing the embed tracking file and internal state tracking 4. 
Providing real-time progress updates during the operation - + Args: interaction: Discord interaction object representing the command invocation - + Workflow: - Validates administrator permissions - Checks if any embeds are currently tracked @@ -1349,170 +1616,205 @@ async def purge_embeds(interaction: discord.Interaction): - Updates progress embed in real-time - Saves cleared tracking data to disk - Sends final results with comprehensive statistics - + Safety: - Only affects tracked embeds (won't delete arbitrary messages) - Maintains logs for audit purposes - Provides rollback protection through immediate tracking removal - Includes rate limiting to avoid Discord API limits - + Returns: None: Sends follow-up messages with operation results """ if not await check_allowed_guild(interaction): return - + logger.info(f"Purge embeds command invoked by {interaction.user.name}") await interaction.response.defer(ephemeral=True) - + # Require administrator permissions if not interaction.user.guild_permissions.administrator: logger.warning(f"Unauthorized purge attempt by {interaction.user.name}") await interaction.followup.send( - "You need administrator permissions to purge all embeds.", - ephemeral=True + "You need administrator permissions to purge all embeds.", ephemeral=True ) return - + try: logger.info("Starting embed purge per admin request") - + # Variables to track purge statistics deleted_count = 0 not_found_count = 0 error_count = 0 total_embeds = len(bot.embed_locations) - + if total_embeds == 0: await interaction.followup.send( "No embeds are currently being tracked. Nothing to purge.", - ephemeral=True + ephemeral=True, ) return - + # Create progress embed progress_embed = discord.Embed( title="🔄 Purging Server Embeds", description=f"Processing {total_embeds} embeds...", - color=discord.Color.orange() + color=discord.Color.orange(), ) progress_embed.add_field(name="Deleted", value="0", inline=True) progress_embed.add_field(name="Not Found", value="0", inline=True) progress_embed.add_field(name="Errors", value="0", inline=True) progress_embed.set_footer(text="This may take a while...") - - progress_message = await interaction.followup.send(embed=progress_embed, ephemeral=True) - + + progress_message = await interaction.followup.send( + embed=progress_embed, ephemeral=True + ) + # Process each tracked embed for server_id, location in list(bot.embed_locations.items()): try: - channel = bot.get_channel(int(location['channel_id'])) + channel = bot.get_channel(int(location["channel_id"])) if channel: try: - message = await channel.fetch_message(int(location['message_id'])) + message = await channel.fetch_message( + int(location["message_id"]) + ) await message.delete() deleted_count += 1 - logger.debug(f"Successfully purged embed for server {server_id}") + logger.debug( + f"Successfully purged embed for server {server_id}" + ) except discord.NotFound: not_found_count += 1 logger.debug(f"Embed for server {server_id} already deleted") except discord.Forbidden: error_count += 1 - logger.error(f"Permission denied when deleting embed for server {server_id}") + logger.error( + f"Permission denied when deleting embed for server {server_id}" + ) except Exception as e: error_count += 1 - logger.error(f"Error deleting embed for server {server_id}: {str(e)}") + logger.error( + f"Error deleting embed for server {server_id}: {str(e)}" + ) else: not_found_count += 1 logger.warning(f"Channel not found for server {server_id}") - + # Remove from tracking immediately bot.embed_locations.pop(server_id, None) 
bot.previous_states.pop(server_id, None)  # Also clean up state tracking
-
+
                 # Update progress every 5 embeds or for the last one
-                if (deleted_count + not_found_count + error_count) % 5 == 0 or \
-                   (deleted_count + not_found_count + error_count) == total_embeds:
-
+                if (deleted_count + not_found_count + error_count) % 5 == 0 or (
+                    deleted_count + not_found_count + error_count
+                ) == total_embeds:
+
                     progress_embed.description = f"Processed {deleted_count + not_found_count + error_count}/{total_embeds} embeds"
-                    progress_embed.set_field_at(0, name="Deleted", value=str(deleted_count), inline=True)
-                    progress_embed.set_field_at(1, name="Not Found", value=str(not_found_count), inline=True)
-                    progress_embed.set_field_at(2, name="Errors", value=str(error_count), inline=True)
-
+                    progress_embed.set_field_at(
+                        0, name="Deleted", value=str(deleted_count), inline=True
+                    )
+                    progress_embed.set_field_at(
+                        1, name="Not Found", value=str(not_found_count), inline=True
+                    )
+                    progress_embed.set_field_at(
+                        2, name="Errors", value=str(error_count), inline=True
+                    )
+
                     await progress_message.edit(embed=progress_embed)
-
+
                 # Small delay to avoid rate limits
                 await asyncio.sleep(0.3)
-
+
             except Exception as e:
                 error_count += 1
-                logger.error(f"Unexpected error processing server {server_id}: {str(e)}")
-
+                logger.error(
+                    f"Unexpected error processing server {server_id}: {str(e)}"
+                )
+
         # Save the cleared embed locations
         await bot.save_embed_locations()
-
+
         # Create results embed
         result_embed = discord.Embed(
             title="✅ Embed Purge Complete",
             color=discord.Color.green(),
-            timestamp=datetime.now()
+            timestamp=datetime.now(),
+        )
+        result_embed.add_field(
+            name="Total Tracked", value=str(total_embeds), inline=True
+        )
+        result_embed.add_field(
+            name="✅ Successfully Deleted", value=str(deleted_count), inline=True
+        )
+        result_embed.add_field(
+            name="❌ Already Missing", value=str(not_found_count), inline=True
         )
-        result_embed.add_field(name="Total Tracked", value=str(total_embeds), inline=True)
-        result_embed.add_field(name="✅ Successfully Deleted", value=str(deleted_count), inline=True)
-        result_embed.add_field(name="❌ Already Missing", value=str(not_found_count), inline=True)
         result_embed.add_field(name="⚠ī¸ Errors", value=str(error_count), inline=True)
-        result_embed.add_field(name="📊 Success Rate",
-                               value=f"{((deleted_count + not_found_count) / total_embeds * 100):.1f}%",
-                               inline=True)
+        result_embed.add_field(
+            name="📊 Success Rate",
+            value=f"{((deleted_count + not_found_count) / total_embeds * 100):.1f}%",
+            inline=True,
+        )
        result_embed.set_footer(text="Embed tracking file has been cleared")
-
+
         await progress_message.edit(embed=result_embed)
-        logger.info(f"Embed purge completed: {deleted_count} deleted, {not_found_count} not found, {error_count} errors")
-
+        logger.info(
+            f"Embed purge completed: {deleted_count} deleted, {not_found_count} not found, {error_count} errors"
+        )
+
     except Exception as e:
         logger.error(f"Embed purge failed: {str(e)}")
         await interaction.followup.send(
-            f"❌ Failed to purge embeds: {str(e)}",
-            ephemeral=True
+            f"❌ Failed to purge embeds: {str(e)}", ephemeral=True
         )
+
 # ==============================================
 # BOT EVENTS
 # ==============================================
+
 @bot.event
 async def on_interaction(interaction: discord.Interaction):
     """Global interaction handler to check guild before processing any interaction."""
     if interaction.guild_id != ALLOWED_GUILD_ID:
-        logger.debug(f"Ignoring interaction from unauthorized guild {interaction.guild_id}")
+
logger.debug( + f"Ignoring interaction from unauthorized guild {interaction.guild_id}" + ) await interaction.response.send_message( - "This bot is only available in a specific server.", - ephemeral=True + "This bot is only available in a specific server.", ephemeral=True ) return + @bot.event async def on_ready(): """Called when the bot successfully connects to Discord.""" logger.info(f"Bot connected as {bot.user.name} (ID: {bot.user.id})") - + try: # Sync commands only to the allowed guild guild = discord.Object(id=ALLOWED_GUILD_ID) bot.tree.copy_global_to(guild=guild) synced = await bot.tree.sync(guild=guild) - logger.info(f"Successfully synced {len(synced)} command(s) to guild {ALLOWED_GUILD_ID}: {[cmd.name for cmd in synced]}") + logger.info( + f"Successfully synced {len(synced)} command(s) to guild {ALLOWED_GUILD_ID}: {[cmd.name for cmd in synced]}" + ) except Exception as e: logger.error(f"Command sync failed: {str(e)}") + # ============================================== # SYSTEM SIGNAL HANDLERS # ============================================== + def handle_sigint(signum: int, frame: types.FrameType) -> None: """ Handle SIGINT signals (Ctrl+C) by initiating graceful shutdown. - + Args: signum: The signal number (signal.SIGINT) frame: Current stack frame (unused but required by signal handler signature) @@ -1520,10 +1822,11 @@ def handle_sigint(signum: int, frame: types.FrameType) -> None: logger.info("Received SIGINT (Ctrl+C), initiating graceful shutdown...") raise KeyboardInterrupt + def handle_sigterm(signum: int, frame: types.FrameType) -> None: """ Handle SIGTERM signals (container stop) by initiating graceful shutdown. - + Args: signum: The signal number (signal.SIGTERM) frame: Current stack frame (unused but required by signal handler signature) @@ -1531,6 +1834,7 @@ def handle_sigterm(signum: int, frame: types.FrameType) -> None: logger.info("Received SIGTERM (container stop), initiating graceful shutdown...") raise KeyboardInterrupt + # ============================================== # BOT STARTUP # ============================================== @@ -1538,13 +1842,13 @@ def handle_sigterm(signum: int, frame: types.FrameType) -> None: if __name__ == "__main__": """ Main entry point for the bot application. - + Handles: - Signal registration for graceful shutdowns (SIGINT/SIGTERM) - Primary bot execution loop - Error handling and crash reporting - Resource cleanup on shutdown - + Flow: 1. Initialize signal handlers 2. Start bot with Discord token @@ -1554,7 +1858,7 @@ if __name__ == "__main__": logger.info("Starting bot initialization") # Register signal handlers - signal.signal(signal.SIGINT, handle_sigint) # For Ctrl+C + signal.signal(signal.SIGINT, handle_sigint) # For Ctrl+C signal.signal(signal.SIGTERM, handle_sigterm) # For container stop commands logger.info("System signal handlers registered") @@ -1567,4 +1871,4 @@ if __name__ == "__main__": sys.exit(1) # Exit with error code for crash finally: logger.info("Bot shutdown complete") - sys.exit(0) # Explicit clean exit \ No newline at end of file + sys.exit(0) # Explicit clean exit diff --git a/server_metrics_graphs.py b/server_metrics_graphs.py index d83cafa..532f87b 100644 --- a/server_metrics_graphs.py +++ b/server_metrics_graphs.py @@ -6,23 +6,25 @@ Generates line graphs as PNG images for embedding in Discord messages. 
""" import matplotlib -matplotlib.use('Agg') # Use non-interactive backend for server environments import matplotlib.pyplot as plt import matplotlib.dates as mdates from collections import deque -from datetime import datetime, timedelta -from typing import Dict, Tuple, Optional +from datetime import datetime +from typing import Dict, Optional import io import logging import math +matplotlib.use("Agg") # Use non-interactive backend for server environments + # Get the logger from the main bot module -logger = logging.getLogger('pterodisbot') +logger = logging.getLogger("pterodisbot") + class ServerMetricsGraphs: """ Manages CPU and memory usage graphs for individual servers. - + Features: - Stores last 6 data points (1 minute of history at 10-second intervals) - Generates PNG images of line graphs for Discord embedding @@ -31,31 +33,35 @@ class ServerMetricsGraphs: - Dynamic CPU scaling in 100% increments for multi-vCPU servers - Clean graph styling optimized for Discord dark theme """ - + def __init__(self, server_id: str, server_name: str): """ Initialize metrics tracking for a server. - + Args: server_id: Pterodactyl server identifier server_name: Human-readable server name """ self.server_id = server_id self.server_name = server_name - + # Use deque with maxlen=6 for automatic FIFO rotation # Each entry is a tuple: (timestamp, cpu_percent, memory_mb) self.data_points = deque(maxlen=6) - + # Track if we have enough data for meaningful graphs (at least 2 points) self.has_sufficient_data = False - - logger.debug(f"Initialized metrics tracking for server {server_name} ({server_id})") - - def add_data_point(self, cpu_percent: float, memory_mb: float, timestamp: Optional[datetime] = None): + + logger.debug( + f"Initialized metrics tracking for server {server_name} ({server_id})" + ) + + def add_data_point( + self, cpu_percent: float, memory_mb: float, timestamp: Optional[datetime] = None + ): """ Add a new data point to the metrics history. - + Args: cpu_percent: Current CPU usage percentage memory_mb: Current memory usage in megabytes @@ -63,351 +69,430 @@ class ServerMetricsGraphs: """ if timestamp is None: timestamp = datetime.now() - + # Add new data point (automatically rotates old data due to maxlen=6) self.data_points.append((timestamp, cpu_percent, memory_mb)) - + # Update sufficient data flag self.has_sufficient_data = len(self.data_points) >= 2 - - logger.debug(f"Added metrics data point for {self.server_name}: CPU={cpu_percent}%, Memory={memory_mb}MB") - + + logger.debug( + f"Added metrics data point for {self.server_name}: CPU={cpu_percent}%, Memory={memory_mb}MB" + ) + def _calculate_cpu_scale_limit(self, max_cpu_value: float) -> int: """ Calculate appropriate CPU scale limit in 100% increments. - + Args: max_cpu_value: Maximum CPU value in the dataset - + Returns: Scale limit rounded up to nearest 100% increment """ if max_cpu_value <= 100: return 100 - + # Round up to nearest 100% increment # e.g., 150% -> 200%, 250% -> 300%, 350% -> 400% return math.ceil(max_cpu_value / 100) * 100 - + def generate_cpu_graph(self) -> Optional[io.BytesIO]: """ Generate a CPU usage line graph as a PNG image. 
-
+
         Returns:
             BytesIO object containing PNG image data, or None if insufficient data
         """
         if not self.has_sufficient_data:
-            logger.debug(f"Insufficient data for CPU graph generation: {self.server_name}")
+            logger.debug(
+                f"Insufficient data for CPU graph generation: {self.server_name}"
+            )
             return None
-
+
         try:
             # Extract timestamps and CPU data
             timestamps = [point[0] for point in self.data_points]
             cpu_values = [point[1] for point in self.data_points]
-
+
             # Calculate dynamic CPU scale limit
             max_cpu = max(cpu_values)
             cpu_scale_limit = self._calculate_cpu_scale_limit(max_cpu)
-
+
             # Create figure with dark theme styling
-            plt.style.use('dark_background')
+            plt.style.use("dark_background")
             fig, ax = plt.subplots(figsize=(8, 4), dpi=100)
-            fig.patch.set_facecolor('#2f3136')  # Discord dark theme background
-            ax.set_facecolor('#36393f')  # Slightly lighter for graph area
-
+            fig.patch.set_facecolor("#2f3136")  # Discord dark theme background
+            ax.set_facecolor("#36393f")  # Slightly lighter for graph area
+
             # Plot CPU line with gradient fill
-            line = ax.plot(timestamps, cpu_values, color='#7289da', linewidth=2.5, marker='o', markersize=4)
-            ax.fill_between(timestamps, cpu_values, alpha=0.3, color='#7289da')
-
+            ax.plot(
+                timestamps,
+                cpu_values,
+                color="#7289da",
+                linewidth=2.5,
+                marker="o",
+                markersize=4,
+            )
+            ax.fill_between(timestamps, cpu_values, alpha=0.3, color="#7289da")
+
             # Customize axes with dynamic scaling
-            ax.set_ylabel('CPU Usage (%)', color='#ffffff', fontsize=10)
+            ax.set_ylabel("CPU Usage (%)", color="#ffffff", fontsize=10)
             ax.set_ylim(0, cpu_scale_limit)
-
+
             # Add horizontal grid lines at 100% increments for better readability
             for i in range(100, cpu_scale_limit + 1, 100):
-                ax.axhline(y=i, color='#ffffff', alpha=0.2, linestyle='--', linewidth=0.8)
-
+                ax.axhline(
+                    y=i, color="#ffffff", alpha=0.2, linestyle="--", linewidth=0.8
+                )
+
             # Format time axis
-            ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))
+            ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M:%S"))
             ax.xaxis.set_major_locator(mdates.SecondLocator(interval=20))
-            plt.setp(ax.xaxis.get_majorticklabels(), rotation=45, ha='right', color='#ffffff', fontsize=8)
-
+            plt.setp(
+                ax.xaxis.get_majorticklabels(),
+                rotation=45,
+                ha="right",
+                color="#ffffff",
+                fontsize=8,
+            )
+
             # Style the graph
-            ax.tick_params(colors='#ffffff', labelsize=8)
-            ax.grid(True, alpha=0.3, color='#ffffff')
-            ax.spines['bottom'].set_color('#ffffff')
-            ax.spines['left'].set_color('#ffffff')
-            ax.spines['top'].set_visible(False)
-            ax.spines['right'].set_visible(False)
-
+            ax.tick_params(colors="#ffffff", labelsize=8)
+            ax.grid(True, alpha=0.3, color="#ffffff")
+            ax.spines["bottom"].set_color("#ffffff")
+            ax.spines["left"].set_color("#ffffff")
+            ax.spines["top"].set_visible(False)
+            ax.spines["right"].set_visible(False)
+
             # Add title with scale info for multi-vCPU servers
-            title = f'{self.server_name} - CPU Usage'
+            title = f"{self.server_name} - CPU Usage"
             if cpu_scale_limit > 100:
                 estimated_vcpus = cpu_scale_limit // 100
-                title += f' (~{estimated_vcpus} vCPU cores)'
-            ax.set_title(title, color='#ffffff', fontsize=12, pad=20)
-
+                title += f" (~{estimated_vcpus} vCPU cores)"
+            ax.set_title(title, color="#ffffff", fontsize=12, pad=20)
+
             # Tight layout to prevent label cutoff
             plt.tight_layout()
-
+
             # Save to BytesIO
             img_buffer = io.BytesIO()
-            plt.savefig(img_buffer, format='png', facecolor='#2f3136', edgecolor='none',
-                        bbox_inches='tight', dpi=100)
+            plt.savefig(
+                img_buffer,
+                format="png",
+                facecolor="#2f3136",
+                edgecolor="none",
+                bbox_inches="tight",
+                dpi=100,
+            )
             img_buffer.seek(0)
-
+
             # Clean up matplotlib resources
plt.close(fig)
-
-            logger.debug(f"Generated CPU graph for {self.server_name} (scale: 0-{cpu_scale_limit}%)")
+
+            logger.debug(
+                f"Generated CPU graph for {self.server_name} (scale: 0-{cpu_scale_limit}%)"
+            )
             return img_buffer
-
+
         except Exception as e:
-            logger.error(f"Failed to generate CPU graph for {self.server_name}: {str(e)}")
-            plt.close('all')  # Clean up any remaining figures
+            logger.error(
+                f"Failed to generate CPU graph for {self.server_name}: {str(e)}"
+            )
+            plt.close("all")  # Clean up any remaining figures
             return None
-
+
     def generate_memory_graph(self) -> Optional[io.BytesIO]:
         """
         Generate a memory usage line graph as a PNG image.
-
+
         Returns:
             BytesIO object containing PNG image data, or None if insufficient data
         """
         if not self.has_sufficient_data:
-            logger.debug(f"Insufficient data for memory graph generation: {self.server_name}")
+            logger.debug(
+                f"Insufficient data for memory graph generation: {self.server_name}"
+            )
             return None
-
+
         try:
             # Extract timestamps and memory data
             timestamps = [point[0] for point in self.data_points]
             memory_values = [point[2] for point in self.data_points]
-
+
             # Create figure with dark theme styling
-            plt.style.use('dark_background')
+            plt.style.use("dark_background")
             fig, ax = plt.subplots(figsize=(8, 4), dpi=100)
-            fig.patch.set_facecolor('#2f3136')  # Discord dark theme background
-            ax.set_facecolor('#36393f')  # Slightly lighter for graph area
-
+            fig.patch.set_facecolor("#2f3136")  # Discord dark theme background
+            ax.set_facecolor("#36393f")  # Slightly lighter for graph area
+
             # Plot memory line with gradient fill
-            line = ax.plot(timestamps, memory_values, color='#43b581', linewidth=2.5, marker='o', markersize=4)
-            ax.fill_between(timestamps, memory_values, alpha=0.3, color='#43b581')
-
+            ax.plot(
+                timestamps,
+                memory_values,
+                color="#43b581",
+                linewidth=2.5,
+                marker="o",
+                markersize=4,
+            )
+            ax.fill_between(timestamps, memory_values, alpha=0.3, color="#43b581")
+
             # Customize axes
-            ax.set_ylabel('Memory Usage (MB)', color='#ffffff', fontsize=10)
+            ax.set_ylabel("Memory Usage (MB)", color="#ffffff", fontsize=10)
             ax.set_ylim(0, max(memory_values) * 1.1)  # Dynamic scaling with 10% padding
-
+
             # Format time axis
-            ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))
+            ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M:%S"))
             ax.xaxis.set_major_locator(mdates.SecondLocator(interval=20))
-            plt.setp(ax.xaxis.get_majorticklabels(), rotation=45, ha='right', color='#ffffff', fontsize=8)
-
+            plt.setp(
+                ax.xaxis.get_majorticklabels(),
+                rotation=45,
+                ha="right",
+                color="#ffffff",
+                fontsize=8,
+            )
+
             # Style the graph
-            ax.tick_params(colors='#ffffff', labelsize=8)
-            ax.grid(True, alpha=0.3, color='#ffffff')
-            ax.spines['bottom'].set_color('#ffffff')
-            ax.spines['left'].set_color('#ffffff')
-            ax.spines['top'].set_visible(False)
-            ax.spines['right'].set_visible(False)
-
+            ax.tick_params(colors="#ffffff", labelsize=8)
+            ax.grid(True, alpha=0.3, color="#ffffff")
+            ax.spines["bottom"].set_color("#ffffff")
+            ax.spines["left"].set_color("#ffffff")
+            ax.spines["top"].set_visible(False)
+            ax.spines["right"].set_visible(False)
+
             # Add title
-            ax.set_title(f'{self.server_name} - Memory Usage', color='#ffffff', fontsize=12, pad=20)
-
+            ax.set_title(
+                f"{self.server_name} - Memory Usage",
+                color="#ffffff",
+                fontsize=12,
+                pad=20,
+            )
+
             # Tight layout to prevent label cutoff
             plt.tight_layout()
-
+
             # Save to BytesIO
             img_buffer = io.BytesIO()
-            plt.savefig(img_buffer, format='png', facecolor='#2f3136', edgecolor='none',
-                        bbox_inches='tight', dpi=100)
+            plt.savefig(
+                img_buffer,
+                format="png",
+                facecolor="#2f3136",
+                edgecolor="none",
+
bbox_inches="tight", + dpi=100, + ) img_buffer.seek(0) - + # Clean up matplotlib resources plt.close(fig) - + logger.debug(f"Generated memory graph for {self.server_name}") return img_buffer - + except Exception as e: - logger.error(f"Failed to generate memory graph for {self.server_name}: {str(e)}") - plt.close('all') # Clean up any remaining figures + logger.error( + f"Failed to generate memory graph for {self.server_name}: {str(e)}" + ) + plt.close("all") # Clean up any remaining figures return None def generate_combined_graph(self) -> Optional[io.BytesIO]: """ Generate a combined CPU and memory usage graph as a PNG image. - + Returns: BytesIO object containing PNG image data, or None if insufficient data """ if not self.has_sufficient_data: - logger.debug(f"Insufficient data for combined graph generation: {self.server_name}") + logger.debug( + f"Insufficient data for combined graph generation: {self.server_name}" + ) return None - + try: # Extract data timestamps = [point[0] for point in self.data_points] cpu_values = [point[1] for point in self.data_points] memory_values = [point[2] for point in self.data_points] - + # Calculate dynamic CPU scale limit max_cpu = max(cpu_values) cpu_scale_limit = self._calculate_cpu_scale_limit(max_cpu) - + # Create figure with two subplots - plt.style.use('dark_background') + plt.style.use("dark_background") fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(8, 6), dpi=100, sharex=True) - fig.patch.set_facecolor('#2f3136') - + fig.patch.set_facecolor("#2f3136") + # CPU subplot - ax1.set_facecolor('#36393f') - ax1.plot(timestamps, cpu_values, color='#7289da', linewidth=2.5, marker='o', markersize=4) - ax1.fill_between(timestamps, cpu_values, alpha=0.3, color='#7289da') - ax1.set_ylabel('CPU Usage (%)', color='#ffffff', fontsize=10) + ax1.set_facecolor("#36393f") + ax1.plot( + timestamps, + cpu_values, + color="#7289da", + linewidth=2.5, + marker="o", + markersize=4, + ) + ax1.fill_between(timestamps, cpu_values, alpha=0.3, color="#7289da") + ax1.set_ylabel("CPU Usage (%)", color="#ffffff", fontsize=10) ax1.set_ylim(0, cpu_scale_limit) - ax1.tick_params(colors='#ffffff', labelsize=8) - ax1.grid(True, alpha=0.3, color='#ffffff') - + ax1.tick_params(colors="#ffffff", labelsize=8) + ax1.grid(True, alpha=0.3, color="#ffffff") + # Add horizontal grid lines at 100% increments for CPU subplot for i in range(100, cpu_scale_limit + 1, 100): - ax1.axhline(y=i, color='#ffffff', alpha=0.2, linestyle='--', linewidth=0.8) - + ax1.axhline( + y=i, color="#ffffff", alpha=0.2, linestyle="--", linewidth=0.8 + ) + # Title with vCPU info if applicable - title = f'{self.server_name} - Resource Usage' + title = f"{self.server_name} - Resource Usage" if cpu_scale_limit > 100: estimated_vcpus = cpu_scale_limit // 100 - title += f' (~{estimated_vcpus} vCPU cores)' - ax1.set_title(title, color='#ffffff', fontsize=12) - + title += f" (~{estimated_vcpus} vCPU cores)" + ax1.set_title(title, color="#ffffff", fontsize=12) + # Memory subplot - ax2.set_facecolor('#36393f') - ax2.plot(timestamps, memory_values, color='#43b581', linewidth=2.5, marker='o', markersize=4) - ax2.fill_between(timestamps, memory_values, alpha=0.3, color='#43b581') - ax2.set_ylabel('Memory (MB)', color='#ffffff', fontsize=10) + ax2.set_facecolor("#36393f") + ax2.plot( + timestamps, + memory_values, + color="#43b581", + linewidth=2.5, + marker="o", + markersize=4, + ) + ax2.fill_between(timestamps, memory_values, alpha=0.3, color="#43b581") + ax2.set_ylabel("Memory (MB)", color="#ffffff", fontsize=10) ax2.set_ylim(0, 
max(memory_values) * 1.1) - ax2.tick_params(colors='#ffffff', labelsize=8) - ax2.grid(True, alpha=0.3, color='#ffffff') - + ax2.tick_params(colors="#ffffff", labelsize=8) + ax2.grid(True, alpha=0.3, color="#ffffff") + # Format time axis (only on bottom subplot) - ax2.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S')) + ax2.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M:%S")) ax2.xaxis.set_major_locator(mdates.SecondLocator(interval=20)) - plt.setp(ax2.xaxis.get_majorticklabels(), rotation=45, ha='right', color='#ffffff', fontsize=8) - + plt.setp( + ax2.xaxis.get_majorticklabels(), + rotation=45, + ha="right", + color="#ffffff", + fontsize=8, + ) + # Style both subplots for ax in [ax1, ax2]: - ax.spines['bottom'].set_color('#ffffff') - ax.spines['left'].set_color('#ffffff') - ax.spines['top'].set_visible(False) - ax.spines['right'].set_visible(False) - + ax.spines["bottom"].set_color("#ffffff") + ax.spines["left"].set_color("#ffffff") + ax.spines["top"].set_visible(False) + ax.spines["right"].set_visible(False) + plt.tight_layout() - + # Save to BytesIO img_buffer = io.BytesIO() - plt.savefig(img_buffer, format='png', facecolor='#2f3136', edgecolor='none', - bbox_inches='tight', dpi=100) + plt.savefig( + img_buffer, + format="png", + facecolor="#2f3136", + edgecolor="none", + bbox_inches="tight", + dpi=100, + ) img_buffer.seek(0) - + plt.close(fig) - - logger.debug(f"Generated combined graph for {self.server_name} (CPU scale: 0-{cpu_scale_limit}%)") + + logger.debug( + f"Generated combined graph for {self.server_name} (CPU scale: 0-{cpu_scale_limit}%)" + ) return img_buffer - + except Exception as e: - logger.error(f"Failed to generate combined graph for {self.server_name}: {str(e)}") - plt.close('all') + logger.error( + f"Failed to generate combined graph for {self.server_name}: {str(e)}" + ) + plt.close("all") return None - + def get_data_summary(self) -> Dict[str, any]: """ Get summary statistics for the current data points. 
- + Returns: Dictionary containing data point count, latest values, and trends """ if not self.data_points: return { - 'point_count': 0, - 'has_data': False, - 'latest_cpu': 0, - 'latest_memory': 0 + "point_count": 0, + "has_data": False, + "latest_cpu": 0, + "latest_memory": 0, } - + # Get latest values latest_point = self.data_points[-1] latest_cpu = latest_point[1] latest_memory = latest_point[2] - + # Calculate CPU scale info max_cpu = max(point[1] for point in self.data_points) cpu_scale_limit = self._calculate_cpu_scale_limit(max_cpu) estimated_vcpus = cpu_scale_limit // 100 - + # Calculate trends if we have multiple points - cpu_trend = 'stable' - memory_trend = 'stable' - + cpu_trend = "stable" + memory_trend = "stable" + if len(self.data_points) >= 2: first_point = self.data_points[0] cpu_change = latest_cpu - first_point[1] memory_change = latest_memory - first_point[2] - + # Determine trends (>5% change considered significant) if abs(cpu_change) > 5: - cpu_trend = 'increasing' if cpu_change > 0 else 'decreasing' - + cpu_trend = "increasing" if cpu_change > 0 else "decreasing" + if abs(memory_change) > 50: # 50MB change threshold - memory_trend = 'increasing' if memory_change > 0 else 'decreasing' - + memory_trend = "increasing" if memory_change > 0 else "decreasing" + return { - 'point_count': len(self.data_points), - 'has_data': self.has_sufficient_data, - 'latest_cpu': latest_cpu, - 'latest_memory': latest_memory, - 'cpu_trend': cpu_trend, - 'memory_trend': memory_trend, - 'cpu_scale_limit': cpu_scale_limit, - 'estimated_vcpus': estimated_vcpus, - 'time_span_minutes': len(self.data_points) * 10 / 60 # Convert to minutes + "point_count": len(self.data_points), + "has_data": self.has_sufficient_data, + "latest_cpu": latest_cpu, + "latest_memory": latest_memory, + "cpu_trend": cpu_trend, + "memory_trend": memory_trend, + "cpu_scale_limit": cpu_scale_limit, + "estimated_vcpus": estimated_vcpus, + "time_span_minutes": len(self.data_points) * 10 / 60, # Convert to minutes } class ServerMetricsManager: """ Global manager for all server metrics graphs. - + Handles: - Creation and cleanup of ServerMetricsGraphs instances - Bulk operations across all tracked servers - Memory management for graph storage """ - + def __init__(self): """Initialize the metrics manager.""" self.server_graphs: Dict[str, ServerMetricsGraphs] = {} logger.info("Initialized ServerMetricsManager") - - def get_or_create_server_graphs(self, server_id: str, server_name: str) -> ServerMetricsGraphs: + + def get_or_create_server_graphs( + self, server_id: str, server_name: str + ) -> ServerMetricsGraphs: """ Get existing ServerMetricsGraphs instance or create a new one. - + Args: server_id: Pterodactyl server identifier server_name: Human-readable server name - + Returns: ServerMetricsGraphs instance for the specified server """ if server_id not in self.server_graphs: self.server_graphs[server_id] = ServerMetricsGraphs(server_id, server_name) logger.debug(f"Created new metrics graphs for server {server_name}") - + return self.server_graphs[server_id] - - def add_server_data(self, server_id: str, server_name: str, cpu_percent: float, memory_mb: float): + + def add_server_data( + self, server_id: str, server_name: str, cpu_percent: float, memory_mb: float + ): """ Add data point to a server's metrics tracking. 
- + Args: server_id: Pterodactyl server identifier server_name: Human-readable server name @@ -416,34 +501,34 @@ class ServerMetricsManager: """ graphs = self.get_or_create_server_graphs(server_id, server_name) graphs.add_data_point(cpu_percent, memory_mb) - + def remove_server(self, server_id: str): """ Remove a server from metrics tracking. - + Args: server_id: Pterodactyl server identifier to remove """ if server_id in self.server_graphs: del self.server_graphs[server_id] logger.debug(f"Removed metrics tracking for server {server_id}") - + def get_server_graphs(self, server_id: str) -> Optional[ServerMetricsGraphs]: """ Get ServerMetricsGraphs instance for a specific server. - + Args: server_id: Pterodactyl server identifier - + Returns: ServerMetricsGraphs instance or None if not found """ return self.server_graphs.get(server_id) - + def cleanup_old_servers(self, active_server_ids: list): """ Remove tracking for servers that no longer exist. - + Args: active_server_ids: List of currently active server IDs """ @@ -451,22 +536,30 @@ class ServerMetricsManager: for server_id in self.server_graphs: if server_id not in active_server_ids: servers_to_remove.append(server_id) - + for server_id in servers_to_remove: self.remove_server(server_id) - + if servers_to_remove: - logger.info(f"Cleaned up metrics for {len(servers_to_remove)} inactive servers") - + logger.info( + f"Cleaned up metrics for {len(servers_to_remove)} inactive servers" + ) + def get_summary(self) -> Dict[str, any]: """ Get summary of all tracked servers. - + Returns: Dictionary with tracking statistics """ return { - 'total_servers': len(self.server_graphs), - 'servers_with_data': sum(1 for graphs in self.server_graphs.values() if graphs.has_sufficient_data), - 'total_data_points': sum(len(graphs.data_points) for graphs in self.server_graphs.values()) - } \ No newline at end of file + "total_servers": len(self.server_graphs), + "servers_with_data": sum( + 1 + for graphs in self.server_graphs.values() + if graphs.has_sufficient_data + ), + "total_data_points": sum( + len(graphs.data_points) for graphs in self.server_graphs.values() + ), + }