Files
pterodactyl-discord-bot/pterodisbot.py
k.eaven 52ccca7161
All checks were successful
Docker Build and Push / build-and-push (push) Successful in 1m9s
Update system signal handling
2025-08-12 03:52:40 -07:00

1157 lines
50 KiB
Python

"""
Pterodactyl Discord Bot
A comprehensive Discord bot for managing Pterodactyl game servers through Discord.
Features:
- Server status monitoring with auto-updating embeds
- Power controls (start/stop/restart)
- Server address/port display
- Multi-channel embed support
- Automatic embed refresh system
- Role-based access control
- Single-guild restriction
- Extensive logging for all operations
"""
import discord
from discord.ext import commands, tasks
from discord import app_commands
import os
import sys
import signal
import types
import aiohttp
import asyncio
import json
import traceback
import logging
from logging.handlers import RotatingFileHandler
import configparser
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from pathlib import Path
import generate_config
# ==============================================
# LOGGING SETUP
# ==============================================
logs_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs')
os.makedirs(logs_dir, exist_ok=True)
logger = logging.getLogger('pterodisbot')
logger.setLevel(logging.DEBUG)
# File handler for logs (rotates when reaching 5MB, keeps 3 backups)
handler = RotatingFileHandler(
filename=os.path.join(logs_dir, 'pterodisbot.log'),
maxBytes=5*1024*1024, # 5 MiB max log file size
backupCount=3, # Rotate through 3 files
encoding='utf-8'
)
handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)
# Console handler for real-time output
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(console_handler)
logger.info("Initialized logging system with file and console output")
# ==============================================
# CONFIGURATION SETUP
# ==============================================
# generate_config.generate_config()
# logger.debug("Gennerated config.ini file using values from .env")
config = configparser.ConfigParser()
config.read('config.ini')
# ==============================================
# CONFIGURATION VALIDATION
# ==============================================
class ConfigValidationError(Exception):
"""Custom exception for configuration validation errors."""
pass
def validate_config():
"""
Validate all required configuration values at startup.
Raises ConfigValidationError if any required values are missing or invalid.
"""
errors = []
# Validate Pterodactyl section
if not config.has_section('Pterodactyl'):
errors.append("Missing [Pterodactyl] section in config.ini")
else:
required_ptero = ['PanelURL', 'ClientAPIKey', 'ApplicationAPIKey']
for key in required_ptero:
if not config.get('Pterodactyl', key, fallback=None):
errors.append(f"Missing required Pterodactyl config value: {key}")
# Validate Discord section
if not config.has_section('Discord'):
errors.append("Missing [Discord] section in config.ini")
else:
required_discord = ['Token', 'AllowedGuildID']
for key in required_discord:
if not config.get('Discord', key, fallback=None):
errors.append(f"Missing required Discord config value: {key}")
# Validate AllowedGuildID is a valid integer
try:
guild_id = config.getint('Discord', 'AllowedGuildID', fallback=0)
if guild_id <= 0:
errors.append("AllowedGuildID must be a positive integer")
except ValueError:
errors.append("AllowedGuildID must be a valid integer")
# Validate API keys have correct prefixes
client_key = config.get('Pterodactyl', 'ClientAPIKey', fallback='')
if client_key and not client_key.startswith('ptlc_'):
errors.append("ClientAPIKey should start with 'ptlc_'")
app_key = config.get('Pterodactyl', 'ApplicationAPIKey', fallback='')
if app_key and not app_key.startswith('ptla_'):
errors.append("ApplicationAPIKey should start with 'ptla_'")
# Validate PanelURL is a valid URL
panel_url = config.get('Pterodactyl', 'PanelURL', fallback='')
if panel_url and not (panel_url.startswith('http://') or panel_url.startswith('https://')):
errors.append("PanelURL must start with http:// or https://")
if errors:
error_msg = "Configuration validation failed:\n- " + "\n- ".join(errors)
logger.error(error_msg)
raise ConfigValidationError(error_msg)
logger.info("Configuration validation passed")
# ==============================================
# CONSTANTS (Updated with validation)
# ==============================================
try:
validate_config()
PTERODACTYL_URL = config.get('Pterodactyl', 'PanelURL')
PTERODACTYL_CLIENT_API_KEY = config.get('Pterodactyl', 'ClientAPIKey')
PTERODACTYL_APPLICATION_API_KEY = config.get('Pterodactyl', 'ApplicationAPIKey')
DISCORD_TOKEN = config.get('Discord', 'Token')
ALLOWED_GUILD_ID = config.getint('Discord', 'AllowedGuildID')
REQUIRED_ROLE = "Game Server User"
UPDATE_INTERVAL = 10
EMBED_LOCATIONS_FILE = "./embed/embed_locations.json"
logger.debug("Loaded and validated configuration values from config.ini")
except ConfigValidationError as e:
logger.critical(f"Configuration error: {str(e)}")
raise
except Exception as e:
logger.critical(f"Unexpected error loading configuration: {str(e)}")
raise
# ==============================================
# PTERODACTYL API CLASS
# ==============================================
class PterodactylAPI:
"""
Handles all interactions with the Pterodactyl Panel API.
Uses client API key for client endpoints and application API key for admin endpoints.
Provides methods for server management and monitoring.
"""
def __init__(self, panel_url: str, client_api_key: str, application_api_key: str):
"""
Initialize the Pterodactyl API client with both API keys.
Args:
panel_url: URL of the Pterodactyl panel (must include protocol)
client_api_key: API key for client endpoints (starts with ptlc_)
application_api_key: API key for application endpoints (starts with ptla_)
"""
self.panel_url = panel_url.rstrip('/')
self.client_api_key = client_api_key
self.application_api_key = application_api_key
self.session = None
self.lock = asyncio.Lock() # Prevents concurrent API access
logger.info("Initialized PterodactylAPI client with provided credentials")
async def initialize(self):
"""Initialize the aiohttp client session for API requests."""
self.session = aiohttp.ClientSession()
logger.debug("Created new aiohttp ClientSession")
async def close(self):
"""Cleanly close the aiohttp session when shutting down."""
if self.session and not self.session.closed:
await self.session.close()
logger.debug("Closed aiohttp ClientSession")
async def _request(self, method: str, endpoint: str, data: Optional[dict] = None, use_application_key: bool = False) -> dict:
"""
Make an authenticated request to the Pterodactyl API.
Args:
method: HTTP method (GET, POST, PUT, DELETE, etc.)
endpoint: API endpoint (e.g., 'application/servers')
data: Optional JSON payload for POST/PUT requests
use_application_key: Whether to use the application API key (admin endpoints)
Returns:
Dictionary containing API response or error information
Raises:
aiohttp.ClientError: For network-related issues
json.JSONDecodeError: If response cannot be parsed as JSON
"""
url = f"{self.panel_url}/api/{endpoint}"
api_key_type = "Application" if use_application_key else "Client"
logger.debug(f"Preparing {method} request to {endpoint} using {api_key_type} API key")
# Choose the appropriate API key
api_key = self.application_api_key if use_application_key else self.client_api_key
headers = {
"Authorization": f"Bearer {api_key}",
"Accept": "application/json",
"Content-Type": "application/json"
}
try:
async with self.lock:
logger.debug(f"Acquired lock for API request to {endpoint}")
async with self.session.request(
method,
url,
headers=headers,
json=data
) as response:
if response.status == 204: # No content
logger.debug(f"Received 204 No Content response from {endpoint}")
return {"status": "success"}
response_data = await response.json()
logger.debug(f"Received response from {endpoint} with status {response.status}")
if response.status >= 400:
error_msg = response_data.get('errors', [{}])[0].get('detail', 'Unknown error')
logger.error(f"API request to {endpoint} failed with status {response.status}: {error_msg}")
return {"status": "error", "message": error_msg}
return response_data
except Exception as e:
logger.error(f"Exception during API request to {endpoint}: {str(e)}")
return {"status": "error", "message": str(e)}
async def get_servers(self) -> List[dict]:
"""
Get a list of all servers from the Pterodactyl panel.
Uses application API key as this is an admin endpoint.
Returns:
List of server dictionaries containing all server attributes
"""
logger.info("Fetching list of all servers from Pterodactyl panel")
response = await self._request("GET", "application/servers", use_application_key=True)
servers = response.get('data', [])
logger.info(f"Retrieved {len(servers)} servers from Pterodactyl panel")
return servers
async def get_server_resources(self, server_id: str) -> dict:
"""
Get resource usage for a specific server.
Uses client API key as this is a client endpoint.
Args:
server_id: The Pterodactyl server identifier
Returns:
Dictionary containing server resource usage and current state
"""
logger.debug(f"Fetching resource usage for server {server_id}")
try:
response = await self._request("GET", f"client/servers/{server_id}/resources")
if response.get('status') == 'error':
error_msg = response.get('message', 'Unknown error')
logger.error(f"Failed to get resources for server {server_id}: {error_msg}")
return {'attributes': {'current_state': 'offline'}}
state = response.get('attributes', {}).get('current_state', 'unknown')
logger.debug(f"Server {server_id} current state: {state}")
return response
except Exception as e:
logger.error(f"Exception getting resources for server {server_id}: {str(e)}")
return {'attributes': {'current_state': 'offline'}}
async def send_power_action(self, server_id: str, action: str) -> dict:
"""
Send a power action to a server (start/stop/restart).
Uses client API key as this is a client endpoint.
Args:
server_id: The Pterodactyl server identifier
action: Power action to send (start/stop/restart)
Returns:
Dictionary containing API response status
"""
valid_actions = ['start', 'stop', 'restart']
if action not in valid_actions:
logger.warning(f"Invalid power action attempted: {action}")
return {"status": "error", "message": f"Invalid action. Must be one of: {', '.join(valid_actions)}"}
logger.info(f"Sending {action} command to server {server_id}")
result = await self._request("POST", f"client/servers/{server_id}/power", {"signal": action})
if result.get('status') == 'success':
logger.info(f"Successfully executed {action} on server {server_id}")
else:
logger.error(f"Failed to execute {action} on server {server_id}: {result.get('message', 'Unknown error')}")
return result
async def get_server_details(self, server_id: str) -> dict:
"""
Get detailed server information including allocations.
Uses application API key as this is an admin endpoint.
Args:
server_id: The Pterodactyl server identifier
Returns:
Dictionary containing detailed server information
"""
logger.debug(f"Fetching detailed information for server {server_id}")
return await self._request("GET", f"application/servers/{server_id}", use_application_key=True)
async def get_server_allocations(self, server_id: str) -> dict:
"""
Get allocation information for a server (IP addresses and ports).
Uses application API key as this is an admin endpoint.
Args:
server_id: The Pterodactyl server identifier
Returns:
Dictionary containing server allocation information
"""
logger.debug(f"Fetching allocation information for server {server_id}")
return await self._request("GET", f"application/servers/{server_id}/allocations", use_application_key=True)
# ==============================================
# SERVER STATUS VIEW CLASS (Buttons and UI)
# ==============================================
class ServerStatusView(discord.ui.View):
"""
Interactive Discord view containing server control buttons.
Provides persistent controls for server management with role-based access.
"""
def __init__(self, server_id: str, server_name: str, pterodactyl_api: PterodactylAPI, server_data: dict):
"""
Initialize the server status view with control buttons.
Args:
server_id: The server's Pterodactyl identifier
server_name: Human-readable server name
pterodactyl_api: API client instance
server_data: Full server data from Pterodactyl
"""
super().__init__(timeout=None) # Persistent view
self.server_id = server_id
self.server_name = server_name
self.api = pterodactyl_api
self.server_data = server_data
logger.debug(f"Created ServerStatusView for {server_name} ({server_id})")
async def interaction_check(self, interaction: discord.Interaction) -> bool:
"""
Verify the interacting user has the required role and is in the allowed guild.
Args:
interaction: Discord interaction object
Returns:
bool: True if authorized, False otherwise
"""
# First check if interaction is from the allowed guild
if interaction.guild_id != ALLOWED_GUILD_ID:
logger.warning(f"Unauthorized interaction attempt from guild {interaction.guild_id}")
await interaction.response.send_message(
"This bot is only available in a specific server.",
ephemeral=True
)
return False
# Then check for required role
logger.debug(f"Checking permissions for {interaction.user.name} on server {self.server_name}")
has_role = any(role.name == REQUIRED_ROLE for role in interaction.user.roles)
if not has_role:
logger.warning(f"Permission denied for {interaction.user.name} - missing '{REQUIRED_ROLE}' role")
await interaction.response.send_message(
f"You don't have permission to control servers. You need the '{REQUIRED_ROLE}' role.",
ephemeral=True
)
return False
logger.debug(f"Permission granted for {interaction.user.name}")
return True
async def on_error(self, interaction: discord.Interaction, error: Exception, item: discord.ui.Item):
"""
Handle errors in button interactions.
Args:
interaction: Discord interaction object
error: Exception that occurred
item: The UI item that triggered the error
"""
logger.error(f"View error in {self.server_name} by {interaction.user.name}: {str(error)}")
await interaction.response.send_message(
"An error occurred while processing your request.",
ephemeral=True
)
@discord.ui.button(label="Start", style=discord.ButtonStyle.green, custom_id="start_button")
async def start_button(self, interaction: discord.Interaction, button: discord.ui.Button):
"""Send a start command to the server."""
logger.info(f"Start button pressed for {self.server_name} by {interaction.user.name}")
await interaction.response.defer(ephemeral=True)
result = await self.api.send_power_action(self.server_id, "start")
if result.get('status') == 'success':
message = f"Server '{self.server_name}' is starting..."
logger.info(f"Successfully started server {self.server_name}")
else:
message = f"Failed to start server: {result.get('message', 'Unknown error')}"
logger.error(f"Failed to start server {self.server_name}: {message}")
await interaction.followup.send(message, ephemeral=True)
@discord.ui.button(label="Stop", style=discord.ButtonStyle.red, custom_id="stop_button")
async def stop_button(self, interaction: discord.Interaction, button: discord.ui.Button):
"""Send a stop command to the server."""
logger.info(f"Stop button pressed for {self.server_name} by {interaction.user.name}")
await interaction.response.defer(ephemeral=True)
result = await self.api.send_power_action(self.server_id, "stop")
if result.get('status') == 'success':
message = f"Server '{self.server_name}' is stopping..."
logger.info(f"Successfully stopped server {self.server_name}")
else:
message = f"Failed to stop server: {result.get('message', 'Unknown error')}"
logger.error(f"Failed to stop server {self.server_name}: {message}")
await interaction.followup.send(message, ephemeral=True)
@discord.ui.button(label="Restart", style=discord.ButtonStyle.blurple, custom_id="restart_button")
async def restart_button(self, interaction: discord.Interaction, button: discord.ui.Button):
"""Send a restart command to the server."""
logger.info(f"Restart button pressed for {self.server_name} by {interaction.user.name}")
await interaction.response.defer(ephemeral=True)
result = await self.api.send_power_action(self.server_id, "restart")
if result.get('status') == 'success':
message = f"Server '{self.server_name}' is restarting..."
logger.info(f"Successfully restarted server {self.server_name}")
else:
message = f"Failed to restart server: {result.get('message', 'Unknown error')}"
logger.error(f"Failed to restart server {self.server_name}: {message}")
await interaction.followup.send(message, ephemeral=True)
@discord.ui.button(label="Show Address", style=discord.ButtonStyle.grey, custom_id="show_address_button")
async def show_address_button(self, interaction: discord.Interaction, button: discord.ui.Button):
"""Show server's default allocation IP and port using client API."""
logger.info(f"Show Address button pressed for {self.server_name} by {interaction.user.name}")
try:
await interaction.response.defer(ephemeral=True)
logger.debug(f"Fetching server details for {self.server_id}")
# Get server details using client API
server_details = await self.api._request(
"GET",
f"client/servers/{self.server_id}",
use_application_key=False
)
if server_details.get('status') == 'error':
error_msg = server_details.get('message', 'Unknown error')
logger.error(f"Failed to get server details for {self.server_id}: {error_msg}")
raise ValueError(error_msg)
attributes = server_details.get('attributes', {})
relationships = attributes.get('relationships', {})
allocations = relationships.get('allocations', {}).get('data', [])
if not allocations:
logger.warning(f"No allocations found for server {self.server_id}")
raise ValueError("No allocations found for this server")
# Find the default allocation (is_default=True)
default_allocation = next(
(alloc for alloc in allocations
if alloc.get('attributes', {}).get('is_default', False)),
allocations[0] # Fallback to first allocation if no default found
)
allocation_attrs = default_allocation.get('attributes', {})
ip_alias = allocation_attrs.get('ip_alias', 'Unknown')
port = str(allocation_attrs.get('port', 'Unknown'))
logger.debug(f"Retrieved connection info for {self.server_id}: {ip_alias}:{port}")
# Create and send embed
embed = discord.Embed(
title=f"{self.server_name} Connection Info",
color=discord.Color.blue(),
description=f"Server ID: `{self.server_id}`"
)
embed.add_field(name="Address", value=f"`{ip_alias}`", inline=True)
embed.add_field(name="Port", value=f"`{port}`", inline=True)
await interaction.followup.send(embed=embed, ephemeral=True)
logger.info(f"Displayed connection info for {self.server_name}")
except Exception as e:
logger.error(f"Failed to show address for {self.server_name}: {str(e)}")
await interaction.followup.send(
"⚠️ Failed to get connection info. The server may not have any ports allocated.",
ephemeral=True
)
# ==============================================
# MAIN BOT CLASS
# ==============================================
class PterodactylBot(commands.Bot):
"""
Main bot class for Pterodactyl server management.
Handles Discord interactions, embed management, and background tasks.
Manages server status embeds and user commands.
"""
def __init__(self, *args, **kwargs):
"""
Initialize the Pterodactyl bot instance.
Sets up caches, locks, and storage paths.
"""
super().__init__(*args, **kwargs)
self.pterodactyl_api = None # Pterodactyl API client
self.server_cache: Dict[str, dict] = {} # Cache of server data from Pterodactyl
self.embed_locations: Dict[str, Dict[str, int]] = {} # Tracks where embeds are posted
self.update_lock = asyncio.Lock() # Prevents concurrent updates
self.embed_storage_path = Path(EMBED_LOCATIONS_FILE) # File to store embed locations
# Track previous server states and CPU usage to detect changes
# Format: {server_id: (state, cpu_usage)}
self.previous_states: Dict[str, Tuple[str, float]] = {}
logger.info("Initialized PterodactylBot instance with state tracking")
async def setup_hook(self):
"""
Bot setup routine called when the bot is starting.
Initializes API client, loads saved data, and starts background tasks.
"""
logger.info("Running bot setup hook")
# Initialize API client
self.pterodactyl_api = PterodactylAPI(
PTERODACTYL_URL,
PTERODACTYL_CLIENT_API_KEY,
PTERODACTYL_APPLICATION_API_KEY
)
await self.pterodactyl_api.initialize()
logger.info("Initialized Pterodactyl API client")
# Load saved embed locations
await self.load_embed_locations()
# Start background update task
self.update_status.start()
logger.info("Started background status update task")
async def load_embed_locations(self):
"""Load saved embed locations from JSON storage file."""
logger.debug("Attempting to load embed locations from storage")
if not self.embed_storage_path.exists():
logger.info("No existing embed locations file found")
return
try:
with open(self.embed_storage_path, 'r') as f:
self.embed_locations = json.load(f)
logger.info(f"Loaded {len(self.embed_locations)} embed locations from storage")
except Exception as e:
logger.error(f"Failed to load embed locations: {str(e)}")
async def save_embed_locations(self):
"""Save current embed locations to JSON storage file."""
logger.debug("Attempting to save embed locations to storage")
try:
with open(self.embed_storage_path, 'w') as f:
json.dump(self.embed_locations, f, indent=2)
logger.debug("Successfully saved embed locations to disk")
except Exception as e:
logger.error(f"Failed to save embed locations: {str(e)}")
async def refresh_all_embeds(self) -> Tuple[int, int]:
"""
Perform a complete refresh of all server status embeds.
Creates new embeds and deletes old ones to prevent duplication.
Returns:
Tuple of (deleted_count, created_count) - number of embeds processed
"""
logger.info("Starting full refresh of all server embeds")
async with self.update_lock:
try:
await asyncio.sleep(1) # Initial delay
# Get current server list if cache is empty
if not self.server_cache:
logger.debug("Server cache empty, fetching fresh server list")
servers = await self.pterodactyl_api.get_servers()
if not servers:
logger.warning("No servers found in Pterodactyl panel")
return 0, 0
self.server_cache = {server['attributes']['identifier']: server for server in servers}
logger.info(f"Populated server cache with {len(servers)} servers")
# Create new embeds in temporary storage
new_embeds = {}
created_count = 0
skipped_count = 0
for server_id, server_data in self.server_cache.items():
# Skip if we don't have an existing location to recreate in
if server_id not in self.embed_locations:
skipped_count += 1
continue
channel_id = self.embed_locations[server_id]['channel_id']
channel = self.get_channel(int(channel_id))
if not channel:
logger.warning(f"Channel {channel_id} not found for server {server_id}")
continue
try:
logger.debug(f"Creating new embed for server {server_id}")
# Get current server status
resources = await self.pterodactyl_api.get_server_resources(server_id)
# Create new embed
embed, view = await self.get_server_status_embed(server_data, resources)
message = await channel.send(embed=embed, view=view)
# Store in temporary location
new_embeds[server_id] = {
'channel_id': str(channel.id),
'message_id': str(message.id)
}
created_count += 1
logger.info(f"Created new embed for server {server_data['attributes']['name']}")
await asyncio.sleep(1) # Rate limit protection
except Exception as e:
logger.error(f"Failed to create new embed for server {server_id}: {str(e)}")
logger.info(f"Created {created_count} new embeds, skipped {skipped_count} servers")
# Only proceed if we created at least one new embed
if not new_embeds:
logger.warning("No new embeds created during refresh")
return 0, 0
# Now delete old embeds
deleted_count = 0
not_found_count = 0
for server_id, location in list(self.embed_locations.items()):
try:
channel = self.get_channel(int(location['channel_id']))
if channel:
try:
message = await channel.fetch_message(int(location['message_id']))
await message.delete()
deleted_count += 1
logger.debug(f"Deleted old embed for server {server_id}")
await asyncio.sleep(0.5) # Rate limit protection
except discord.NotFound:
not_found_count += 1
logger.debug(f"Old embed for server {server_id} already deleted")
except Exception as e:
logger.error(f"Failed to delete old embed for server {server_id}: {str(e)}")
except Exception as e:
logger.error(f"Error processing old embed for server {server_id}: {str(e)}")
logger.info(f"Deleted {deleted_count} old embeds, {not_found_count} already missing")
# Update storage with new embed locations
self.embed_locations = new_embeds
await self.save_embed_locations()
return deleted_count, created_count
except Exception as e:
logger.error(f"Critical error during embed refresh: {str(e)}")
raise
async def track_new_embed(self, server_id: str, message: discord.Message):
"""
Track a newly created embed in storage.
Args:
server_id: The server's Pterodactyl identifier
message: Discord message containing the embed
"""
logger.debug(f"Tracking new embed for server {server_id} in channel {message.channel.id}")
self.embed_locations[server_id] = {
'channel_id': str(message.channel.id),
'message_id': str(message.id)
}
await self.save_embed_locations()
async def get_server_status_embed(self, server_data: dict, resources: dict) -> Tuple[discord.Embed, ServerStatusView]:
"""
Create a status embed and view for a server.
Args:
server_data: Server information from Pterodactyl
resources: Current resource usage data
Returns:
Tuple of (embed, view) objects ready for display
"""
attributes = server_data.get('attributes', {})
identifier = attributes.get('identifier', 'unknown')
name = attributes.get('name', 'Unknown Server')
description = attributes.get('description', 'No description available')
logger.debug(f"Building status embed for server {name} ({identifier})")
# Parse resource data
resource_attributes = resources.get('attributes', {})
current_state = resource_attributes.get('current_state', 'offline').title()
is_suspended = attributes.get('suspended', False)
# Create embed with appropriate color based on status
embed = discord.Embed(
title=f"{name} - {current_state}",
description=description,
color=discord.Color.blue() if current_state.lower() == "running" else discord.Color.red(),
timestamp=datetime.now()
)
embed.add_field(name="Server ID", value=identifier, inline=True)
if is_suspended:
embed.add_field(name="Status", value="⛔ Suspended", inline=True)
else:
embed.add_field(name="Status", value="✅ Active", inline=True)
# Add resource usage if server is running
if current_state.lower() == "running":
cpu_usage = round(resource_attributes.get('resources', {}).get('cpu_absolute', 0), 2)
memory_usage = round(resource_attributes.get('resources', {}).get('memory_bytes', 0) / (1024 ** 2), 2)
disk_usage = round(resource_attributes.get('resources', {}).get('disk_bytes', 0) / (1024 ** 2), 2)
network_rx = round(resource_attributes.get('resources', {}).get('network_rx_bytes', 0) / (1024 ** 2), 2)
network_tx = round(resource_attributes.get('resources', {}).get('network_tx_bytes', 0) / (1024 ** 2), 2)
embed.add_field(name="CPU Usage", value=f"{cpu_usage}%", inline=True)
embed.add_field(name="Memory Usage", value=f"{memory_usage} MB", inline=True)
embed.add_field(name="Disk Usage", value=f"{disk_usage} MB", inline=True)
embed.add_field(name="Network", value=f"⬇️ {network_rx} MB / ⬆️ {network_tx} MB", inline=False)
embed.set_footer(text="Last updated")
# Create interactive view with control buttons
view = ServerStatusView(
server_id=identifier,
server_name=name,
pterodactyl_api=self.pterodactyl_api,
server_data=server_data
)
logger.debug(f"Successfully built status components for {name}")
return embed, view
@tasks.loop(seconds=UPDATE_INTERVAL)
async def update_status(self):
"""
Background task to update server status embeds when:
1. Server power state changes (started/stopped/restarted)
2. Significant CPU usage change (>50% difference)
3. First time seeing the server
This minimizes API calls to Discord and updates while maintaining
real-time awareness of important server changes.
"""
logger.info("Starting optimized server status update cycle")
async with self.update_lock: # Ensure only one update runs at a time
try:
# Fetch current server list from Pterodactyl
servers = await self.pterodactyl_api.get_servers()
if not servers:
logger.warning("No servers found in Pterodactyl panel during update")
return
# Update our local cache with fresh server data
self.server_cache = {server['attributes']['identifier']: server for server in servers}
logger.debug(f"Updated server cache with {len(servers)} servers")
# Variables to track our update statistics
update_count = 0 # Successful updates
error_count = 0 # Failed updates
missing_count = 0 # Missing embeds
skipped_count = 0 # Servers that didn't need updates
# Process each server we're tracking embeds for
for server_id, location in list(self.embed_locations.items()):
# Skip if server no longer exists in Pterodactyl
if server_id not in self.server_cache:
logger.warning(f"Server {server_id} not found in cache, skipping update")
continue
server_data = self.server_cache[server_id]
server_name = server_data['attributes']['name']
try:
logger.debug(f"Checking status for server {server_name} ({server_id})")
# Get current server resource usage
resources = await self.pterodactyl_api.get_server_resources(server_id)
current_state = resources.get('attributes', {}).get('current_state', 'offline')
cpu_usage = round(resources.get('attributes', {}).get('resources', {}).get('cpu_absolute', 0), 2)
# Retrieve previous recorded state and CPU usage
prev_state, prev_cpu = self.previous_states.get(server_id, (None, 0))
# DECISION LOGIC: Should we update the embed?
needs_update = False
# 1. Check if power state changed (most important)
if current_state != prev_state:
logger.debug(f"Power state changed for {server_name}: {prev_state} -> {current_state}")
needs_update = True
# 2. Check for significant CPU change (only if server is running)
elif current_state == 'running' and abs(cpu_usage - prev_cpu) > 50:
logger.debug(f"Significant CPU change for {server_name}: {prev_cpu}% -> {cpu_usage}%")
needs_update = True
# 3. First time we're seeing this server (initial update)
elif prev_state is None:
logger.debug(f"First check for {server_name}, performing initial update")
needs_update = True
# PERFORM UPDATE IF NEEDED
if needs_update:
# Generate fresh embed and view components
embed, view = await self.get_server_status_embed(server_data, resources)
# Get the channel where this server's embed lives
channel = self.get_channel(int(location['channel_id']))
if not channel:
logger.warning(f"Channel {location['channel_id']} not found for server {server_id}")
continue
# Fetch and update the existing message
message = await channel.fetch_message(int(location['message_id']))
await message.edit(embed=embed, view=view)
update_count += 1
logger.debug(f"Updated status for {server_name}")
# Update our state tracking with new values
self.previous_states[server_id] = (current_state, cpu_usage)
else:
# No significant changes detected
skipped_count += 1
logger.debug(f"No changes detected for {server_name}, skipping update")
except discord.NotFound:
# Embed message was deleted - clean up our tracking
logger.warning(f"Embed for server {server_id} not found, removing from tracking")
self.embed_locations.pop(server_id, None)
self.previous_states.pop(server_id, None) # Also clean up state tracking
missing_count += 1
await self.save_embed_locations()
except Exception as e:
logger.error(f"Failed to update status for server {server_id}: {str(e)}")
error_count += 1
# Small delay between servers to avoid rate limits
await asyncio.sleep(0.5)
# Log summary of this update cycle
logger.info(
f"Update cycle complete: "
f"{update_count} updated, "
f"{skipped_count} skipped, "
f"{missing_count} missing, "
f"{error_count} errors"
)
except Exception as e:
logger.error(f"Error in update_status task: {str(e)}")
# If something went wrong, wait before retrying
await asyncio.sleep(5)
@update_status.before_loop
async def before_update_status(self):
"""Wait for bot to be ready before starting update task."""
logger.debug("Waiting for bot readiness before starting update task")
await self.wait_until_ready()
await self.refresh_all_embeds()
@update_status.after_loop
async def after_update_status(self):
"""Handle update task stopping."""
if self.update_status.is_being_cancelled():
logger.info("Server status update task was cancelled")
elif self.update_status.failed():
logger.error("Server status update task failed")
async def close(self):
"""Cleanup when bot is shutting down."""
logger.info("Bot shutdown initiated - performing cleanup")
await self.save_embed_locations()
if self.pterodactyl_api:
await self.pterodactyl_api.close()
await super().close()
# ==============================================
# DISCORD COMMANDS
# ==============================================
intents = discord.Intents.default()
intents.message_content = True
bot = PterodactylBot(command_prefix="!", intents=intents)
async def check_allowed_guild(interaction: discord.Interaction) -> bool:
"""
Verify that an interaction is coming from the allowed guild.
Args:
interaction: Discord interaction object
Returns:
bool: True if interaction is allowed, False otherwise
"""
if interaction.guild_id != ALLOWED_GUILD_ID:
logger.warning(f"Command attempted from unauthorized guild {interaction.guild_id} by {interaction.user.name}")
await interaction.response.send_message(
"This bot is only available in a specific server.",
ephemeral=True
)
return False
return True
@bot.tree.command(name="server_status", description="Show the status of a game server")
@app_commands.describe(
server_name="The name of the server to check",
channel="The channel to post the status in (defaults to current channel)"
)
async def server_status(
interaction: discord.Interaction,
server_name: str,
channel: Optional[discord.TextChannel] = None
):
"""
Slash command to display a server's status embed.
Creates or updates the status message for the specified server.
"""
# Check if interaction is from allowed guild
if not await check_allowed_guild(interaction):
return
logger.info(f"Server status command invoked by {interaction.user.name} for '{server_name}'")
await interaction.response.defer()
# Use specified channel or current channel
target_channel = channel or interaction.channel
logger.debug(f"Target channel: {target_channel.name} ({target_channel.id})")
# Find the server by name in cache
server_data = None
server_id = None
logger.debug(f"Searching cache for server '{server_name}'")
for s_id, s_data in bot.server_cache.items():
if s_data['attributes']['name'].lower() == server_name.lower():
server_data = s_data
server_id = s_id
break
if not server_data:
logger.warning(f"Server '{server_name}' not found in cache")
await interaction.followup.send(f"Server '{server_name}' not found.", ephemeral=True)
return
logger.info(f"Found server {server_name} ({server_id}), fetching resources")
# Get current server status
resources = await bot.pterodactyl_api.get_server_resources(server_id)
# Delete old embed if it exists
if server_id in bot.embed_locations:
logger.debug(f"Found existing embed for {server_id}, attempting to delete")
try:
old_location = bot.embed_locations[server_id]
old_channel = bot.get_channel(int(old_location['channel_id']))
if old_channel:
try:
old_message = await old_channel.fetch_message(int(old_location['message_id']))
await old_message.delete()
logger.debug(f"Deleted old embed for {server_id}")
except discord.NotFound:
logger.debug(f"Old embed for {server_id} already deleted")
except Exception as e:
logger.error(f"Failed to delete old embed: {str(e)}")
# Create and send new embed
logger.info(f"Creating new embed for {server_name} in {target_channel.name}")
embed, view = await bot.get_server_status_embed(server_data, resources)
message = await target_channel.send(embed=embed, view=view)
await bot.track_new_embed(server_id, message)
await interaction.followup.send(
f"Server status posted in {target_channel.mention}",
ephemeral=True
)
logger.info(f"Successfully posted status for {server_name}")
@bot.tree.command(name="refresh_embeds", description="Refresh all server status embeds (admin only)")
async def refresh_embeds(interaction: discord.Interaction):
"""Slash command to refresh all server embeds."""
if not await check_allowed_guild(interaction):
return
logger.info(f"Refresh embeds command invoked by {interaction.user.name}")
await interaction.response.defer(ephemeral=True)
# Require administrator permissions
if not interaction.user.guild_permissions.administrator:
logger.warning(f"Unauthorized refresh attempt by {interaction.user.name}")
await interaction.followup.send(
"You need administrator permissions to refresh all embeds.",
ephemeral=True
)
return
try:
logger.info("Starting full embed refresh per admin request")
deleted, created = await bot.refresh_all_embeds()
await interaction.followup.send(
f"Refreshed all embeds. Deleted {deleted} old embeds, created {created} new ones.",
ephemeral=True
)
logger.info(f"Embed refresh completed: {deleted} deleted, {created} created")
except Exception as e:
logger.error(f"Embed refresh failed: {str(e)}")
await interaction.followup.send(
f"Failed to refresh embeds: {str(e)}",
ephemeral=True
)
# ==============================================
# BOT EVENTS
# ==============================================
@bot.event
async def on_interaction(interaction: discord.Interaction):
"""Global interaction handler to check guild before processing any interaction."""
if interaction.guild_id != ALLOWED_GUILD_ID:
logger.debug(f"Ignoring interaction from unauthorized guild {interaction.guild_id}")
await interaction.response.send_message(
"This bot is only available in a specific server.",
ephemeral=True
)
return
@bot.event
async def on_ready():
"""Called when the bot successfully connects to Discord."""
logger.info(f"Bot connected as {bot.user.name} (ID: {bot.user.id})")
try:
# Sync commands only to the allowed guild
guild = discord.Object(id=ALLOWED_GUILD_ID)
bot.tree.copy_global_to(guild=guild)
synced = await bot.tree.sync(guild=guild)
logger.info(f"Successfully synced {len(synced)} command(s) to guild {ALLOWED_GUILD_ID}: {[cmd.name for cmd in synced]}")
except Exception as e:
logger.error(f"Command sync failed: {str(e)}")
# ==============================================
# SYSTEM SIGNAL HANDLERS
# ==============================================
def handle_sigint(signum: int, frame: types.FrameType) -> None:
"""
Handle SIGINT signals (Ctrl+C) by initiating graceful shutdown.
Args:
signum: The signal number (signal.SIGINT)
frame: Current stack frame (unused but required by signal handler signature)
"""
logger.info("Received SIGINT (Ctrl+C), initiating graceful shutdown...")
raise KeyboardInterrupt
def handle_sigterm(signum: int, frame: types.FrameType) -> None:
"""
Handle SIGTERM signals (container stop) by initiating graceful shutdown.
Args:
signum: The signal number (signal.SIGTERM)
frame: Current stack frame (unused but required by signal handler signature)
"""
logger.info("Received SIGTERM (container stop), initiating graceful shutdown...")
raise KeyboardInterrupt
# ==============================================
# BOT STARTUP
# ==============================================
if __name__ == "__main__":
"""
Main entry point for the bot application.
Handles:
- Signal registration for graceful shutdowns (SIGINT/SIGTERM)
- Primary bot execution loop
- Error handling and crash reporting
- Resource cleanup on shutdown
Flow:
1. Initialize signal handlers
2. Start bot with Discord token
3. Handle interrupts and exceptions
4. Execute final cleanup
"""
logger.info("Starting bot initialization")
# Register signal handlers
signal.signal(signal.SIGINT, handle_sigint) # For Ctrl+C
signal.signal(signal.SIGTERM, handle_sigterm) # For container stop commands
logger.info("System signal handlers registered")
try:
bot.run(DISCORD_TOKEN)
except KeyboardInterrupt:
logger.info("Bot shutting down")
except Exception as e:
logger.error(f"Bot crashed: {str(e)}")
sys.exit(1) # Exit with error code for crash
finally:
logger.info("Bot shutdown complete")
sys.exit(0) # Explicit clean exit