""" Pterodactyl Discord Bot A comprehensive Discord bot for managing Pterodactyl game servers through Discord. Features: - Server status monitoring with auto-updating embeds - Power controls (start/stop/restart) - Server address/port display - Multi-channel embed support - Automatic embed refresh system - Role-based access control - Single-guild restriction - Extensive logging for all operations """ import discord from discord.ext import commands, tasks from discord import app_commands import os import sys import signal import types import aiohttp import asyncio import json import traceback import logging from logging.handlers import RotatingFileHandler import configparser from datetime import datetime from typing import Dict, List, Optional, Tuple from pathlib import Path import generate_config # ============================================== # LOGGING SETUP # ============================================== logs_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs') os.makedirs(logs_dir, exist_ok=True) logger = logging.getLogger('pterodisbot') logger.setLevel(logging.DEBUG) # File handler for logs (rotates when reaching 5MB, keeps 3 backups) handler = RotatingFileHandler( filename=os.path.join(logs_dir, 'pterodisbot.log'), maxBytes=5*1024*1024, # 5 MiB max log file size backupCount=3, # Rotate through 3 files encoding='utf-8' ) handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) logger.addHandler(handler) # Console handler for real-time output console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) logger.addHandler(console_handler) logger.info("Initialized logging system with file and console output") # ============================================== # CONFIGURATION SETUP # ============================================== # generate_config.generate_config() # logger.debug("Gennerated config.ini file using values from .env") config = configparser.ConfigParser() config.read('config.ini') # ============================================== # CONFIGURATION VALIDATION # ============================================== class ConfigValidationError(Exception): """Custom exception for configuration validation errors.""" pass def validate_config(): """ Validate all required configuration values at startup. Raises ConfigValidationError if any required values are missing or invalid. """ errors = [] # Validate Pterodactyl section if not config.has_section('Pterodactyl'): errors.append("Missing [Pterodactyl] section in config.ini") else: required_ptero = ['PanelURL', 'ClientAPIKey', 'ApplicationAPIKey'] for key in required_ptero: if not config.get('Pterodactyl', key, fallback=None): errors.append(f"Missing required Pterodactyl config value: {key}") # Validate Discord section if not config.has_section('Discord'): errors.append("Missing [Discord] section in config.ini") else: required_discord = ['Token', 'AllowedGuildID'] for key in required_discord: if not config.get('Discord', key, fallback=None): errors.append(f"Missing required Discord config value: {key}") # Validate AllowedGuildID is a valid integer try: guild_id = config.getint('Discord', 'AllowedGuildID', fallback=0) if guild_id <= 0: errors.append("AllowedGuildID must be a positive integer") except ValueError: errors.append("AllowedGuildID must be a valid integer") # Validate API keys have correct prefixes client_key = config.get('Pterodactyl', 'ClientAPIKey', fallback='') if client_key and not client_key.startswith('ptlc_'): errors.append("ClientAPIKey should start with 'ptlc_'") app_key = config.get('Pterodactyl', 'ApplicationAPIKey', fallback='') if app_key and not app_key.startswith('ptla_'): errors.append("ApplicationAPIKey should start with 'ptla_'") # Validate PanelURL is a valid URL panel_url = config.get('Pterodactyl', 'PanelURL', fallback='') if panel_url and not (panel_url.startswith('http://') or panel_url.startswith('https://')): errors.append("PanelURL must start with http:// or https://") if errors: error_msg = "Configuration validation failed:\n- " + "\n- ".join(errors) logger.error(error_msg) raise ConfigValidationError(error_msg) logger.info("Configuration validation passed") # ============================================== # CONSTANTS (Updated with validation) # ============================================== try: validate_config() PTERODACTYL_URL = config.get('Pterodactyl', 'PanelURL') PTERODACTYL_CLIENT_API_KEY = config.get('Pterodactyl', 'ClientAPIKey') PTERODACTYL_APPLICATION_API_KEY = config.get('Pterodactyl', 'ApplicationAPIKey') DISCORD_TOKEN = config.get('Discord', 'Token') ALLOWED_GUILD_ID = config.getint('Discord', 'AllowedGuildID') REQUIRED_ROLE = "Game Server User" UPDATE_INTERVAL = 10 EMBED_LOCATIONS_FILE = "./embed/embed_locations.json" logger.debug("Loaded and validated configuration values from config.ini") except ConfigValidationError as e: logger.critical(f"Configuration error: {str(e)}") raise except Exception as e: logger.critical(f"Unexpected error loading configuration: {str(e)}") raise # ============================================== # PTERODACTYL API CLASS # ============================================== class PterodactylAPI: """ Handles all interactions with the Pterodactyl Panel API. Uses client API key for client endpoints and application API key for admin endpoints. Provides methods for server management and monitoring. """ def __init__(self, panel_url: str, client_api_key: str, application_api_key: str): """ Initialize the Pterodactyl API client with both API keys. Args: panel_url: URL of the Pterodactyl panel (must include protocol) client_api_key: API key for client endpoints (starts with ptlc_) application_api_key: API key for application endpoints (starts with ptla_) """ self.panel_url = panel_url.rstrip('/') self.client_api_key = client_api_key self.application_api_key = application_api_key self.session = None self.lock = asyncio.Lock() # Prevents concurrent API access logger.info("Initialized PterodactylAPI client with provided credentials") async def initialize(self): """Initialize the aiohttp client session for API requests.""" self.session = aiohttp.ClientSession() logger.debug("Created new aiohttp ClientSession") async def close(self): """Cleanly close the aiohttp session when shutting down.""" if self.session and not self.session.closed: await self.session.close() logger.debug("Closed aiohttp ClientSession") async def _request(self, method: str, endpoint: str, data: Optional[dict] = None, use_application_key: bool = False) -> dict: """ Make an authenticated request to the Pterodactyl API. Args: method: HTTP method (GET, POST, PUT, DELETE, etc.) endpoint: API endpoint (e.g., 'application/servers') data: Optional JSON payload for POST/PUT requests use_application_key: Whether to use the application API key (admin endpoints) Returns: Dictionary containing API response or error information Raises: aiohttp.ClientError: For network-related issues json.JSONDecodeError: If response cannot be parsed as JSON """ url = f"{self.panel_url}/api/{endpoint}" api_key_type = "Application" if use_application_key else "Client" logger.debug(f"Preparing {method} request to {endpoint} using {api_key_type} API key") # Choose the appropriate API key api_key = self.application_api_key if use_application_key else self.client_api_key headers = { "Authorization": f"Bearer {api_key}", "Accept": "application/json", "Content-Type": "application/json" } try: async with self.lock: logger.debug(f"Acquired lock for API request to {endpoint}") async with self.session.request( method, url, headers=headers, json=data ) as response: if response.status == 204: # No content logger.debug(f"Received 204 No Content response from {endpoint}") return {"status": "success"} response_data = await response.json() logger.debug(f"Received response from {endpoint} with status {response.status}") if response.status >= 400: error_msg = response_data.get('errors', [{}])[0].get('detail', 'Unknown error') logger.error(f"API request to {endpoint} failed with status {response.status}: {error_msg}") return {"status": "error", "message": error_msg} return response_data except Exception as e: logger.error(f"Exception during API request to {endpoint}: {str(e)}") return {"status": "error", "message": str(e)} async def get_servers(self) -> List[dict]: """ Get a list of all servers from the Pterodactyl panel. Uses application API key as this is an admin endpoint. Returns: List of server dictionaries containing all server attributes """ logger.info("Fetching list of all servers from Pterodactyl panel") response = await self._request("GET", "application/servers", use_application_key=True) servers = response.get('data', []) logger.info(f"Retrieved {len(servers)} servers from Pterodactyl panel") return servers async def get_server_resources(self, server_id: str) -> dict: """ Get resource usage for a specific server. Uses client API key as this is a client endpoint. Args: server_id: The Pterodactyl server identifier Returns: Dictionary containing server resource usage and current state """ logger.debug(f"Fetching resource usage for server {server_id}") try: response = await self._request("GET", f"client/servers/{server_id}/resources") if response.get('status') == 'error': error_msg = response.get('message', 'Unknown error') logger.error(f"Failed to get resources for server {server_id}: {error_msg}") return {'attributes': {'current_state': 'offline'}} state = response.get('attributes', {}).get('current_state', 'unknown') logger.debug(f"Server {server_id} current state: {state}") return response except Exception as e: logger.error(f"Exception getting resources for server {server_id}: {str(e)}") return {'attributes': {'current_state': 'offline'}} async def send_power_action(self, server_id: str, action: str) -> dict: """ Send a power action to a server (start/stop/restart). Uses client API key as this is a client endpoint. Args: server_id: The Pterodactyl server identifier action: Power action to send (start/stop/restart) Returns: Dictionary containing API response status """ valid_actions = ['start', 'stop', 'restart'] if action not in valid_actions: logger.warning(f"Invalid power action attempted: {action}") return {"status": "error", "message": f"Invalid action. Must be one of: {', '.join(valid_actions)}"} logger.info(f"Sending {action} command to server {server_id}") result = await self._request("POST", f"client/servers/{server_id}/power", {"signal": action}) if result.get('status') == 'success': logger.info(f"Successfully executed {action} on server {server_id}") else: logger.error(f"Failed to execute {action} on server {server_id}: {result.get('message', 'Unknown error')}") return result async def get_server_details(self, server_id: str) -> dict: """ Get detailed server information including allocations. Uses application API key as this is an admin endpoint. Args: server_id: The Pterodactyl server identifier Returns: Dictionary containing detailed server information """ logger.debug(f"Fetching detailed information for server {server_id}") return await self._request("GET", f"application/servers/{server_id}", use_application_key=True) async def get_server_allocations(self, server_id: str) -> dict: """ Get allocation information for a server (IP addresses and ports). Uses application API key as this is an admin endpoint. Args: server_id: The Pterodactyl server identifier Returns: Dictionary containing server allocation information """ logger.debug(f"Fetching allocation information for server {server_id}") return await self._request("GET", f"application/servers/{server_id}/allocations", use_application_key=True) # ============================================== # SERVER STATUS VIEW CLASS (Buttons and UI) # ============================================== class ServerStatusView(discord.ui.View): """ Interactive Discord view containing server control buttons. Provides persistent controls for server management with role-based access. """ def __init__(self, server_id: str, server_name: str, pterodactyl_api: PterodactylAPI, server_data: dict): """ Initialize the server status view with control buttons. Args: server_id: The server's Pterodactyl identifier server_name: Human-readable server name pterodactyl_api: API client instance server_data: Full server data from Pterodactyl """ super().__init__(timeout=None) # Persistent view self.server_id = server_id self.server_name = server_name self.api = pterodactyl_api self.server_data = server_data logger.debug(f"Created ServerStatusView for {server_name} ({server_id})") async def interaction_check(self, interaction: discord.Interaction) -> bool: """ Verify the interacting user has the required role and is in the allowed guild. Args: interaction: Discord interaction object Returns: bool: True if authorized, False otherwise """ # First check if interaction is from the allowed guild if interaction.guild_id != ALLOWED_GUILD_ID: logger.warning(f"Unauthorized interaction attempt from guild {interaction.guild_id}") await interaction.response.send_message( "This bot is only available in a specific server.", ephemeral=True ) return False # Then check for required role logger.debug(f"Checking permissions for {interaction.user.name} on server {self.server_name}") has_role = any(role.name == REQUIRED_ROLE for role in interaction.user.roles) if not has_role: logger.warning(f"Permission denied for {interaction.user.name} - missing '{REQUIRED_ROLE}' role") await interaction.response.send_message( f"You don't have permission to control servers. You need the '{REQUIRED_ROLE}' role.", ephemeral=True ) return False logger.debug(f"Permission granted for {interaction.user.name}") return True async def on_error(self, interaction: discord.Interaction, error: Exception, item: discord.ui.Item): """ Handle errors in button interactions. Args: interaction: Discord interaction object error: Exception that occurred item: The UI item that triggered the error """ logger.error(f"View error in {self.server_name} by {interaction.user.name}: {str(error)}") await interaction.response.send_message( "An error occurred while processing your request.", ephemeral=True ) @discord.ui.button(label="Start", style=discord.ButtonStyle.green, custom_id="start_button") async def start_button(self, interaction: discord.Interaction, button: discord.ui.Button): """Send a start command to the server.""" logger.info(f"Start button pressed for {self.server_name} by {interaction.user.name}") await interaction.response.defer(ephemeral=True) result = await self.api.send_power_action(self.server_id, "start") if result.get('status') == 'success': message = f"Server '{self.server_name}' is starting..." logger.info(f"Successfully started server {self.server_name}") else: message = f"Failed to start server: {result.get('message', 'Unknown error')}" logger.error(f"Failed to start server {self.server_name}: {message}") await interaction.followup.send(message, ephemeral=True) @discord.ui.button(label="Stop", style=discord.ButtonStyle.red, custom_id="stop_button") async def stop_button(self, interaction: discord.Interaction, button: discord.ui.Button): """Send a stop command to the server.""" logger.info(f"Stop button pressed for {self.server_name} by {interaction.user.name}") await interaction.response.defer(ephemeral=True) result = await self.api.send_power_action(self.server_id, "stop") if result.get('status') == 'success': message = f"Server '{self.server_name}' is stopping..." logger.info(f"Successfully stopped server {self.server_name}") else: message = f"Failed to stop server: {result.get('message', 'Unknown error')}" logger.error(f"Failed to stop server {self.server_name}: {message}") await interaction.followup.send(message, ephemeral=True) @discord.ui.button(label="Restart", style=discord.ButtonStyle.blurple, custom_id="restart_button") async def restart_button(self, interaction: discord.Interaction, button: discord.ui.Button): """Send a restart command to the server.""" logger.info(f"Restart button pressed for {self.server_name} by {interaction.user.name}") await interaction.response.defer(ephemeral=True) result = await self.api.send_power_action(self.server_id, "restart") if result.get('status') == 'success': message = f"Server '{self.server_name}' is restarting..." logger.info(f"Successfully restarted server {self.server_name}") else: message = f"Failed to restart server: {result.get('message', 'Unknown error')}" logger.error(f"Failed to restart server {self.server_name}: {message}") await interaction.followup.send(message, ephemeral=True) @discord.ui.button(label="Show Address", style=discord.ButtonStyle.grey, custom_id="show_address_button") async def show_address_button(self, interaction: discord.Interaction, button: discord.ui.Button): """Show server's default allocation IP and port using client API.""" logger.info(f"Show Address button pressed for {self.server_name} by {interaction.user.name}") try: await interaction.response.defer(ephemeral=True) logger.debug(f"Fetching server details for {self.server_id}") # Get server details using client API server_details = await self.api._request( "GET", f"client/servers/{self.server_id}", use_application_key=False ) if server_details.get('status') == 'error': error_msg = server_details.get('message', 'Unknown error') logger.error(f"Failed to get server details for {self.server_id}: {error_msg}") raise ValueError(error_msg) attributes = server_details.get('attributes', {}) relationships = attributes.get('relationships', {}) allocations = relationships.get('allocations', {}).get('data', []) if not allocations: logger.warning(f"No allocations found for server {self.server_id}") raise ValueError("No allocations found for this server") # Find the default allocation (is_default=True) default_allocation = next( (alloc for alloc in allocations if alloc.get('attributes', {}).get('is_default', False)), allocations[0] # Fallback to first allocation if no default found ) allocation_attrs = default_allocation.get('attributes', {}) ip_alias = allocation_attrs.get('ip_alias', 'Unknown') port = str(allocation_attrs.get('port', 'Unknown')) logger.debug(f"Retrieved connection info for {self.server_id}: {ip_alias}:{port}") # Create and send embed embed = discord.Embed( title=f"{self.server_name} Connection Info", color=discord.Color.blue(), description=f"Server ID: `{self.server_id}`" ) embed.add_field(name="Address", value=f"`{ip_alias}`", inline=True) embed.add_field(name="Port", value=f"`{port}`", inline=True) await interaction.followup.send(embed=embed, ephemeral=True) logger.info(f"Displayed connection info for {self.server_name}") except Exception as e: logger.error(f"Failed to show address for {self.server_name}: {str(e)}") await interaction.followup.send( "⚠️ Failed to get connection info. The server may not have any ports allocated.", ephemeral=True ) # ============================================== # MAIN BOT CLASS # ============================================== class PterodactylBot(commands.Bot): """ Main bot class for Pterodactyl server management. Handles Discord interactions, embed management, and background tasks. Manages server status embeds and user commands. """ def __init__(self, *args, **kwargs): """ Initialize the Pterodactyl bot instance. Sets up caches, locks, and storage paths. """ super().__init__(*args, **kwargs) self.pterodactyl_api = None # Pterodactyl API client self.server_cache: Dict[str, dict] = {} # Cache of server data from Pterodactyl self.embed_locations: Dict[str, Dict[str, int]] = {} # Tracks where embeds are posted self.update_lock = asyncio.Lock() # Prevents concurrent updates self.embed_storage_path = Path(EMBED_LOCATIONS_FILE) # File to store embed locations # Track previous server states and CPU usage to detect changes # Format: {server_id: (state, cpu_usage)} self.previous_states: Dict[str, Tuple[str, float]] = {} logger.info("Initialized PterodactylBot instance with state tracking") async def setup_hook(self): """ Bot setup routine called when the bot is starting. Initializes API client, loads saved data, and starts background tasks. """ logger.info("Running bot setup hook") # Initialize API client self.pterodactyl_api = PterodactylAPI( PTERODACTYL_URL, PTERODACTYL_CLIENT_API_KEY, PTERODACTYL_APPLICATION_API_KEY ) await self.pterodactyl_api.initialize() logger.info("Initialized Pterodactyl API client") # Load saved embed locations await self.load_embed_locations() # Start background update task self.update_status.start() logger.info("Started background status update task") async def load_embed_locations(self): """Load saved embed locations from JSON storage file.""" logger.debug("Attempting to load embed locations from storage") if not self.embed_storage_path.exists(): logger.info("No existing embed locations file found") return try: with open(self.embed_storage_path, 'r') as f: self.embed_locations = json.load(f) logger.info(f"Loaded {len(self.embed_locations)} embed locations from storage") except Exception as e: logger.error(f"Failed to load embed locations: {str(e)}") async def save_embed_locations(self): """Save current embed locations to JSON storage file.""" logger.debug("Attempting to save embed locations to storage") try: with open(self.embed_storage_path, 'w') as f: json.dump(self.embed_locations, f, indent=2) logger.debug("Successfully saved embed locations to disk") except Exception as e: logger.error(f"Failed to save embed locations: {str(e)}") async def refresh_all_embeds(self) -> Tuple[int, int]: """ Perform a complete refresh of all server status embeds. Creates new embeds and deletes old ones to prevent duplication. Returns: Tuple of (deleted_count, created_count) - number of embeds processed """ logger.info("Starting full refresh of all server embeds") async with self.update_lock: try: await asyncio.sleep(1) # Initial delay # Get current server list if cache is empty if not self.server_cache: logger.debug("Server cache empty, fetching fresh server list") servers = await self.pterodactyl_api.get_servers() if not servers: logger.warning("No servers found in Pterodactyl panel") return 0, 0 self.server_cache = {server['attributes']['identifier']: server for server in servers} logger.info(f"Populated server cache with {len(servers)} servers") # Create new embeds in temporary storage new_embeds = {} created_count = 0 skipped_count = 0 for server_id, server_data in self.server_cache.items(): # Skip if we don't have an existing location to recreate in if server_id not in self.embed_locations: skipped_count += 1 continue channel_id = self.embed_locations[server_id]['channel_id'] channel = self.get_channel(int(channel_id)) if not channel: logger.warning(f"Channel {channel_id} not found for server {server_id}") continue try: logger.debug(f"Creating new embed for server {server_id}") # Get current server status resources = await self.pterodactyl_api.get_server_resources(server_id) # Create new embed embed, view = await self.get_server_status_embed(server_data, resources) message = await channel.send(embed=embed, view=view) # Store in temporary location new_embeds[server_id] = { 'channel_id': str(channel.id), 'message_id': str(message.id) } created_count += 1 logger.info(f"Created new embed for server {server_data['attributes']['name']}") await asyncio.sleep(1) # Rate limit protection except Exception as e: logger.error(f"Failed to create new embed for server {server_id}: {str(e)}") logger.info(f"Created {created_count} new embeds, skipped {skipped_count} servers") # Only proceed if we created at least one new embed if not new_embeds: logger.warning("No new embeds created during refresh") return 0, 0 # Now delete old embeds deleted_count = 0 not_found_count = 0 for server_id, location in list(self.embed_locations.items()): try: channel = self.get_channel(int(location['channel_id'])) if channel: try: message = await channel.fetch_message(int(location['message_id'])) await message.delete() deleted_count += 1 logger.debug(f"Deleted old embed for server {server_id}") await asyncio.sleep(0.5) # Rate limit protection except discord.NotFound: not_found_count += 1 logger.debug(f"Old embed for server {server_id} already deleted") except Exception as e: logger.error(f"Failed to delete old embed for server {server_id}: {str(e)}") except Exception as e: logger.error(f"Error processing old embed for server {server_id}: {str(e)}") logger.info(f"Deleted {deleted_count} old embeds, {not_found_count} already missing") # Update storage with new embed locations self.embed_locations = new_embeds await self.save_embed_locations() return deleted_count, created_count except Exception as e: logger.error(f"Critical error during embed refresh: {str(e)}") raise async def track_new_embed(self, server_id: str, message: discord.Message): """ Track a newly created embed in storage. Args: server_id: The server's Pterodactyl identifier message: Discord message containing the embed """ logger.debug(f"Tracking new embed for server {server_id} in channel {message.channel.id}") self.embed_locations[server_id] = { 'channel_id': str(message.channel.id), 'message_id': str(message.id) } await self.save_embed_locations() async def get_server_status_embed(self, server_data: dict, resources: dict) -> Tuple[discord.Embed, ServerStatusView]: """ Create a status embed and view for a server. Args: server_data: Server information from Pterodactyl resources: Current resource usage data Returns: Tuple of (embed, view) objects ready for display """ attributes = server_data.get('attributes', {}) identifier = attributes.get('identifier', 'unknown') name = attributes.get('name', 'Unknown Server') description = attributes.get('description', 'No description available') logger.debug(f"Building status embed for server {name} ({identifier})") # Parse resource data resource_attributes = resources.get('attributes', {}) current_state = resource_attributes.get('current_state', 'offline').title() is_suspended = attributes.get('suspended', False) # Create embed with appropriate color based on status embed = discord.Embed( title=f"{name} - {current_state}", description=description, color=discord.Color.blue() if current_state.lower() == "running" else discord.Color.red(), timestamp=datetime.now() ) embed.add_field(name="Server ID", value=identifier, inline=True) if is_suspended: embed.add_field(name="Status", value="⛔ Suspended", inline=True) else: embed.add_field(name="Status", value="✅ Active", inline=True) # Add resource usage if server is running if current_state.lower() == "running": cpu_usage = round(resource_attributes.get('resources', {}).get('cpu_absolute', 0), 2) memory_usage = round(resource_attributes.get('resources', {}).get('memory_bytes', 0) / (1024 ** 2), 2) disk_usage = round(resource_attributes.get('resources', {}).get('disk_bytes', 0) / (1024 ** 2), 2) network_rx = round(resource_attributes.get('resources', {}).get('network_rx_bytes', 0) / (1024 ** 2), 2) network_tx = round(resource_attributes.get('resources', {}).get('network_tx_bytes', 0) / (1024 ** 2), 2) embed.add_field(name="CPU Usage", value=f"{cpu_usage}%", inline=True) embed.add_field(name="Memory Usage", value=f"{memory_usage} MB", inline=True) embed.add_field(name="Disk Usage", value=f"{disk_usage} MB", inline=True) embed.add_field(name="Network", value=f"⬇️ {network_rx} MB / ⬆️ {network_tx} MB", inline=False) embed.set_footer(text="Last updated") # Create interactive view with control buttons view = ServerStatusView( server_id=identifier, server_name=name, pterodactyl_api=self.pterodactyl_api, server_data=server_data ) logger.debug(f"Successfully built status components for {name}") return embed, view @tasks.loop(seconds=UPDATE_INTERVAL) async def update_status(self): """ Background task to update server status embeds when: 1. Server power state changes (started/stopped/restarted) 2. Significant CPU usage change (>50% difference) 3. First time seeing the server This minimizes API calls to Discord and updates while maintaining real-time awareness of important server changes. """ logger.info("Starting optimized server status update cycle") async with self.update_lock: # Ensure only one update runs at a time try: # Fetch current server list from Pterodactyl servers = await self.pterodactyl_api.get_servers() if not servers: logger.warning("No servers found in Pterodactyl panel during update") return # Update our local cache with fresh server data self.server_cache = {server['attributes']['identifier']: server for server in servers} logger.debug(f"Updated server cache with {len(servers)} servers") # Variables to track our update statistics update_count = 0 # Successful updates error_count = 0 # Failed updates missing_count = 0 # Missing embeds skipped_count = 0 # Servers that didn't need updates # Process each server we're tracking embeds for for server_id, location in list(self.embed_locations.items()): # Skip if server no longer exists in Pterodactyl if server_id not in self.server_cache: logger.warning(f"Server {server_id} not found in cache, skipping update") continue server_data = self.server_cache[server_id] server_name = server_data['attributes']['name'] try: logger.debug(f"Checking status for server {server_name} ({server_id})") # Get current server resource usage resources = await self.pterodactyl_api.get_server_resources(server_id) current_state = resources.get('attributes', {}).get('current_state', 'offline') cpu_usage = round(resources.get('attributes', {}).get('resources', {}).get('cpu_absolute', 0), 2) # Retrieve previous recorded state and CPU usage prev_state, prev_cpu = self.previous_states.get(server_id, (None, 0)) # DECISION LOGIC: Should we update the embed? needs_update = False # 1. Check if power state changed (most important) if current_state != prev_state: logger.debug(f"Power state changed for {server_name}: {prev_state} -> {current_state}") needs_update = True # 2. Check for significant CPU change (only if server is running) elif current_state == 'running' and abs(cpu_usage - prev_cpu) > 50: logger.debug(f"Significant CPU change for {server_name}: {prev_cpu}% -> {cpu_usage}%") needs_update = True # 3. First time we're seeing this server (initial update) elif prev_state is None: logger.debug(f"First check for {server_name}, performing initial update") needs_update = True # PERFORM UPDATE IF NEEDED if needs_update: # Generate fresh embed and view components embed, view = await self.get_server_status_embed(server_data, resources) # Get the channel where this server's embed lives channel = self.get_channel(int(location['channel_id'])) if not channel: logger.warning(f"Channel {location['channel_id']} not found for server {server_id}") continue # Fetch and update the existing message message = await channel.fetch_message(int(location['message_id'])) await message.edit(embed=embed, view=view) update_count += 1 logger.debug(f"Updated status for {server_name}") # Update our state tracking with new values self.previous_states[server_id] = (current_state, cpu_usage) else: # No significant changes detected skipped_count += 1 logger.debug(f"No changes detected for {server_name}, skipping update") except discord.NotFound: # Embed message was deleted - clean up our tracking logger.warning(f"Embed for server {server_id} not found, removing from tracking") self.embed_locations.pop(server_id, None) self.previous_states.pop(server_id, None) # Also clean up state tracking missing_count += 1 await self.save_embed_locations() except Exception as e: logger.error(f"Failed to update status for server {server_id}: {str(e)}") error_count += 1 # Small delay between servers to avoid rate limits await asyncio.sleep(0.5) # Log summary of this update cycle logger.info( f"Update cycle complete: " f"{update_count} updated, " f"{skipped_count} skipped, " f"{missing_count} missing, " f"{error_count} errors" ) except Exception as e: logger.error(f"Error in update_status task: {str(e)}") # If something went wrong, wait before retrying await asyncio.sleep(5) @update_status.before_loop async def before_update_status(self): """Wait for bot to be ready before starting update task.""" logger.debug("Waiting for bot readiness before starting update task") await self.wait_until_ready() await self.refresh_all_embeds() @update_status.after_loop async def after_update_status(self): """Handle update task stopping.""" if self.update_status.is_being_cancelled(): logger.info("Server status update task was cancelled") elif self.update_status.failed(): logger.error("Server status update task failed") async def close(self): """Cleanup when bot is shutting down.""" logger.info("Bot shutdown initiated - performing cleanup") await self.save_embed_locations() if self.pterodactyl_api: await self.pterodactyl_api.close() await super().close() # ============================================== # DISCORD COMMANDS # ============================================== intents = discord.Intents.default() intents.message_content = True bot = PterodactylBot(command_prefix="!", intents=intents) async def check_allowed_guild(interaction: discord.Interaction) -> bool: """ Verify that an interaction is coming from the allowed guild. Args: interaction: Discord interaction object Returns: bool: True if interaction is allowed, False otherwise """ if interaction.guild_id != ALLOWED_GUILD_ID: logger.warning(f"Command attempted from unauthorized guild {interaction.guild_id} by {interaction.user.name}") await interaction.response.send_message( "This bot is only available in a specific server.", ephemeral=True ) return False return True @bot.tree.command(name="server_status", description="Show the status of a game server") @app_commands.describe( server_name="The name of the server to check", channel="The channel to post the status in (defaults to current channel)" ) async def server_status( interaction: discord.Interaction, server_name: str, channel: Optional[discord.TextChannel] = None ): """ Slash command to display a server's status embed. Creates or updates the status message for the specified server. """ # Check if interaction is from allowed guild if not await check_allowed_guild(interaction): return logger.info(f"Server status command invoked by {interaction.user.name} for '{server_name}'") await interaction.response.defer() # Use specified channel or current channel target_channel = channel or interaction.channel logger.debug(f"Target channel: {target_channel.name} ({target_channel.id})") # Find the server by name in cache server_data = None server_id = None logger.debug(f"Searching cache for server '{server_name}'") for s_id, s_data in bot.server_cache.items(): if s_data['attributes']['name'].lower() == server_name.lower(): server_data = s_data server_id = s_id break if not server_data: logger.warning(f"Server '{server_name}' not found in cache") await interaction.followup.send(f"Server '{server_name}' not found.", ephemeral=True) return logger.info(f"Found server {server_name} ({server_id}), fetching resources") # Get current server status resources = await bot.pterodactyl_api.get_server_resources(server_id) # Delete old embed if it exists if server_id in bot.embed_locations: logger.debug(f"Found existing embed for {server_id}, attempting to delete") try: old_location = bot.embed_locations[server_id] old_channel = bot.get_channel(int(old_location['channel_id'])) if old_channel: try: old_message = await old_channel.fetch_message(int(old_location['message_id'])) await old_message.delete() logger.debug(f"Deleted old embed for {server_id}") except discord.NotFound: logger.debug(f"Old embed for {server_id} already deleted") except Exception as e: logger.error(f"Failed to delete old embed: {str(e)}") # Create and send new embed logger.info(f"Creating new embed for {server_name} in {target_channel.name}") embed, view = await bot.get_server_status_embed(server_data, resources) message = await target_channel.send(embed=embed, view=view) await bot.track_new_embed(server_id, message) await interaction.followup.send( f"Server status posted in {target_channel.mention}", ephemeral=True ) logger.info(f"Successfully posted status for {server_name}") @bot.tree.command(name="refresh_embeds", description="Refresh all server status embeds (admin only)") async def refresh_embeds(interaction: discord.Interaction): """Slash command to refresh all server embeds.""" if not await check_allowed_guild(interaction): return logger.info(f"Refresh embeds command invoked by {interaction.user.name}") await interaction.response.defer(ephemeral=True) # Require administrator permissions if not interaction.user.guild_permissions.administrator: logger.warning(f"Unauthorized refresh attempt by {interaction.user.name}") await interaction.followup.send( "You need administrator permissions to refresh all embeds.", ephemeral=True ) return try: logger.info("Starting full embed refresh per admin request") deleted, created = await bot.refresh_all_embeds() await interaction.followup.send( f"Refreshed all embeds. Deleted {deleted} old embeds, created {created} new ones.", ephemeral=True ) logger.info(f"Embed refresh completed: {deleted} deleted, {created} created") except Exception as e: logger.error(f"Embed refresh failed: {str(e)}") await interaction.followup.send( f"Failed to refresh embeds: {str(e)}", ephemeral=True ) # ============================================== # BOT EVENTS # ============================================== @bot.event async def on_interaction(interaction: discord.Interaction): """Global interaction handler to check guild before processing any interaction.""" if interaction.guild_id != ALLOWED_GUILD_ID: logger.debug(f"Ignoring interaction from unauthorized guild {interaction.guild_id}") await interaction.response.send_message( "This bot is only available in a specific server.", ephemeral=True ) return @bot.event async def on_ready(): """Called when the bot successfully connects to Discord.""" logger.info(f"Bot connected as {bot.user.name} (ID: {bot.user.id})") try: # Sync commands only to the allowed guild guild = discord.Object(id=ALLOWED_GUILD_ID) bot.tree.copy_global_to(guild=guild) synced = await bot.tree.sync(guild=guild) logger.info(f"Successfully synced {len(synced)} command(s) to guild {ALLOWED_GUILD_ID}: {[cmd.name for cmd in synced]}") except Exception as e: logger.error(f"Command sync failed: {str(e)}") # ============================================== # SYSTEM SIGNAL HANDLERS # ============================================== def handle_sigint(signum: int, frame: types.FrameType) -> None: """ Handle SIGINT signals (Ctrl+C) by initiating graceful shutdown. Args: signum: The signal number (signal.SIGINT) frame: Current stack frame (unused but required by signal handler signature) """ logger.info("Received SIGINT (Ctrl+C), initiating graceful shutdown...") raise KeyboardInterrupt def handle_sigterm(signum: int, frame: types.FrameType) -> None: """ Handle SIGTERM signals (container stop) by initiating graceful shutdown. Args: signum: The signal number (signal.SIGTERM) frame: Current stack frame (unused but required by signal handler signature) """ logger.info("Received SIGTERM (container stop), initiating graceful shutdown...") raise KeyboardInterrupt # ============================================== # BOT STARTUP # ============================================== if __name__ == "__main__": """ Main entry point for the bot application. Handles: - Signal registration for graceful shutdowns (SIGINT/SIGTERM) - Primary bot execution loop - Error handling and crash reporting - Resource cleanup on shutdown Flow: 1. Initialize signal handlers 2. Start bot with Discord token 3. Handle interrupts and exceptions 4. Execute final cleanup """ logger.info("Starting bot initialization") # Register signal handlers signal.signal(signal.SIGINT, handle_sigint) # For Ctrl+C signal.signal(signal.SIGTERM, handle_sigterm) # For container stop commands logger.info("System signal handlers registered") try: bot.run(DISCORD_TOKEN) except KeyboardInterrupt: logger.info("Bot shutting down") except Exception as e: logger.error(f"Bot crashed: {str(e)}") sys.exit(1) # Exit with error code for crash finally: logger.info("Bot shutdown complete") sys.exit(0) # Explicit clean exit