""" Pterodactyl Discord Bot A comprehensive Discord bot for managing Pterodactyl game servers through Discord. Features: - Server status monitoring with auto-updating embeds - Power controls (start/stop/restart) - Server address/port display - Multi-channel embed support - Automatic embed refresh system - Role-based access control - Single-guild restriction - Extensive logging for all operations """ import discord from discord.ext import commands, tasks from discord import app_commands import os import sys import signal import types import aiohttp import asyncio import json import traceback import logging from logging.handlers import RotatingFileHandler import configparser from datetime import datetime from typing import Dict, List, Optional, Tuple from pathlib import Path import generate_config # ============================================== # LOGGING SETUP # ============================================== logs_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs') os.makedirs(logs_dir, exist_ok=True) logger = logging.getLogger('pterodisbot') logger.setLevel(logging.DEBUG) # File handler for logs (rotates when reaching 5MB, keeps 3 backups) handler = RotatingFileHandler( filename=os.path.join(logs_dir, 'pterodisbot.log'), maxBytes=5*1024*1024, # 5 MiB max log file size backupCount=3, # Rotate through 3 files encoding='utf-8' ) handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) logger.addHandler(handler) # Console handler for real-time output console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) logger.addHandler(console_handler) logger.info("Initialized logging system with file and console output") # ============================================== # CONFIGURATION SETUP # ============================================== # generate_config.generate_config() # logger.debug("Gennerated config.ini file using values from .env") config = configparser.ConfigParser() config.read('config.ini') # ============================================== # CONFIGURATION VALIDATION # ============================================== class ConfigValidationError(Exception): """Custom exception for configuration validation errors.""" pass def validate_config(): """ Validate all required configuration values at startup. Raises ConfigValidationError if any required values are missing or invalid. """ errors = [] # Validate Pterodactyl section if not config.has_section('Pterodactyl'): errors.append("Missing [Pterodactyl] section in config.ini") else: required_ptero = ['PanelURL', 'ClientAPIKey', 'ApplicationAPIKey'] for key in required_ptero: if not config.get('Pterodactyl', key, fallback=None): errors.append(f"Missing required Pterodactyl config value: {key}") # Validate Discord section if not config.has_section('Discord'): errors.append("Missing [Discord] section in config.ini") else: required_discord = ['Token', 'AllowedGuildID'] for key in required_discord: if not config.get('Discord', key, fallback=None): errors.append(f"Missing required Discord config value: {key}") # Validate AllowedGuildID is a valid integer try: guild_id = config.getint('Discord', 'AllowedGuildID', fallback=0) if guild_id <= 0: errors.append("AllowedGuildID must be a positive integer") except ValueError: errors.append("AllowedGuildID must be a valid integer") # Validate API keys have correct prefixes client_key = config.get('Pterodactyl', 'ClientAPIKey', fallback='') if client_key and not client_key.startswith('ptlc_'): errors.append("ClientAPIKey should start with 'ptlc_'") app_key = config.get('Pterodactyl', 'ApplicationAPIKey', fallback='') if app_key and not app_key.startswith('ptla_'): errors.append("ApplicationAPIKey should start with 'ptla_'") # Validate PanelURL is a valid URL panel_url = config.get('Pterodactyl', 'PanelURL', fallback='') if panel_url and not (panel_url.startswith('http://') or panel_url.startswith('https://')): errors.append("PanelURL must start with http:// or https://") if errors: error_msg = "Configuration validation failed:\n- " + "\n- ".join(errors) logger.error(error_msg) raise ConfigValidationError(error_msg) logger.info("Configuration validation passed") # ============================================== # CONSTANTS (Updated with validation) # ============================================== try: validate_config() PTERODACTYL_URL = config.get('Pterodactyl', 'PanelURL') PTERODACTYL_CLIENT_API_KEY = config.get('Pterodactyl', 'ClientAPIKey') PTERODACTYL_APPLICATION_API_KEY = config.get('Pterodactyl', 'ApplicationAPIKey') DISCORD_TOKEN = config.get('Discord', 'Token') ALLOWED_GUILD_ID = config.getint('Discord', 'AllowedGuildID') REQUIRED_ROLE = "Game Server User" UPDATE_INTERVAL = 10 EMBED_LOCATIONS_FILE = "./embed/embed_locations.json" logger.debug("Loaded and validated configuration values from config.ini") except ConfigValidationError as e: logger.critical(f"Configuration error: {str(e)}") raise except Exception as e: logger.critical(f"Unexpected error loading configuration: {str(e)}") raise # ============================================== # PTERODACTYL API CLASS # ============================================== class PterodactylAPI: """ Handles all interactions with the Pterodactyl Panel API. Uses client API key for client endpoints and application API key for admin endpoints. Provides methods for server management and monitoring. """ def __init__(self, panel_url: str, client_api_key: str, application_api_key: str): """ Initialize the Pterodactyl API client with both API keys. Args: panel_url: URL of the Pterodactyl panel (must include protocol) client_api_key: API key for client endpoints (starts with ptlc_) application_api_key: API key for application endpoints (starts with ptla_) """ self.panel_url = panel_url.rstrip('/') self.client_api_key = client_api_key self.application_api_key = application_api_key self.session = None self.lock = asyncio.Lock() # Prevents concurrent API access logger.info("Initialized PterodactylAPI client with provided credentials") async def initialize(self): """Initialize the aiohttp client session for API requests.""" self.session = aiohttp.ClientSession() logger.debug("Created new aiohttp ClientSession") async def close(self): """Cleanly close the aiohttp session when shutting down.""" if self.session and not self.session.closed: await self.session.close() logger.debug("Closed aiohttp ClientSession") async def _request(self, method: str, endpoint: str, data: Optional[dict] = None, use_application_key: bool = False) -> dict: """ Make an authenticated request to the Pterodactyl API. Args: method: HTTP method (GET, POST, PUT, DELETE, etc.) endpoint: API endpoint (e.g., 'application/servers') data: Optional JSON payload for POST/PUT requests use_application_key: Whether to use the application API key (admin endpoints) Returns: Dictionary containing API response or error information Raises: aiohttp.ClientError: For network-related issues json.JSONDecodeError: If response cannot be parsed as JSON """ url = f"{self.panel_url}/api/{endpoint}" api_key_type = "Application" if use_application_key else "Client" logger.debug(f"Preparing {method} request to {endpoint} using {api_key_type} API key") # Choose the appropriate API key api_key = self.application_api_key if use_application_key else self.client_api_key headers = { "Authorization": f"Bearer {api_key}", "Accept": "application/json", "Content-Type": "application/json" } try: async with self.lock: logger.debug(f"Acquired lock for API request to {endpoint}") async with self.session.request( method, url, headers=headers, json=data ) as response: if response.status == 204: # No content logger.debug(f"Received 204 No Content response from {endpoint}") return {"status": "success"} response_data = await response.json() logger.debug(f"Received response from {endpoint} with status {response.status}") if response.status >= 400: error_msg = response_data.get('errors', [{}])[0].get('detail', 'Unknown error') logger.error(f"API request to {endpoint} failed with status {response.status}: {error_msg}") return {"status": "error", "message": error_msg} return response_data except Exception as e: logger.error(f"Exception during API request to {endpoint}: {str(e)}") return {"status": "error", "message": str(e)} async def get_servers(self) -> List[dict]: """ Get a list of all servers from the Pterodactyl panel. Uses application API key as this is an admin endpoint. Returns: List of server dictionaries containing all server attributes """ logger.info("Fetching list of all servers from Pterodactyl panel") response = await self._request("GET", "application/servers", use_application_key=True) servers = response.get('data', []) logger.info(f"Retrieved {len(servers)} servers from Pterodactyl panel") return servers async def get_server_resources(self, server_id: str) -> dict: """ Get resource usage for a specific server. Uses client API key as this is a client endpoint. Args: server_id: The Pterodactyl server identifier Returns: Dictionary containing server resource usage and current state """ logger.debug(f"Fetching resource usage for server {server_id}") try: response = await self._request("GET", f"client/servers/{server_id}/resources") if response.get('status') == 'error': error_msg = response.get('message', 'Unknown error') logger.error(f"Failed to get resources for server {server_id}: {error_msg}") return {'attributes': {'current_state': 'offline'}} state = response.get('attributes', {}).get('current_state', 'unknown') logger.debug(f"Server {server_id} current state: {state}") return response except Exception as e: logger.error(f"Exception getting resources for server {server_id}: {str(e)}") return {'attributes': {'current_state': 'offline'}} async def send_power_action(self, server_id: str, action: str) -> dict: """ Send a power action to a server (start/stop/restart). Uses client API key as this is a client endpoint. Args: server_id: The Pterodactyl server identifier action: Power action to send (start/stop/restart) Returns: Dictionary containing API response status """ valid_actions = ['start', 'stop', 'restart'] if action not in valid_actions: logger.warning(f"Invalid power action attempted: {action}") return {"status": "error", "message": f"Invalid action. Must be one of: {', '.join(valid_actions)}"} logger.info(f"Sending {action} command to server {server_id}") result = await self._request("POST", f"client/servers/{server_id}/power", {"signal": action}) if result.get('status') == 'success': logger.info(f"Successfully executed {action} on server {server_id}") else: logger.error(f"Failed to execute {action} on server {server_id}: {result.get('message', 'Unknown error')}") return result async def get_server_details(self, server_id: str) -> dict: """ Get detailed server information including allocations. Uses application API key as this is an admin endpoint. Args: server_id: The Pterodactyl server identifier Returns: Dictionary containing detailed server information """ logger.debug(f"Fetching detailed information for server {server_id}") return await self._request("GET", f"application/servers/{server_id}", use_application_key=True) async def get_server_allocations(self, server_id: str) -> dict: """ Get allocation information for a server (IP addresses and ports). Uses application API key as this is an admin endpoint. Args: server_id: The Pterodactyl server identifier Returns: Dictionary containing server allocation information """ logger.debug(f"Fetching allocation information for server {server_id}") return await self._request("GET", f"application/servers/{server_id}/allocations", use_application_key=True) # ============================================== # SERVER STATUS VIEW CLASS (Buttons and UI) # ============================================== class ServerStatusView(discord.ui.View): """ Interactive Discord view containing server control buttons. Provides persistent controls for server management with role-based access. """ def __init__(self, server_id: str, server_name: str, pterodactyl_api: PterodactylAPI, server_data: dict): """ Initialize the server status view with control buttons. Args: server_id: The server's Pterodactyl identifier server_name: Human-readable server name pterodactyl_api: API client instance server_data: Full server data from Pterodactyl """ super().__init__(timeout=None) # Persistent view self.server_id = server_id self.server_name = server_name self.api = pterodactyl_api self.server_data = server_data logger.debug(f"Created ServerStatusView for {server_name} ({server_id})") async def interaction_check(self, interaction: discord.Interaction) -> bool: """ Verify the interacting user has the required role and is in the allowed guild. Args: interaction: Discord interaction object Returns: bool: True if authorized, False otherwise """ # First check if interaction is from the allowed guild if interaction.guild_id != ALLOWED_GUILD_ID: logger.warning(f"Unauthorized interaction attempt from guild {interaction.guild_id}") await interaction.response.send_message( "This bot is only available in a specific server.", ephemeral=True ) return False # Then check for required role logger.debug(f"Checking permissions for {interaction.user.name} on server {self.server_name}") has_role = any(role.name == REQUIRED_ROLE for role in interaction.user.roles) if not has_role: logger.warning(f"Permission denied for {interaction.user.name} - missing '{REQUIRED_ROLE}' role") await interaction.response.send_message( f"You don't have permission to control servers. You need the '{REQUIRED_ROLE}' role.", ephemeral=True ) return False logger.debug(f"Permission granted for {interaction.user.name}") return True async def on_error(self, interaction: discord.Interaction, error: Exception, item: discord.ui.Item): """ Handle errors in button interactions. Args: interaction: Discord interaction object error: Exception that occurred item: The UI item that triggered the error """ logger.error(f"View error in {self.server_name} by {interaction.user.name}: {str(error)}") await interaction.response.send_message( "An error occurred while processing your request.", ephemeral=True ) @discord.ui.button(label="Start", style=discord.ButtonStyle.green, custom_id="start_button") async def start_button(self, interaction: discord.Interaction, button: discord.ui.Button): """Send a start command to the server.""" logger.info(f"Start button pressed for {self.server_name} by {interaction.user.name}") await interaction.response.defer(ephemeral=True) result = await self.api.send_power_action(self.server_id, "start") if result.get('status') == 'success': message = f"Server '{self.server_name}' is starting..." logger.info(f"Successfully started server {self.server_name}") else: message = f"Failed to start server: {result.get('message', 'Unknown error')}" logger.error(f"Failed to start server {self.server_name}: {message}") await interaction.followup.send(message, ephemeral=True) @discord.ui.button(label="Stop", style=discord.ButtonStyle.red, custom_id="stop_button") async def stop_button(self, interaction: discord.Interaction, button: discord.ui.Button): """Send a stop command to the server.""" logger.info(f"Stop button pressed for {self.server_name} by {interaction.user.name}") await interaction.response.defer(ephemeral=True) result = await self.api.send_power_action(self.server_id, "stop") if result.get('status') == 'success': message = f"Server '{self.server_name}' is stopping..." logger.info(f"Successfully stopped server {self.server_name}") else: message = f"Failed to stop server: {result.get('message', 'Unknown error')}" logger.error(f"Failed to stop server {self.server_name}: {message}") await interaction.followup.send(message, ephemeral=True) @discord.ui.button(label="Restart", style=discord.ButtonStyle.blurple, custom_id="restart_button") async def restart_button(self, interaction: discord.Interaction, button: discord.ui.Button): """Send a restart command to the server.""" logger.info(f"Restart button pressed for {self.server_name} by {interaction.user.name}") await interaction.response.defer(ephemeral=True) result = await self.api.send_power_action(self.server_id, "restart") if result.get('status') == 'success': message = f"Server '{self.server_name}' is restarting..." logger.info(f"Successfully restarted server {self.server_name}") else: message = f"Failed to restart server: {result.get('message', 'Unknown error')}" logger.error(f"Failed to restart server {self.server_name}: {message}") await interaction.followup.send(message, ephemeral=True) @discord.ui.button(label="Show Address", style=discord.ButtonStyle.grey, custom_id="show_address_button") async def show_address_button(self, interaction: discord.Interaction, button: discord.ui.Button): """Show server's default allocation IP and port using client API.""" logger.info(f"Show Address button pressed for {self.server_name} by {interaction.user.name}") try: await interaction.response.defer(ephemeral=True) logger.debug(f"Fetching server details for {self.server_id}") # Get server details using client API server_details = await self.api._request( "GET", f"client/servers/{self.server_id}", use_application_key=False ) if server_details.get('status') == 'error': error_msg = server_details.get('message', 'Unknown error') logger.error(f"Failed to get server details for {self.server_id}: {error_msg}") raise ValueError(error_msg) attributes = server_details.get('attributes', {}) relationships = attributes.get('relationships', {}) allocations = relationships.get('allocations', {}).get('data', []) if not allocations: logger.warning(f"No allocations found for server {self.server_id}") raise ValueError("No allocations found for this server") # Find the default allocation (is_default=True) default_allocation = next( (alloc for alloc in allocations if alloc.get('attributes', {}).get('is_default', False)), allocations[0] # Fallback to first allocation if no default found ) allocation_attrs = default_allocation.get('attributes', {}) ip_alias = allocation_attrs.get('ip_alias', 'Unknown') port = str(allocation_attrs.get('port', 'Unknown')) logger.debug(f"Retrieved connection info for {self.server_id}: {ip_alias}:{port}") # Create and send embed embed = discord.Embed( title=f"{self.server_name} Connection Info", color=discord.Color.blue(), description=f"Server ID: `{self.server_id}`" ) embed.add_field(name="Address", value=f"`{ip_alias}`", inline=True) embed.add_field(name="Port", value=f"`{port}`", inline=True) await interaction.followup.send(embed=embed, ephemeral=True) logger.info(f"Displayed connection info for {self.server_name}") except Exception as e: logger.error(f"Failed to show address for {self.server_name}: {str(e)}") await interaction.followup.send( "⚠️ Failed to get connection info. The server may not have any ports allocated.", ephemeral=True ) # ============================================== # MAIN BOT CLASS # ============================================== class PterodactylBot(commands.Bot): """ Main bot class for Pterodactyl server management. Handles Discord interactions, embed management, and background tasks. Manages server status embeds and user commands. """ def __init__(self, *args, **kwargs): """ Initialize the Pterodactyl bot instance. Sets up caches, locks, and storage paths. """ super().__init__(*args, **kwargs) self.pterodactyl_api = None # Pterodactyl API client self.server_cache: Dict[str, dict] = {} # Cache of server data from Pterodactyl self.embed_locations: Dict[str, Dict[str, int]] = {} # Tracks where embeds are posted self.update_lock = asyncio.Lock() # Prevents concurrent updates self.embed_storage_path = Path(EMBED_LOCATIONS_FILE) # File to store embed locations # Track previous server states and CPU usage to detect changes # Format: {server_id: (state, cpu_usage, last_force_update)} self.previous_states: Dict[str, Tuple[str, float, Optional[float]]] = {} logger.info("Initialized PterodactylBot instance with state tracking") async def setup_hook(self): """ Bot setup routine called when the bot is starting. Initializes API client, loads saved data, and starts background tasks. """ logger.info("Running bot setup hook") # Initialize API client self.pterodactyl_api = PterodactylAPI( PTERODACTYL_URL, PTERODACTYL_CLIENT_API_KEY, PTERODACTYL_APPLICATION_API_KEY ) await self.pterodactyl_api.initialize() logger.info("Initialized Pterodactyl API client") # Load saved embed locations await self.load_embed_locations() # Start background update task self.update_status.start() logger.info("Started background status update task") async def load_embed_locations(self): """Load saved embed locations from JSON storage file.""" logger.debug("Attempting to load embed locations from storage") if not self.embed_storage_path.exists(): logger.info("No existing embed locations file found") return try: with open(self.embed_storage_path, 'r') as f: self.embed_locations = json.load(f) logger.info(f"Loaded {len(self.embed_locations)} embed locations from storage") except Exception as e: logger.error(f"Failed to load embed locations: {str(e)}") async def save_embed_locations(self): """Save current embed locations to JSON storage file.""" logger.debug("Attempting to save embed locations to storage") try: with open(self.embed_storage_path, 'w') as f: json.dump(self.embed_locations, f, indent=2) logger.debug("Successfully saved embed locations to disk") except Exception as e: logger.error(f"Failed to save embed locations: {str(e)}") async def refresh_all_embeds(self) -> Tuple[int, int]: """ Perform a complete refresh of all server status embeds. Creates new embeds and deletes old ones to prevent duplication. Returns: Tuple of (deleted_count, created_count) - number of embeds processed """ logger.info("Starting full refresh of all server embeds") async with self.update_lock: try: await asyncio.sleep(1) # Initial delay # Get current server list if cache is empty if not self.server_cache: logger.debug("Server cache empty, fetching fresh server list") servers = await self.pterodactyl_api.get_servers() if not servers: logger.warning("No servers found in Pterodactyl panel") return 0, 0 self.server_cache = {server['attributes']['identifier']: server for server in servers} logger.info(f"Populated server cache with {len(servers)} servers") # Create new embeds in temporary storage new_embeds = {} created_count = 0 skipped_count = 0 for server_id, server_data in self.server_cache.items(): # Skip if we don't have an existing location to recreate in if server_id not in self.embed_locations: skipped_count += 1 continue channel_id = self.embed_locations[server_id]['channel_id'] channel = self.get_channel(int(channel_id)) if not channel: logger.warning(f"Channel {channel_id} not found for server {server_id}") continue try: logger.debug(f"Creating new embed for server {server_id}") # Get current server status resources = await self.pterodactyl_api.get_server_resources(server_id) # Create new embed embed, view = await self.get_server_status_embed(server_data, resources) message = await channel.send(embed=embed, view=view) # Store in temporary location new_embeds[server_id] = { 'channel_id': str(channel.id), 'message_id': str(message.id) } created_count += 1 logger.info(f"Created new embed for server {server_data['attributes']['name']}") await asyncio.sleep(1) # Rate limit protection except Exception as e: logger.error(f"Failed to create new embed for server {server_id}: {str(e)}") logger.info(f"Created {created_count} new embeds, skipped {skipped_count} servers") # Only proceed if we created at least one new embed if not new_embeds: logger.warning("No new embeds created during refresh") return 0, 0 # Now delete old embeds deleted_count = 0 not_found_count = 0 for server_id, location in list(self.embed_locations.items()): try: channel = self.get_channel(int(location['channel_id'])) if channel: try: message = await channel.fetch_message(int(location['message_id'])) await message.delete() deleted_count += 1 logger.debug(f"Deleted old embed for server {server_id}") await asyncio.sleep(0.5) # Rate limit protection except discord.NotFound: not_found_count += 1 logger.debug(f"Old embed for server {server_id} already deleted") except Exception as e: logger.error(f"Failed to delete old embed for server {server_id}: {str(e)}") except Exception as e: logger.error(f"Error processing old embed for server {server_id}: {str(e)}") logger.info(f"Deleted {deleted_count} old embeds, {not_found_count} already missing") # Update storage with new embed locations self.embed_locations = new_embeds await self.save_embed_locations() return deleted_count, created_count except Exception as e: logger.error(f"Critical error during embed refresh: {str(e)}") raise async def track_new_embed(self, server_id: str, message: discord.Message): """ Track a newly created embed in storage. Args: server_id: The server's Pterodactyl identifier message: Discord message containing the embed """ logger.debug(f"Tracking new embed for server {server_id} in channel {message.channel.id}") self.embed_locations[server_id] = { 'channel_id': str(message.channel.id), 'message_id': str(message.id) } await self.save_embed_locations() async def get_server_status_embed(self, server_data: dict, resources: dict) -> Tuple[discord.Embed, ServerStatusView]: """ Create a status embed and view for a server. Args: server_data: Server information from Pterodactyl resources: Current resource usage data Returns: Tuple of (embed, view) objects ready for display """ attributes = server_data.get('attributes', {}) identifier = attributes.get('identifier', 'unknown') name = attributes.get('name', 'Unknown Server') description = attributes.get('description', 'No description available') logger.debug(f"Building status embed for server {name} ({identifier})") # Parse resource data resource_attributes = resources.get('attributes', {}) current_state = resource_attributes.get('current_state', 'offline').title() is_suspended = attributes.get('suspended', False) # Create embed with appropriate color based on status embed = discord.Embed( title=f"{name} - {current_state}", description=description, color=discord.Color.blue() if current_state.lower() == "running" else discord.Color.red(), timestamp=datetime.now() ) embed.add_field(name="Server ID", value=identifier, inline=True) if is_suspended: embed.add_field(name="Status", value="⛔ Suspended", inline=True) else: embed.add_field(name="Status", value="✅ Active", inline=True) # Add resource usage if server is running if current_state.lower() == "running": cpu_usage = round(resource_attributes.get('resources', {}).get('cpu_absolute', 0), 2) memory_usage = round(resource_attributes.get('resources', {}).get('memory_bytes', 0) / (1024 ** 2), 2) disk_usage = round(resource_attributes.get('resources', {}).get('disk_bytes', 0) / (1024 ** 2), 2) network_rx = round(resource_attributes.get('resources', {}).get('network_rx_bytes', 0) / (1024 ** 2), 2) network_tx = round(resource_attributes.get('resources', {}).get('network_tx_bytes', 0) / (1024 ** 2), 2) # Get uptime from Pterodactyl API (in milliseconds) uptime_ms = resource_attributes.get('resources', {}).get('uptime', 0) # Format uptime for display if uptime_ms > 0: uptime_seconds = uptime_ms // 1000 # Convert ms to seconds if uptime_seconds < 60: uptime_text = f"{uptime_seconds}s" elif uptime_seconds < 3600: uptime_text = f"{uptime_seconds // 60}m {uptime_seconds % 60}s" elif uptime_seconds < 86400: hours = uptime_seconds // 3600 minutes = (uptime_seconds % 3600) // 60 uptime_text = f"{hours}h {minutes}m" else: days = uptime_seconds // 86400 hours = (uptime_seconds % 86400) // 3600 uptime_text = f"{days}d {hours}h" else: uptime_text = "Just started" embed.add_field(name="Uptime", value=uptime_text, inline=True) embed.add_field(name="CPU Usage", value=f"{cpu_usage}%", inline=True) embed.add_field(name="Memory Usage", value=f"{memory_usage} MB", inline=True) embed.add_field(name="Disk Usage", value=f"{disk_usage} MB", inline=True) embed.add_field(name="Network", value=f"⬇️ {network_rx} MB / ⬆️ {network_tx} MB", inline=False) embed.set_footer(text="Last updated") # Create interactive view with control buttons view = ServerStatusView( server_id=identifier, server_name=name, pterodactyl_api=self.pterodactyl_api, server_data=server_data ) logger.debug(f"Successfully built status components for {name}") return embed, view @tasks.loop(seconds=UPDATE_INTERVAL) async def update_status(self): """ Background task to update server status embeds when: 1. Server power state changes (started/stopped/restarted) 2. Significant CPU usage change (>50% difference) 3. First time seeing the server 4. Server has been running for 10 minutes (force update for uptime) This minimizes API calls to Discord and updates while maintaining real-time awareness of important server changes. """ logger.info("Starting optimized server status update cycle") async with self.update_lock: # Ensure only one update runs at a time try: # Fetch current server list from Pterodactyl servers = await self.pterodactyl_api.get_servers() if not servers: logger.warning("No servers found in Pterodactyl panel during update") return # Update our local cache with fresh server data self.server_cache = {server['attributes']['identifier']: server for server in servers} logger.debug(f"Updated server cache with {len(servers)} servers") # Variables to track our update statistics update_count = 0 # Successful updates error_count = 0 # Failed updates missing_count = 0 # Missing embeds skipped_count = 0 # Servers that didn't need updates current_time = datetime.now().timestamp() # Process each server we're tracking embeds for for server_id, location in list(self.embed_locations.items()): # Skip if server no longer exists in Pterodactyl if server_id not in self.server_cache: logger.warning(f"Server {server_id} not found in cache, skipping update") continue server_data = self.server_cache[server_id] server_name = server_data['attributes']['name'] try: logger.debug(f"Checking status for server {server_name} ({server_id})") # Get current server resource usage resources = await self.pterodactyl_api.get_server_resources(server_id) current_state = resources.get('attributes', {}).get('current_state', 'offline') cpu_usage = round(resources.get('attributes', {}).get('resources', {}).get('cpu_absolute', 0), 2) # Retrieve previous recorded state, CPU usage, and last force update time prev_state, prev_cpu, last_force_update = self.previous_states.get(server_id, (None, 0, None)) # DECISION LOGIC: Should we update the embed? needs_update = False # 1. Check if power state changed (most important) if current_state != prev_state: logger.debug(f"Power state changed for {server_name}: {prev_state} -> {current_state}") needs_update = True # 2. Check for significant CPU change (only if server is running) elif current_state == 'running' and abs(cpu_usage - prev_cpu) > 50: logger.debug(f"Significant CPU change for {server_name}: {prev_cpu}% -> {cpu_usage}%") needs_update = True # 3. First time we're seeing this server (initial update) elif prev_state is None: logger.debug(f"First check for {server_name}, performing initial update") needs_update = True # 4. Force update every 10 minutes for running servers (for uptime counter) elif (current_state == 'running' and (last_force_update is None or current_time - last_force_update >= 600)): # 10 minutes = 600 seconds logger.debug(f"Executing 10-minute force update for running server {server_name}") needs_update = True # Update the last force update time last_force_update = current_time # PERFORM UPDATE IF NEEDED if needs_update: # Generate fresh embed and view components embed, view = await self.get_server_status_embed(server_data, resources) # Get the channel where this server's embed lives channel = self.get_channel(int(location['channel_id'])) if not channel: logger.warning(f"Channel {location['channel_id']} not found for server {server_id}") continue # Fetch and update the existing message message = await channel.fetch_message(int(location['message_id'])) await message.edit(embed=embed, view=view) update_count += 1 logger.debug(f"Updated status for {server_name}") # Update our state tracking with new values # Only update last_force_update if this was a force update new_last_force_update = last_force_update if needs_update and current_state == 'running' and current_time - (last_force_update or 0) >= 600 else (last_force_update if last_force_update is not None else None) self.previous_states[server_id] = (current_state, cpu_usage, new_last_force_update) else: # No significant changes detected, but update tracking with current state self.previous_states[server_id] = (current_state, cpu_usage, last_force_update) skipped_count += 1 logger.debug(f"No changes detected for {server_name}, skipping update") except discord.NotFound: # Embed message was deleted - clean up our tracking logger.warning(f"Embed for server {server_id} not found, removing from tracking") self.embed_locations.pop(server_id, None) self.previous_states.pop(server_id, None) # Also clean up state tracking missing_count += 1 await self.save_embed_locations() except Exception as e: logger.error(f"Failed to update status for server {server_id}: {str(e)}") error_count += 1 # Small delay between servers to avoid rate limits await asyncio.sleep(0.5) # Log summary of this update cycle logger.info( f"Update cycle complete: " f"{update_count} updated, " f"{skipped_count} skipped, " f"{missing_count} missing, " f"{error_count} errors" ) except Exception as e: logger.error(f"Error in update_status task: {str(e)}") # If something went wrong, wait before retrying await asyncio.sleep(5) @update_status.before_loop async def before_update_status(self): """Wait for bot to be ready before starting update task.""" logger.debug("Waiting for bot readiness before starting update task") await self.wait_until_ready() await self.refresh_all_embeds() @update_status.after_loop async def after_update_status(self): """Handle update task stopping.""" if self.update_status.is_being_cancelled(): logger.info("Server status update task was cancelled") elif self.update_status.failed(): logger.error("Server status update task failed") async def close(self): """Cleanup when bot is shutting down.""" logger.info("Bot shutdown initiated - performing cleanup") await self.save_embed_locations() if self.pterodactyl_api: await self.pterodactyl_api.close() await super().close() # ============================================== # DISCORD COMMANDS # ============================================== intents = discord.Intents.default() intents.message_content = True bot = PterodactylBot(command_prefix="!", intents=intents) async def check_allowed_guild(interaction: discord.Interaction) -> bool: """ Verify that an interaction is coming from the allowed guild. Args: interaction: Discord interaction object Returns: bool: True if interaction is allowed, False otherwise """ if interaction.guild_id != ALLOWED_GUILD_ID: logger.warning(f"Command attempted from unauthorized guild {interaction.guild_id} by {interaction.user.name}") await interaction.response.send_message( "This bot is only available in a specific server.", ephemeral=True ) return False return True @bot.tree.command(name="server_status", description="Get a list of available game servers to control") async def server_status(interaction: discord.Interaction): """ Slash command to display server status dashboard with interactive dropdown selection. This command provides a comprehensive server management interface by: 1. Fetching current server list from Pterodactyl panel 2. Generating real-time statistics (online/offline counts) 3. Displaying an informational embed with server statistics 4. Presenting an ephemeral dropdown menu with all available servers 5. Handling server selection to create permanent status embeds in the channel Workflow: - Validates guild permissions and defers ephemeral response - Refreshes server cache from Pterodactyl API - Calculates online/offline statistics by checking each server's state - Creates statistics embed with visual server status breakdown - Generates dropdown menu with all servers (name + description) - Handles user selection via ephemeral dropdown interaction - Creates permanent status embed in channel upon selection - Manages embed tracking and cleanup of previous embeds Ephemeral Design: - Initial dashboard and dropdown are ephemeral (visible only to user) - Automatically disappears after use or timeout (3 minutes) - No manual cleanup required for dropdown interface - Only final server status embed is posted publicly Error Handling: - Handles API failures during server enumeration - Manages missing servers between selection and execution - Provides user-friendly error messages for all failure scenarios - Maintains comprehensive logging for troubleshooting Args: interaction: Discord interaction object representing the command invocation Returns: None: Sends ephemeral dashboard with dropdown, then public status embed on selection """ # Check if interaction is from allowed guild if not await check_allowed_guild(interaction): return logger.info(f"Server status command invoked by {interaction.user.name}") await interaction.response.defer(ephemeral=True) try: # Refresh server cache with current data from Pterodactyl panel servers = await bot.pterodactyl_api.get_servers() if not servers: logger.warning("No servers found in Pterodactyl panel") await interaction.followup.send("No servers found in the Pterodactyl panel.", ephemeral=True) return bot.server_cache = {server['attributes']['identifier']: server for server in servers} logger.debug(f"Refreshed server cache with {len(servers)} servers") # Count online/offline servers by checking each server's current state online_count = 0 offline_count = 0 # Check status for each server to generate accurate statistics for server_id, server_data in bot.server_cache.items(): resources = await bot.pterodactyl_api.get_server_resources(server_id) current_state = resources.get('attributes', {}).get('current_state', 'offline') if current_state == 'running': online_count += 1 else: offline_count += 1 # Create statistics embed with visual server status breakdown stats_embed = discord.Embed( title="🏗️ Server Status Dashboard", description="Select a server from the dropdown below to view its detailed status and controls.", color=discord.Color.blue(), timestamp=datetime.now() ) stats_embed.add_field( name="📊 Server Statistics", value=f"**Total Servers:** {len(servers)}\n" f"✅ **Online:** {online_count}\n" f"❌ **Offline:** {offline_count}", inline=False ) stats_embed.add_field( name="ℹ️ How to Use", value="Use the dropdown menu below to select a server. The server's status embed will be posted in this channel.", inline=False ) stats_embed.set_footer(text="Server status will update automatically") # Create dropdown menu options from available servers server_options = [] for server_id, server_data in bot.server_cache.items(): server_name = server_data['attributes']['name'] server_description = server_data['attributes'].get('description', 'No description') # Truncate description if too long for dropdown constraints if len(server_description) > 50: server_description = server_description[:47] + "..." server_options.append( discord.SelectOption( label=server_name, value=server_id, description=server_description ) ) # Create dropdown view with timeout for automatic cleanup class ServerDropdownView(discord.ui.View): def __init__(self, server_options, timeout=180): # 3 minute timeout for ephemeral cleanup super().__init__(timeout=timeout) self.server_options = server_options self.add_item(ServerDropdown(server_options)) async def on_timeout(self): # Clean up when dropdown times out (ephemeral auto-removal) logger.debug("Server dropdown timed out and was automatically cleaned up") # Dropdown selection handler for server choice class ServerDropdown(discord.ui.Select): def __init__(self, server_options): super().__init__( placeholder="Select a server to display...", options=server_options, min_values=1, max_values=1 ) async def callback(self, interaction: discord.Interaction): """ Handle server selection from dropdown menu. Creates permanent status embed in the channel for the selected server. """ await interaction.response.defer(ephemeral=True) selected_server_id = self.values[0] server_data = bot.server_cache.get(selected_server_id) if not server_data: await interaction.followup.send( "❌ Selected server no longer available. Please try again.", ephemeral=True ) return server_name = server_data['attributes']['name'] logger.info(f"User {interaction.user.name} selected server: {server_name}") try: # Get current server status for embed creation resources = await bot.pterodactyl_api.get_server_resources(selected_server_id) # Delete old embed if it exists to prevent duplication if selected_server_id in bot.embed_locations: logger.debug(f"Found existing embed for {selected_server_id}, attempting to delete") try: old_location = bot.embed_locations[selected_server_id] old_channel = bot.get_channel(int(old_location['channel_id'])) if old_channel: try: old_message = await old_channel.fetch_message(int(old_location['message_id'])) await old_message.delete() logger.debug(f"Deleted old embed for {selected_server_id}") except discord.NotFound: logger.debug(f"Old embed for {selected_server_id} already deleted") except Exception as e: logger.error(f"Failed to delete old embed: {str(e)}") # Create and send new permanent status embed in channel embed, view = await bot.get_server_status_embed(server_data, resources) message = await interaction.channel.send(embed=embed, view=view) await bot.track_new_embed(selected_server_id, message) await interaction.followup.send( f"✅ **{server_name}** status has been posted in {interaction.channel.mention}", ephemeral=True ) logger.info(f"Successfully posted status for {server_name}") except Exception as e: logger.error(f"Failed to create status embed for {server_name}: {str(e)}") await interaction.followup.send( f"❌ Failed to create status embed for **{server_name}**: {str(e)}", ephemeral=True ) # Send the initial dashboard embed with dropdown (ephemeral - auto-cleaned) await interaction.followup.send( embed=stats_embed, view=ServerDropdownView(server_options), ephemeral=True ) logger.info(f"Sent server status dashboard to {interaction.user.name} with {len(server_options)} servers") except Exception as e: logger.error(f"Server status command failed: {str(e)}") await interaction.followup.send( f"❌ Failed to load server status: {str(e)}", ephemeral=True ) @bot.tree.command(name="refresh_embeds", description="Refresh all server status embeds (admin only)") async def refresh_embeds(interaction: discord.Interaction): """Slash command to refresh all server embeds.""" if not await check_allowed_guild(interaction): return logger.info(f"Refresh embeds command invoked by {interaction.user.name}") await interaction.response.defer(ephemeral=True) # Require administrator permissions if not interaction.user.guild_permissions.administrator: logger.warning(f"Unauthorized refresh attempt by {interaction.user.name}") await interaction.followup.send( "You need administrator permissions to refresh all embeds.", ephemeral=True ) return try: logger.info("Starting full embed refresh per admin request") deleted, created = await bot.refresh_all_embeds() await interaction.followup.send( f"Refreshed all embeds. Deleted {deleted} old embeds, created {created} new ones.", ephemeral=True ) logger.info(f"Embed refresh completed: {deleted} deleted, {created} created") except Exception as e: logger.error(f"Embed refresh failed: {str(e)}") await interaction.followup.send( f"Failed to refresh embeds: {str(e)}", ephemeral=True ) @bot.tree.command(name="purge_embeds", description="Permanently delete all server status embeds (admin only)") async def purge_embeds(interaction: discord.Interaction): """ Slash command to permanently purge all server status embeds from Discord channels. This command performs a complete cleanup of all tracked server status embeds by: 1. Iterating through all tracked embed locations in embed_locations.json 2. Attempting to delete each embed message from its respective Discord channel 3. Clearing the embed tracking file and internal state tracking 4. Providing real-time progress updates during the operation Args: interaction: Discord interaction object representing the command invocation Workflow: - Validates administrator permissions - Checks if any embeds are currently tracked - Sends initial progress embed - Processes each embed sequentially with error handling - Updates progress embed in real-time - Saves cleared tracking data to disk - Sends final results with comprehensive statistics Safety: - Only affects tracked embeds (won't delete arbitrary messages) - Maintains logs for audit purposes - Provides rollback protection through immediate tracking removal - Includes rate limiting to avoid Discord API limits Returns: None: Sends follow-up messages with operation results """ if not await check_allowed_guild(interaction): return logger.info(f"Purge embeds command invoked by {interaction.user.name}") await interaction.response.defer(ephemeral=True) # Require administrator permissions if not interaction.user.guild_permissions.administrator: logger.warning(f"Unauthorized purge attempt by {interaction.user.name}") await interaction.followup.send( "You need administrator permissions to purge all embeds.", ephemeral=True ) return try: logger.info("Starting embed purge per admin request") # Variables to track purge statistics deleted_count = 0 not_found_count = 0 error_count = 0 total_embeds = len(bot.embed_locations) if total_embeds == 0: await interaction.followup.send( "No embeds are currently being tracked. Nothing to purge.", ephemeral=True ) return # Create progress embed progress_embed = discord.Embed( title="🔄 Purging Server Embeds", description=f"Processing {total_embeds} embeds...", color=discord.Color.orange() ) progress_embed.add_field(name="Deleted", value="0", inline=True) progress_embed.add_field(name="Not Found", value="0", inline=True) progress_embed.add_field(name="Errors", value="0", inline=True) progress_embed.set_footer(text="This may take a while...") progress_message = await interaction.followup.send(embed=progress_embed, ephemeral=True) # Process each tracked embed for server_id, location in list(bot.embed_locations.items()): try: channel = bot.get_channel(int(location['channel_id'])) if channel: try: message = await channel.fetch_message(int(location['message_id'])) await message.delete() deleted_count += 1 logger.debug(f"Successfully purged embed for server {server_id}") except discord.NotFound: not_found_count += 1 logger.debug(f"Embed for server {server_id} already deleted") except discord.Forbidden: error_count += 1 logger.error(f"Permission denied when deleting embed for server {server_id}") except Exception as e: error_count += 1 logger.error(f"Error deleting embed for server {server_id}: {str(e)}") else: not_found_count += 1 logger.warning(f"Channel not found for server {server_id}") # Remove from tracking immediately bot.embed_locations.pop(server_id, None) bot.previous_states.pop(server_id, None) # Also clean up state tracking # Update progress every 5 embeds or for the last one if (deleted_count + not_found_count + error_count) % 5 == 0 or \ (deleted_count + not_found_count + error_count) == total_embeds: progress_embed.description = f"Processed {deleted_count + not_found_count + error_count}/{total_embeds} embeds" progress_embed.set_field_at(0, name="Deleted", value=str(deleted_count), inline=True) progress_embed.set_field_at(1, name="Not Found", value=str(not_found_count), inline=True) progress_embed.set_field_at(2, name="Errors", value=str(error_count), inline=True) await progress_message.edit(embed=progress_embed) # Small delay to avoid rate limits await asyncio.sleep(0.3) except Exception as e: error_count += 1 logger.error(f"Unexpected error processing server {server_id}: {str(e)}") # Save the cleared embed locations await bot.save_embed_locations() # Create results embed result_embed = discord.Embed( title="✅ Embed Purge Complete", color=discord.Color.green(), timestamp=datetime.now() ) result_embed.add_field(name="Total Tracked", value=str(total_embeds), inline=True) result_embed.add_field(name="✅ Successfully Deleted", value=str(deleted_count), inline=True) result_embed.add_field(name="❌ Already Missing", value=str(not_found_count), inline=True) result_embed.add_field(name="⚠️ Errors", value=str(error_count), inline=True) result_embed.add_field(name="📊 Success Rate", value=f"{((deleted_count + not_found_count) / total_embeds * 100):.1f}%", inline=True) result_embed.set_footer(text="Embed tracking file has been cleared") await progress_message.edit(embed=result_embed) logger.info(f"Embed purge completed: {deleted_count} deleted, {not_found_count} not found, {error_count} errors") except Exception as e: logger.error(f"Embed purge failed: {str(e)}") await interaction.followup.send( f"❌ Failed to purge embeds: {str(e)}", ephemeral=True ) # ============================================== # BOT EVENTS # ============================================== @bot.event async def on_interaction(interaction: discord.Interaction): """Global interaction handler to check guild before processing any interaction.""" if interaction.guild_id != ALLOWED_GUILD_ID: logger.debug(f"Ignoring interaction from unauthorized guild {interaction.guild_id}") await interaction.response.send_message( "This bot is only available in a specific server.", ephemeral=True ) return @bot.event async def on_ready(): """Called when the bot successfully connects to Discord.""" logger.info(f"Bot connected as {bot.user.name} (ID: {bot.user.id})") try: # Sync commands only to the allowed guild guild = discord.Object(id=ALLOWED_GUILD_ID) bot.tree.copy_global_to(guild=guild) synced = await bot.tree.sync(guild=guild) logger.info(f"Successfully synced {len(synced)} command(s) to guild {ALLOWED_GUILD_ID}: {[cmd.name for cmd in synced]}") except Exception as e: logger.error(f"Command sync failed: {str(e)}") # ============================================== # SYSTEM SIGNAL HANDLERS # ============================================== def handle_sigint(signum: int, frame: types.FrameType) -> None: """ Handle SIGINT signals (Ctrl+C) by initiating graceful shutdown. Args: signum: The signal number (signal.SIGINT) frame: Current stack frame (unused but required by signal handler signature) """ logger.info("Received SIGINT (Ctrl+C), initiating graceful shutdown...") raise KeyboardInterrupt def handle_sigterm(signum: int, frame: types.FrameType) -> None: """ Handle SIGTERM signals (container stop) by initiating graceful shutdown. Args: signum: The signal number (signal.SIGTERM) frame: Current stack frame (unused but required by signal handler signature) """ logger.info("Received SIGTERM (container stop), initiating graceful shutdown...") raise KeyboardInterrupt # ============================================== # BOT STARTUP # ============================================== if __name__ == "__main__": """ Main entry point for the bot application. Handles: - Signal registration for graceful shutdowns (SIGINT/SIGTERM) - Primary bot execution loop - Error handling and crash reporting - Resource cleanup on shutdown Flow: 1. Initialize signal handlers 2. Start bot with Discord token 3. Handle interrupts and exceptions 4. Execute final cleanup """ logger.info("Starting bot initialization") # Register signal handlers signal.signal(signal.SIGINT, handle_sigint) # For Ctrl+C signal.signal(signal.SIGTERM, handle_sigterm) # For container stop commands logger.info("System signal handlers registered") try: bot.run(DISCORD_TOKEN) except KeyboardInterrupt: logger.info("Bot shutting down") except Exception as e: logger.error(f"Bot crashed: {str(e)}") sys.exit(1) # Exit with error code for crash finally: logger.info("Bot shutdown complete") sys.exit(0) # Explicit clean exit