All checks were successful
Docker Build and Push (Multi-architecture) / build-and-push (push) Successful in 21s
1440 lines
64 KiB
Python
1440 lines
64 KiB
Python
"""
|
||
Pterodactyl Discord Bot
|
||
A comprehensive Discord bot for managing Pterodactyl game servers through Discord.
|
||
|
||
Features:
|
||
- Server status monitoring with auto-updating embeds
|
||
- Power controls (start/stop/restart)
|
||
- Server address/port display
|
||
- Multi-channel embed support
|
||
- Automatic embed refresh system
|
||
- Role-based access control
|
||
- Single-guild restriction
|
||
- Extensive logging for all operations
|
||
"""
|
||
|
||
import discord
|
||
from discord.ext import commands, tasks
|
||
from discord import app_commands
|
||
import os
|
||
import sys
|
||
import signal
|
||
import types
|
||
import aiohttp
|
||
import asyncio
|
||
import json
|
||
import traceback
|
||
import logging
|
||
from logging.handlers import RotatingFileHandler
|
||
import configparser
|
||
from datetime import datetime
|
||
from typing import Dict, List, Optional, Tuple
|
||
from pathlib import Path
|
||
import generate_config
|
||
|
||
# ==============================================
|
||
# LOGGING SETUP
|
||
# ==============================================
|
||
|
||
# Log files live in a "logs" directory next to this source file; create it on
# first run so the rotating file handler has somewhere to write.
logs_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs')
os.makedirs(logs_dir, exist_ok=True)

# The logger itself lets everything through (DEBUG); each handler may filter further.
logger = logging.getLogger('pterodisbot')
logger.setLevel(logging.DEBUG)

# Persistent log: rotates at 5 MiB and keeps 3 backups. No level is set on this
# handler, so it records everything the logger emits.
handler = RotatingFileHandler(
    filename=os.path.join(logs_dir, 'pterodisbot.log'),
    maxBytes=5 * 1024 * 1024,
    backupCount=3,
    encoding='utf-8',
)

# Live console output, limited to INFO and above.
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)

# Both destinations use the same line layout and are attached in the same
# order as before: file first, then console.
for _log_handler in (handler, console_handler):
    _log_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
    logger.addHandler(_log_handler)

logger.info("Initialized logging system with file and console output")
|
||
|
||
# ==============================================
|
||
# CONFIGURATION SETUP
|
||
# ==============================================
|
||
|
||
# generate_config.generate_config()
|
||
# logger.debug("Gennerated config.ini file using values from .env")
|
||
|
||
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed. Surface a missing/unreadable config.ini here so
# the later validation errors ("Missing [...] section") are not misleading.
if not config.read('config.ini'):
    logger.warning("config.ini was not found or could not be read from the current working directory")
|
||
|
||
# ==============================================
|
||
# CONFIGURATION VALIDATION
|
||
# ==============================================
|
||
|
||
class ConfigValidationError(Exception):
    """Raised when config.ini is missing required values or contains invalid ones."""
|
||
|
||
def validate_config():
    """
    Validate all required configuration values at startup.

    Reads the module-level ``config`` parser and collects every problem found
    so that they can be reported together in a single error.

    Raises:
        ConfigValidationError: If any required value is missing or invalid.
            The message lists every problem, one per line.
    """
    errors = []

    # Validate Pterodactyl section
    if not config.has_section('Pterodactyl'):
        errors.append("Missing [Pterodactyl] section in config.ini")
    else:
        for key in ('PanelURL', 'ClientAPIKey', 'ApplicationAPIKey'):
            if not config.get('Pterodactyl', key, fallback=None):
                errors.append(f"Missing required Pterodactyl config value: {key}")

    # Validate Discord section
    if not config.has_section('Discord'):
        errors.append("Missing [Discord] section in config.ini")
    else:
        for key in ('Token', 'AllowedGuildID'):
            if not config.get('Discord', key, fallback=None):
                errors.append(f"Missing required Discord config value: {key}")

        # Validate AllowedGuildID is a positive integer. Only check when a value
        # is present: a missing/empty value was already reported above and
        # should not produce a second, contradictory error.
        if config.get('Discord', 'AllowedGuildID', fallback=None):
            try:
                guild_id = config.getint('Discord', 'AllowedGuildID')
            except ValueError:
                errors.append("AllowedGuildID must be a valid integer")
            else:
                if guild_id <= 0:
                    errors.append("AllowedGuildID must be a positive integer")

    # Validate API keys have correct prefixes (empty values were reported above)
    client_key = config.get('Pterodactyl', 'ClientAPIKey', fallback='')
    if client_key and not client_key.startswith('ptlc_'):
        errors.append("ClientAPIKey should start with 'ptlc_'")

    app_key = config.get('Pterodactyl', 'ApplicationAPIKey', fallback='')
    if app_key and not app_key.startswith('ptla_'):
        errors.append("ApplicationAPIKey should start with 'ptla_'")

    # Validate PanelURL carries an explicit http/https scheme
    panel_url = config.get('Pterodactyl', 'PanelURL', fallback='')
    if panel_url and not panel_url.startswith(('http://', 'https://')):
        errors.append("PanelURL must start with http:// or https://")

    if errors:
        error_msg = "Configuration validation failed:\n- " + "\n- ".join(errors)
        logger.error(error_msg)
        raise ConfigValidationError(error_msg)

    logger.info("Configuration validation passed")
|
||
|
||
# ==============================================
|
||
# CONSTANTS (Updated with validation)
|
||
# ==============================================
|
||
|
||
# Validate config.ini first, then publish its values as module-level constants.
# Any failure is logged at CRITICAL level and re-raised, which aborts the import.
try:
    validate_config()

    # Values read from config.ini
    PTERODACTYL_URL = config.get('Pterodactyl', 'PanelURL')
    PTERODACTYL_CLIENT_API_KEY = config.get('Pterodactyl', 'ClientAPIKey')
    PTERODACTYL_APPLICATION_API_KEY = config.get('Pterodactyl', 'ApplicationAPIKey')
    DISCORD_TOKEN = config.get('Discord', 'Token')
    ALLOWED_GUILD_ID = config.getint('Discord', 'AllowedGuildID')

    # Fixed settings
    REQUIRED_ROLE = "Game Server User"  # Role name checked before a user may press control buttons
    UPDATE_INTERVAL = 10  # Seconds between runs of the background status task
    EMBED_LOCATIONS_FILE = "./embed/embed_locations.json"  # Where embed message locations are persisted

    logger.debug("Loaded and validated configuration values from config.ini")

except Exception as e:
    # Validation failures and unexpected failures differ only in the log prefix.
    if isinstance(e, ConfigValidationError):
        logger.critical(f"Configuration error: {e!s}")
    else:
        logger.critical(f"Unexpected error loading configuration: {e!s}")
    raise
|
||
|
||
# ==============================================
|
||
# PTERODACTYL API CLASS
|
||
# ==============================================
|
||
|
||
class PterodactylAPI:
    """
    Handles all interactions with the Pterodactyl Panel API.
    Uses client API key for client endpoints and application API key for admin endpoints.
    Provides methods for server management and monitoring.

    Every request goes through ``_request``, which never raises: failures are
    reported as ``{"status": "error", "message": ...}`` dictionaries.
    """

    def __init__(self, panel_url: str, client_api_key: str, application_api_key: str):
        """
        Initialize the Pterodactyl API client with both API keys.

        No network session is created here; call ``initialize()`` before
        making requests.

        Args:
            panel_url: URL of the Pterodactyl panel (must include protocol)
            client_api_key: API key for client endpoints (starts with ptlc_)
            application_api_key: API key for application endpoints (starts with ptla_)
        """
        self.panel_url = panel_url.rstrip('/')  # Avoid a double slash when building URLs
        self.client_api_key = client_api_key
        self.application_api_key = application_api_key
        self.session = None
        self.lock = asyncio.Lock()  # Prevents concurrent API access
        logger.info("Initialized PterodactylAPI client with provided credentials")

    async def initialize(self):
        """Initialize the aiohttp client session for API requests."""
        # Re-use a live session instead of replacing it; overwriting an open
        # session would leak it because nothing else holds a reference.
        if self.session is not None and not self.session.closed:
            logger.debug("aiohttp ClientSession already initialized")
            return
        self.session = aiohttp.ClientSession()
        logger.debug("Created new aiohttp ClientSession")

    async def close(self):
        """Cleanly close the aiohttp session when shutting down."""
        if self.session and not self.session.closed:
            await self.session.close()
            logger.debug("Closed aiohttp ClientSession")

    @staticmethod
    def _extract_error_message(payload) -> str:
        """Return the first error 'detail' from an error payload, or 'Unknown error'."""
        if isinstance(payload, dict):
            errors = payload.get('errors')
            if isinstance(errors, list) and errors and isinstance(errors[0], dict):
                return errors[0].get('detail') or 'Unknown error'
        return 'Unknown error'

    async def _request(self, method: str, endpoint: str, data: Optional[dict] = None,
                       use_application_key: bool = False, params: Optional[dict] = None) -> dict:
        """
        Make an authenticated request to the Pterodactyl API.

        Args:
            method: HTTP method (GET, POST, PUT, DELETE, etc.)
            endpoint: API endpoint (e.g., 'application/servers')
            data: Optional JSON payload for POST/PUT requests
            use_application_key: Whether to use the application API key (admin endpoints)
            params: Optional query-string parameters

        Returns:
            The decoded JSON response on success, ``{"status": "success"}`` for
            a 204 response, or ``{"status": "error", "message": ...}`` on any
            failure (HTTP status >= 400, network error, undecodable body,
            missing session). This method does not raise.
        """
        url = f"{self.panel_url}/api/{endpoint}"
        api_key_type = "Application" if use_application_key else "Client"
        logger.debug(f"Preparing {method} request to {endpoint} using {api_key_type} API key")

        if self.session is None:
            # Give a clear message instead of an AttributeError on None.
            message = "API session is not initialized; call initialize() first"
            logger.error(f"Cannot send request to {endpoint}: {message}")
            return {"status": "error", "message": message}

        # Choose the appropriate API key
        api_key = self.application_api_key if use_application_key else self.client_api_key
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Accept": "application/json",
            "Content-Type": "application/json"
        }

        try:
            # The lock serializes all API calls made through this client.
            async with self.lock:
                logger.debug(f"Acquired lock for API request to {endpoint}")
                async with self.session.request(
                    method,
                    url,
                    headers=headers,
                    json=data,
                    params=params
                ) as response:
                    if response.status == 204:  # No content
                        logger.debug(f"Received 204 No Content response from {endpoint}")
                        return {"status": "success"}

                    response_data = await response.json()
                    logger.debug(f"Received response from {endpoint} with status {response.status}")

                    if response.status >= 400:
                        error_msg = self._extract_error_message(response_data)
                        logger.error(f"API request to {endpoint} failed with status {response.status}: {error_msg}")
                        return {"status": "error", "message": error_msg}

                    return response_data
        except Exception as e:
            # Boundary handler: callers rely on an error dict rather than an exception.
            logger.error(f"Exception during API request to {endpoint}: {str(e)}")
            return {"status": "error", "message": str(e)}

    async def get_servers(self) -> List[dict]:
        """
        Get a list of all servers from the Pterodactyl panel.
        Uses application API key as this is an admin endpoint.

        The endpoint is paginated; pages are followed using the response's
        ``meta.pagination.total_pages`` value until all have been read.

        Returns:
            List of server dictionaries containing all server attributes.
            An empty list is returned if any page request fails, so callers
            never act on a partial server list.
        """
        logger.info("Fetching list of all servers from Pterodactyl panel")
        servers: List[dict] = []
        page = 1

        while True:
            response = await self._request(
                "GET",
                "application/servers",
                use_application_key=True,
                params={"page": page}
            )
            if response.get('status') == 'error':
                logger.error(
                    f"Failed to fetch server list (page {page}): {response.get('message', 'Unknown error')}"
                )
                return []

            servers.extend(response.get('data', []))

            pagination = (response.get('meta') or {}).get('pagination') or {}
            total_pages = pagination.get('total_pages', 1)
            # Stop on the last page, or if the pagination info is absent/malformed.
            if not isinstance(total_pages, int) or page >= total_pages:
                break
            page += 1

        logger.info(f"Retrieved {len(servers)} servers from Pterodactyl panel")
        return servers

    async def get_server_resources(self, server_id: str) -> dict:
        """
        Get resource usage for a specific server.
        Uses client API key as this is a client endpoint.

        Args:
            server_id: The Pterodactyl server identifier

        Returns:
            Dictionary containing server resource usage and current state.
            On any failure a minimal ``{'attributes': {'current_state': 'offline'}}``
            dictionary is returned instead.
        """
        logger.debug(f"Fetching resource usage for server {server_id}")
        try:
            response = await self._request("GET", f"client/servers/{server_id}/resources")
            if response.get('status') == 'error':
                error_msg = response.get('message', 'Unknown error')
                logger.error(f"Failed to get resources for server {server_id}: {error_msg}")
                return {'attributes': {'current_state': 'offline'}}

            state = response.get('attributes', {}).get('current_state', 'unknown')
            logger.debug(f"Server {server_id} current state: {state}")
            return response
        except Exception as e:
            logger.error(f"Exception getting resources for server {server_id}: {str(e)}")
            return {'attributes': {'current_state': 'offline'}}

    async def send_power_action(self, server_id: str, action: str) -> dict:
        """
        Send a power action to a server (start/stop/restart).
        Uses client API key as this is a client endpoint.

        Args:
            server_id: The Pterodactyl server identifier
            action: Power action to send (start/stop/restart)

        Returns:
            Dictionary containing API response status
        """
        valid_actions = ['start', 'stop', 'restart']
        if action not in valid_actions:
            logger.warning(f"Invalid power action attempted: {action}")
            return {"status": "error", "message": f"Invalid action. Must be one of: {', '.join(valid_actions)}"}

        logger.info(f"Sending {action} command to server {server_id}")
        result = await self._request("POST", f"client/servers/{server_id}/power", {"signal": action})

        if result.get('status') == 'success':
            logger.info(f"Successfully executed {action} on server {server_id}")
        else:
            logger.error(f"Failed to execute {action} on server {server_id}: {result.get('message', 'Unknown error')}")

        return result

    async def get_server_details(self, server_id: str) -> dict:
        """
        Get detailed server information including allocations.
        Uses application API key as this is an admin endpoint.

        Args:
            server_id: The Pterodactyl server identifier

        Returns:
            Dictionary containing detailed server information
        """
        logger.debug(f"Fetching detailed information for server {server_id}")
        return await self._request("GET", f"application/servers/{server_id}", use_application_key=True)

    async def get_server_allocations(self, server_id: str) -> dict:
        """
        Get allocation information for a server (IP addresses and ports).
        Uses application API key as this is an admin endpoint.

        Args:
            server_id: The Pterodactyl server identifier

        Returns:
            Dictionary containing server allocation information
        """
        logger.debug(f"Fetching allocation information for server {server_id}")
        return await self._request("GET", f"application/servers/{server_id}/allocations", use_application_key=True)
|
||
|
||
# ==============================================
|
||
# SERVER STATUS VIEW CLASS (Buttons and UI)
|
||
# ==============================================
|
||
|
||
class ServerStatusView(discord.ui.View):
    """
    Interactive Discord view containing server control buttons.
    Provides persistent controls for server management with role-based access.
    """

    # Wording used in user messages and logs for each power action:
    # action -> (present participle, past tense)
    _POWER_ACTION_WORDS = {
        "start": ("starting", "started"),
        "stop": ("stopping", "stopped"),
        "restart": ("restarting", "restarted"),
    }

    def __init__(self, server_id: str, server_name: str, pterodactyl_api: PterodactylAPI, server_data: dict):
        """
        Initialize the server status view with control buttons.

        Args:
            server_id: The server's Pterodactyl identifier
            server_name: Human-readable server name
            pterodactyl_api: API client instance
            server_data: Full server data from Pterodactyl
        """
        super().__init__(timeout=None)  # Persistent view
        self.server_id = server_id
        self.server_name = server_name
        self.api = pterodactyl_api
        self.server_data = server_data
        logger.debug(f"Created ServerStatusView for {server_name} ({server_id})")

    async def interaction_check(self, interaction: discord.Interaction) -> bool:
        """
        Verify the interacting user has the required role and is in the allowed guild.

        Args:
            interaction: Discord interaction object

        Returns:
            bool: True if authorized, False otherwise
        """
        # First check if interaction is from the allowed guild
        if interaction.guild_id != ALLOWED_GUILD_ID:
            logger.warning(f"Unauthorized interaction attempt from guild {interaction.guild_id}")
            await interaction.response.send_message(
                "This bot is only available in a specific server.",
                ephemeral=True
            )
            return False

        # Then check for required role
        logger.debug(f"Checking permissions for {interaction.user.name} on server {self.server_name}")
        # getattr guards against a user object that carries no role list.
        user_roles = getattr(interaction.user, 'roles', [])
        has_role = any(role.name == REQUIRED_ROLE for role in user_roles)
        if not has_role:
            logger.warning(f"Permission denied for {interaction.user.name} - missing '{REQUIRED_ROLE}' role")
            await interaction.response.send_message(
                f"You don't have permission to control servers. You need the '{REQUIRED_ROLE}' role.",
                ephemeral=True
            )
            return False

        logger.debug(f"Permission granted for {interaction.user.name}")
        return True

    async def on_error(self, interaction: discord.Interaction, error: Exception, item: discord.ui.Item):
        """
        Handle errors in button interactions.

        Args:
            interaction: Discord interaction object
            error: Exception that occurred
            item: The UI item that triggered the error
        """
        logger.error(
            f"View error in {self.server_name} by {interaction.user.name}: {str(error)}",
            exc_info=error
        )
        notice = "An error occurred while processing your request."
        try:
            # Every button callback defers first, so the initial response is
            # usually already used up; a second send_message would raise.
            if interaction.response.is_done():
                await interaction.followup.send(notice, ephemeral=True)
            else:
                await interaction.response.send_message(notice, ephemeral=True)
        except discord.HTTPException as send_error:
            logger.error(f"Could not notify {interaction.user.name} about the error: {str(send_error)}")

    async def _run_power_action(self, interaction: discord.Interaction, action: str):
        """Defer the interaction, send `action` to the server and report the outcome to the user."""
        in_progress, completed = self._POWER_ACTION_WORDS[action]
        logger.info(f"{action.capitalize()} button pressed for {self.server_name} by {interaction.user.name}")
        await interaction.response.defer(ephemeral=True)
        result = await self.api.send_power_action(self.server_id, action)

        if result.get('status') == 'success':
            message = f"Server '{self.server_name}' is {in_progress}..."
            logger.info(f"Successfully {completed} server {self.server_name}")
        else:
            reason = result.get('message', 'Unknown error')
            message = f"Failed to {action} server: {reason}"
            logger.error(f"Failed to {action} server {self.server_name}: {reason}")

        await interaction.followup.send(message, ephemeral=True)

    @discord.ui.button(label="Start", style=discord.ButtonStyle.green, custom_id="start_button")
    async def start_button(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Send a start command to the server."""
        await self._run_power_action(interaction, "start")

    @discord.ui.button(label="Stop", style=discord.ButtonStyle.red, custom_id="stop_button")
    async def stop_button(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Send a stop command to the server."""
        await self._run_power_action(interaction, "stop")

    @discord.ui.button(label="Restart", style=discord.ButtonStyle.blurple, custom_id="restart_button")
    async def restart_button(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Send a restart command to the server."""
        await self._run_power_action(interaction, "restart")

    @discord.ui.button(label="Show Address", style=discord.ButtonStyle.grey, custom_id="show_address_button")
    async def show_address_button(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Show server's default allocation IP and port using client API."""
        logger.info(f"Show Address button pressed for {self.server_name} by {interaction.user.name}")
        try:
            await interaction.response.defer(ephemeral=True)
            logger.debug(f"Fetching server details for {self.server_id}")

            # Get server details using client API
            server_details = await self.api._request(
                "GET",
                f"client/servers/{self.server_id}",
                use_application_key=False
            )

            if server_details.get('status') == 'error':
                error_msg = server_details.get('message', 'Unknown error')
                logger.error(f"Failed to get server details for {self.server_id}: {error_msg}")
                raise ValueError(error_msg)

            attributes = server_details.get('attributes', {})
            relationships = attributes.get('relationships', {})
            allocations = relationships.get('allocations', {}).get('data', [])

            if not allocations:
                logger.warning(f"No allocations found for server {self.server_id}")
                raise ValueError("No allocations found for this server")

            # Find the default allocation (is_default=True)
            default_allocation = next(
                (alloc for alloc in allocations
                 if alloc.get('attributes', {}).get('is_default', False)),
                allocations[0]  # Fallback to first allocation if no default found
            )

            allocation_attrs = default_allocation.get('attributes', {})
            # ip_alias may be present but null; fall back to the allocation's
            # 'ip' value (if any) instead of displaying "None".
            address = allocation_attrs.get('ip_alias') or allocation_attrs.get('ip') or 'Unknown'
            port = str(allocation_attrs.get('port', 'Unknown'))

            logger.debug(f"Retrieved connection info for {self.server_id}: {address}:{port}")

            # Create and send embed
            embed = discord.Embed(
                title=f"{self.server_name} Connection Info",
                color=discord.Color.blue(),
                description=f"Server ID: `{self.server_id}`"
            )
            embed.add_field(name="Address", value=f"`{address}`", inline=True)
            embed.add_field(name="Port", value=f"`{port}`", inline=True)

            await interaction.followup.send(embed=embed, ephemeral=True)
            logger.info(f"Displayed connection info for {self.server_name}")

        except Exception as e:
            logger.error(f"Failed to show address for {self.server_name}: {str(e)}")
            await interaction.followup.send(
                "⚠️ Failed to get connection info. The server may not have any ports allocated.",
                ephemeral=True
            )
|
||
|
||
# ==============================================
|
||
# MAIN BOT CLASS
|
||
# ==============================================
|
||
|
||
class PterodactylBot(commands.Bot):
|
||
"""
|
||
Main bot class for Pterodactyl server management.
|
||
Handles Discord interactions, embed management, and background tasks.
|
||
Manages server status embeds and user commands.
|
||
"""
|
||
|
||
def __init__(self, *args, **kwargs):
    """
    Construct the bot and prepare its in-memory state.

    All positional and keyword arguments are forwarded unchanged to
    ``commands.Bot``. The Pterodactyl API client is left unset here and is
    created later in ``setup_hook``.
    """
    super().__init__(*args, **kwargs)

    # Filled in by setup_hook once the bot is starting
    self.pterodactyl_api = None

    # server identifier -> server data as returned by Pterodactyl
    self.server_cache: Dict[str, dict] = {}

    # server identifier -> {'channel_id': ..., 'message_id': ...} of its embed
    self.embed_locations: Dict[str, Dict[str, int]] = {}

    # Last observed (state, cpu_usage) per server identifier, kept so that
    # changes between update cycles can be detected
    self.previous_states: Dict[str, Tuple[str, float]] = {}

    # Serializes embed refreshes and background updates
    self.update_lock = asyncio.Lock()

    # JSON file in which embed locations are persisted
    self.embed_storage_path = Path(EMBED_LOCATIONS_FILE)

    logger.info("Initialized PterodactylBot instance with state tracking")
|
||
|
||
async def setup_hook(self):
    """
    Prepare the bot during startup.

    Runs three steps in order: create and initialize the Pterodactyl API
    client, restore saved embed locations from disk, and start the
    background status-update loop.
    """
    logger.info("Running bot setup hook")

    # Step 1: API client (its HTTP session is opened by initialize())
    api_client = PterodactylAPI(
        PTERODACTYL_URL,
        PTERODACTYL_CLIENT_API_KEY,
        PTERODACTYL_APPLICATION_API_KEY
    )
    self.pterodactyl_api = api_client
    await api_client.initialize()
    logger.info("Initialized Pterodactyl API client")

    # Step 2: restore previously tracked embeds
    await self.load_embed_locations()

    # Step 3: start the periodic status updates
    self.update_status.start()
    logger.info("Started background status update task")
|
||
|
||
async def load_embed_locations(self):
    """
    Load saved embed locations from JSON storage file.

    Best-effort: if the file is absent, unreadable, not valid JSON, or does
    not contain a JSON object, the current ``embed_locations`` value is left
    untouched and the problem is logged. This method does not raise.
    """
    logger.debug("Attempting to load embed locations from storage")
    if not self.embed_storage_path.exists():
        logger.info("No existing embed locations file found")
        return

    try:
        with open(self.embed_storage_path, 'r', encoding='utf-8') as f:
            loaded = json.load(f)
    except Exception as e:
        logger.error(f"Failed to load embed locations: {str(e)}")
        return

    # The rest of the bot iterates this mapping with .items(); refuse anything
    # that is not a JSON object rather than failing later on every update.
    if not isinstance(loaded, dict):
        logger.error(
            f"Failed to load embed locations: expected a JSON object, got {type(loaded).__name__}"
        )
        return

    self.embed_locations = loaded
    logger.info(f"Loaded {len(self.embed_locations)} embed locations from storage")
|
||
|
||
async def save_embed_locations(self):
    """
    Save current embed locations to JSON storage file.

    Best-effort: failures are logged and not raised.
    """
    logger.debug("Attempting to save embed locations to storage")
    try:
        # Make sure the storage directory exists; open() would otherwise fail
        # on a fresh install where it has not been created yet.
        self.embed_storage_path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.embed_storage_path, 'w', encoding='utf-8') as f:
            json.dump(self.embed_locations, f, indent=2)
        logger.debug("Successfully saved embed locations to disk")
    except Exception as e:
        logger.error(f"Failed to save embed locations: {str(e)}")
|
||
|
||
async def refresh_all_embeds(self) -> Tuple[int, int]:
    """
    Perform a complete refresh of all server status embeds.
    Creates new embeds and deletes old ones to prevent duplication.

    Replacement embeds are posted first. An old embed is deleted only when
    its replacement was posted, or when its server is no longer in the
    server cache. If a replacement could not be posted for a server that
    still exists, the old embed and its tracked location are kept, so a
    transient failure does not leave that server without an embed.

    Returns:
        Tuple of (deleted_count, created_count) - number of embeds processed.
        (0, 0) is returned when there are no servers or no embed could be
        created.

    Raises:
        Exception: Any unexpected error is logged and re-raised.
    """
    logger.info("Starting full refresh of all server embeds")
    async with self.update_lock:
        try:
            await asyncio.sleep(1)  # Initial delay

            # Get current server list if cache is empty
            if not self.server_cache:
                logger.debug("Server cache empty, fetching fresh server list")
                servers = await self.pterodactyl_api.get_servers()
                if not servers:
                    logger.warning("No servers found in Pterodactyl panel")
                    return 0, 0

                self.server_cache = {server['attributes']['identifier']: server for server in servers}
                logger.info(f"Populated server cache with {len(servers)} servers")

            # Create new embeds in temporary storage
            new_embeds = {}
            created_count = 0
            skipped_count = 0

            for server_id, server_data in self.server_cache.items():
                # Skip if we don't have an existing location to recreate in
                if server_id not in self.embed_locations:
                    skipped_count += 1
                    continue

                channel_id = self.embed_locations[server_id]['channel_id']
                channel = self.get_channel(int(channel_id))
                if not channel:
                    logger.warning(f"Channel {channel_id} not found for server {server_id}")
                    continue

                try:
                    logger.debug(f"Creating new embed for server {server_id}")
                    # Get current server status
                    resources = await self.pterodactyl_api.get_server_resources(server_id)

                    # Create new embed
                    embed, view = await self.get_server_status_embed(server_data, resources)
                    message = await channel.send(embed=embed, view=view)

                    # Store in temporary location
                    new_embeds[server_id] = {
                        'channel_id': str(channel.id),
                        'message_id': str(message.id)
                    }
                    created_count += 1
                    logger.info(f"Created new embed for server {server_data['attributes']['name']}")

                    await asyncio.sleep(1)  # Rate limit protection
                except Exception as e:
                    logger.error(f"Failed to create new embed for server {server_id}: {str(e)}")

            logger.info(f"Created {created_count} new embeds, skipped {skipped_count} servers")

            # Only proceed if we created at least one new embed
            if not new_embeds:
                logger.warning("No new embeds created during refresh")
                return 0, 0

            # Now delete old embeds
            deleted_count = 0
            not_found_count = 0
            retained_locations = {}

            for server_id, location in list(self.embed_locations.items()):
                # The server still exists but its replacement was not posted:
                # keep the old message and keep tracking it.
                if server_id not in new_embeds and server_id in self.server_cache:
                    retained_locations[server_id] = location
                    logger.warning(f"Keeping existing embed for server {server_id}; no replacement was created")
                    continue

                try:
                    channel = self.get_channel(int(location['channel_id']))
                    if channel:
                        try:
                            message = await channel.fetch_message(int(location['message_id']))
                            await message.delete()
                            deleted_count += 1
                            logger.debug(f"Deleted old embed for server {server_id}")
                            await asyncio.sleep(0.5)  # Rate limit protection
                        except discord.NotFound:
                            not_found_count += 1
                            logger.debug(f"Old embed for server {server_id} already deleted")
                        except Exception as e:
                            logger.error(f"Failed to delete old embed for server {server_id}: {str(e)}")
                except Exception as e:
                    logger.error(f"Error processing old embed for server {server_id}: {str(e)}")

            logger.info(f"Deleted {deleted_count} old embeds, {not_found_count} already missing")

            # Update storage: new locations plus any old ones that were kept
            self.embed_locations = {**retained_locations, **new_embeds}
            await self.save_embed_locations()

            return deleted_count, created_count

        except Exception as e:
            logger.error(f"Critical error during embed refresh: {str(e)}")
            raise
|
||
|
||
async def track_new_embed(self, server_id: str, message: discord.Message):
    """
    Record where a server's embed was posted and persist the mapping.

    Any location previously stored for the same server is overwritten.

    Args:
        server_id: The server's Pterodactyl identifier
        message: Discord message containing the embed
    """
    channel_id = message.channel.id
    logger.debug(f"Tracking new embed for server {server_id} in channel {channel_id}")

    # IDs are stored as strings, which is the form written to the JSON file
    location = {
        'channel_id': str(channel_id),
        'message_id': str(message.id)
    }
    self.embed_locations[server_id] = location
    await self.save_embed_locations()
|
||
|
||
async def get_server_status_embed(self, server_data: dict, resources: dict) -> Tuple[discord.Embed, ServerStatusView]:
    """
    Create a status embed and view for a server.

    The embed is blue when the server state is "running" and red otherwise.
    Resource usage fields (CPU, memory, disk, network) are only added for a
    running server. Missing or null values fall back to defaults instead of
    raising.

    Args:
        server_data: Server information from Pterodactyl
        resources: Current resource usage data

    Returns:
        Tuple of (embed, view) objects ready for display
    """
    attributes = server_data.get('attributes', {})
    identifier = attributes.get('identifier', 'unknown')
    name = attributes.get('name', 'Unknown Server')
    # `or` so that an empty or null description also gets the fallback text
    description = attributes.get('description') or 'No description available'
    logger.debug(f"Building status embed for server {name} ({identifier})")

    # Parse resource data
    resource_attributes = resources.get('attributes', {})
    # A null state is treated like a missing one
    current_state = (resource_attributes.get('current_state') or 'offline').title()
    is_running = current_state.lower() == "running"
    is_suspended = attributes.get('suspended', False)

    # Create embed with appropriate color based on status
    embed = discord.Embed(
        title=f"{name} - {current_state}",
        description=description,
        color=discord.Color.blue() if is_running else discord.Color.red(),
        timestamp=datetime.now()
    )

    embed.add_field(name="Server ID", value=identifier, inline=True)

    if is_suspended:
        embed.add_field(name="Status", value="⛔ Suspended", inline=True)
    else:
        embed.add_field(name="Status", value="✅ Active", inline=True)

    # Add resource usage if server is running
    if is_running:
        usage = resource_attributes.get('resources') or {}
        bytes_per_mb = 1024 ** 2

        def to_mb(key: str) -> float:
            """Return the byte counter stored under `key`, divided by 1024**2 and rounded."""
            return round((usage.get(key) or 0) / bytes_per_mb, 2)

        cpu_usage = round(usage.get('cpu_absolute') or 0, 2)
        memory_usage = to_mb('memory_bytes')
        disk_usage = to_mb('disk_bytes')
        network_rx = to_mb('network_rx_bytes')
        network_tx = to_mb('network_tx_bytes')

        embed.add_field(name="CPU Usage", value=f"{cpu_usage}%", inline=True)
        embed.add_field(name="Memory Usage", value=f"{memory_usage} MB", inline=True)
        embed.add_field(name="Disk Usage", value=f"{disk_usage} MB", inline=True)
        embed.add_field(name="Network", value=f"⬇️ {network_rx} MB / ⬆️ {network_tx} MB", inline=False)

    # The embed timestamp is displayed next to this footer text
    embed.set_footer(text="Last updated")

    # Create interactive view with control buttons
    view = ServerStatusView(
        server_id=identifier,
        server_name=name,
        pterodactyl_api=self.pterodactyl_api,
        server_data=server_data
    )

    logger.debug(f"Successfully built status components for {name}")
    return embed, view
|
||
|
||
@tasks.loop(seconds=UPDATE_INTERVAL)
async def update_status(self):
    """
    Background task that refreshes tracked server status embeds.

    An embed is re-rendered only when one of the following holds:
    1. No previous state is recorded for the server (first check)
    2. The power state differs from the recorded one
    3. The server is running and its CPU usage moved by more than
       50 percentage points compared to the recorded value

    Servers that do not meet any condition are skipped, which keeps the
    number of Discord message edits low.

    Per-server failures are logged and counted; they never abort the cycle.
    A tracked embed whose message no longer exists is removed from tracking.
    """
    logger.info("Starting optimized server status update cycle")
    async with self.update_lock:  # Ensure only one update runs at a time
        try:
            # Fetch current server list from Pterodactyl
            servers = await self.pterodactyl_api.get_servers()
            if not servers:
                logger.warning("No servers found in Pterodactyl panel during update")
                return

            # Update our local cache with fresh server data
            self.server_cache = {server['attributes']['identifier']: server for server in servers}
            logger.debug(f"Updated server cache with {len(servers)} servers")

            # Statistics for the end-of-cycle summary
            update_count = 0   # Embeds successfully edited
            error_count = 0    # Servers whose update raised
            missing_count = 0  # Embeds/channels that could not be located
            skipped_count = 0  # Servers that did not need an update

            # Iterate over a snapshot: entries may be removed while looping
            for server_id, location in list(self.embed_locations.items()):
                # Skip if server no longer exists in Pterodactyl
                if server_id not in self.server_cache:
                    logger.warning(f"Server {server_id} not found in cache, skipping update")
                    continue

                server_data = self.server_cache[server_id]
                server_name = server_data['attributes']['name']

                try:
                    logger.debug(f"Checking status for server {server_name} ({server_id})")

                    # Get current server resource usage
                    resources = await self.pterodactyl_api.get_server_resources(server_id)
                    resource_attributes = resources.get('attributes', {})
                    current_state = resource_attributes.get('current_state', 'offline')
                    cpu_usage = round(resource_attributes.get('resources', {}).get('cpu_absolute', 0), 2)

                    # Retrieve previous recorded state and CPU usage
                    prev_state, prev_cpu = self.previous_states.get(server_id, (None, 0))

                    # DECISION LOGIC: the first-seen check comes first so it is
                    # not shadowed by the power-state comparison below.
                    needs_update = False
                    if prev_state is None:
                        logger.debug(f"First check for {server_name}, performing initial update")
                        needs_update = True
                    elif current_state != prev_state:
                        logger.debug(f"Power state changed for {server_name}: {prev_state} -> {current_state}")
                        needs_update = True
                    elif current_state == 'running' and abs(cpu_usage - prev_cpu) > 50:
                        logger.debug(f"Significant CPU change for {server_name}: {prev_cpu}% -> {cpu_usage}%")
                        needs_update = True

                    if not needs_update:
                        skipped_count += 1
                        logger.debug(f"No changes detected for {server_name}, skipping update")
                    else:
                        # Get the channel where this server's embed lives
                        channel = self.get_channel(int(location['channel_id']))
                        if channel is None:
                            # Tracking is kept: the channel may only be absent from the cache
                            logger.warning(f"Channel {location['channel_id']} not found for server {server_id}")
                            missing_count += 1
                        else:
                            # Generate fresh embed and view components
                            embed, view = await self.get_server_status_embed(server_data, resources)

                            # Fetch and update the existing message
                            message = await channel.fetch_message(int(location['message_id']))
                            await message.edit(embed=embed, view=view)
                            update_count += 1
                            logger.debug(f"Updated status for {server_name}")

                            # Record the values the embed now shows
                            self.previous_states[server_id] = (current_state, cpu_usage)

                except discord.NotFound:
                    # Embed message was deleted - clean up our tracking
                    logger.warning(f"Embed for server {server_id} not found, removing from tracking")
                    self.embed_locations.pop(server_id, None)
                    self.previous_states.pop(server_id, None)  # Also clean up state tracking
                    missing_count += 1
                    await self.save_embed_locations()
                except Exception as e:
                    logger.error(f"Failed to update status for server {server_id}: {str(e)}")
                    error_count += 1

                # Small delay between servers to avoid rate limits
                await asyncio.sleep(0.5)

            # Log summary of this update cycle
            logger.info(
                f"Update cycle complete: "
                f"{update_count} updated, "
                f"{skipped_count} skipped, "
                f"{missing_count} missing, "
                f"{error_count} errors"
            )

        except Exception as e:
            logger.error(f"Error in update_status task: {str(e)}")
            # If something went wrong, wait before retrying
            await asyncio.sleep(5)
|
||
|
||
@update_status.before_loop
async def before_update_status(self):
    """
    Prepare the update task: wait for the bot to be ready, then refresh all embeds.

    A failure of the initial refresh is logged instead of propagated, because
    an exception raised here would stop the task before its first iteration.
    """
    logger.debug("Waiting for bot readiness before starting update task")
    await self.wait_until_ready()
    try:
        await self.refresh_all_embeds()
    except Exception as e:
        logger.error(f"Initial embed refresh failed, starting update task anyway: {str(e)}")
|
||
|
||
@update_status.after_loop
async def after_update_status(self):
    """Log the reason the status update task stopped (cancelled or failed)."""
    loop_task = self.update_status

    # Cancellation takes precedence over failure, mirroring an if/elif chain
    if loop_task.is_being_cancelled():
        logger.info("Server status update task was cancelled")
        return

    if loop_task.failed():
        logger.error("Server status update task failed")
|
||
|
||
async def close(self):
    """
    Clean up when the bot is shutting down.

    Saves the tracked embed locations, closes the Pterodactyl API client and
    finally closes the Discord client. Each step is isolated so that a failure
    in an earlier step cannot prevent the later ones from running.
    """
    logger.info("Bot shutdown initiated - performing cleanup")

    try:
        await self.save_embed_locations()
    except Exception as e:
        logger.error(f"Failed to save embed locations during shutdown: {str(e)}")

    try:
        if self.pterodactyl_api:
            await self.pterodactyl_api.close()
    except Exception as e:
        logger.error(f"Failed to close Pterodactyl API client during shutdown: {str(e)}")
    finally:
        # Always release the Discord connection, whatever happened above
        await super().close()
|
||
|
||
# ==============================================
# DISCORD COMMANDS
# ==============================================

# Gateway intents: the library defaults plus access to message content
intents = discord.Intents.default()
intents.message_content = True

# Shared bot instance that the commands and events below attach to
bot = PterodactylBot(intents=intents, command_prefix="!")
|
||
|
||
async def check_allowed_guild(interaction: discord.Interaction) -> bool:
    """
    Verify that an interaction is coming from the allowed guild.

    When the interaction comes from another guild, an ephemeral notice is sent
    unless the interaction has already been answered (the global
    on_interaction handler sends the same notice and may run first).

    Args:
        interaction: Discord interaction object

    Returns:
        bool: True if interaction is allowed, False otherwise
    """
    if interaction.guild_id == ALLOWED_GUILD_ID:
        return True

    logger.warning(f"Command attempted from unauthorized guild {interaction.guild_id} by {interaction.user.name}")

    # An interaction can only be answered once; a second initial response raises.
    if not interaction.response.is_done():
        try:
            await interaction.response.send_message(
                "This bot is only available in a specific server.",
                ephemeral=True
            )
        except (discord.InteractionResponded, discord.HTTPException) as e:
            # Lost the race against another handler - the user was already notified
            logger.debug(f"Unauthorized guild notice was not sent: {str(e)}")

    return False
|
||
|
||
@bot.tree.command(name="server_status", description="Get a list of available game servers to control")
async def server_status(interaction: discord.Interaction):
    """
    Slash command to display a server status dashboard with a dropdown selection.

    Workflow:
        - Validates the guild and defers with an ephemeral response
        - Refreshes bot.server_cache from the Pterodactyl API
        - Counts online/offline servers by querying each server's state
          (a server whose state cannot be fetched is counted as offline)
        - Sends an ephemeral statistics embed with a dropdown of servers
        - On selection, deletes the previously tracked embed for that server
          (if any), posts a new status embed in the channel and tracks it

    Discord limits handled here:
        - A select menu accepts at most 25 options; extra servers are left
          out of the dropdown and the dashboard says so
        - Option labels are limited to 100 characters; longer names are truncated
        - Missing or empty descriptions fall back to "No description"

    Ephemeral Design:
        - Dashboard and dropdown are only visible to the invoking user
        - The dropdown view times out after 3 minutes
        - Only the final server status embed is posted publicly

    Args:
        interaction: Discord interaction object representing the command invocation

    Returns:
        None: Sends ephemeral dashboard with dropdown, then public status embed on selection
    """
    # Check if interaction is from allowed guild
    if not await check_allowed_guild(interaction):
        return

    logger.info(f"Server status command invoked by {interaction.user.name}")
    await interaction.response.defer(ephemeral=True)

    # Limits imposed by Discord on select menus
    max_select_options = 25
    max_label_length = 100

    try:
        # Refresh server cache with current data from Pterodactyl panel
        servers = await bot.pterodactyl_api.get_servers()
        if not servers:
            logger.warning("No servers found in Pterodactyl panel")
            await interaction.followup.send("No servers found in the Pterodactyl panel.", ephemeral=True)
            return

        bot.server_cache = {server['attributes']['identifier']: server for server in servers}
        logger.debug(f"Refreshed server cache with {len(servers)} servers")

        # Count online/offline servers by checking each server's current state
        online_count = 0
        offline_count = 0
        for server_id in bot.server_cache:
            try:
                resources = await bot.pterodactyl_api.get_server_resources(server_id)
                current_state = resources.get('attributes', {}).get('current_state', 'offline')
            except Exception as e:
                # One unreachable server must not break the whole dashboard
                logger.warning(f"Could not fetch state for server {server_id}, counting it as offline: {str(e)}")
                current_state = 'offline'

            if current_state == 'running':
                online_count += 1
            else:
                offline_count += 1

        # Create statistics embed with visual server status breakdown
        stats_embed = discord.Embed(
            title="🏗️ Server Status Dashboard",
            description="Select a server from the dropdown below to view its detailed status and controls.",
            color=discord.Color.blue(),
            timestamp=datetime.now()
        )
        stats_embed.add_field(
            name="📊 Server Statistics",
            value=f"**Total Servers:** {len(servers)}\n"
                  f"✅ **Online:** {online_count}\n"
                  f"❌ **Offline:** {offline_count}",
            inline=False
        )
        stats_embed.add_field(
            name="ℹ️ How to Use",
            value="Use the dropdown menu below to select a server. The server's status embed will be posted in this channel.",
            inline=False
        )
        stats_embed.set_footer(text="Server status will update automatically")

        # Create dropdown menu options from available servers
        server_options = []
        for server_id, server_data in bot.server_cache.items():
            if len(server_options) >= max_select_options:
                break

            server_name = server_data['attributes']['name']
            # "or" also covers a description that is present but None/empty
            server_description = server_data['attributes'].get('description') or 'No description'

            # Truncate description if too long for dropdown constraints
            if len(server_description) > 50:
                server_description = server_description[:47] + "..."

            # Truncate label to the maximum length Discord accepts
            if len(server_name) > max_label_length:
                server_name = server_name[:max_label_length - 3] + "..."

            server_options.append(
                discord.SelectOption(
                    label=server_name,
                    value=server_id,
                    description=server_description
                )
            )

        hidden_count = len(bot.server_cache) - len(server_options)
        if hidden_count > 0:
            logger.warning(
                f"Dropdown limited to {max_select_options} servers, {hidden_count} server(s) not listed"
            )
            stats_embed.add_field(
                name="⚠️ Server List Truncated",
                value=f"Only the first {max_select_options} servers are shown in the dropdown "
                      f"({hidden_count} not listed).",
                inline=False
            )

        # Dropdown selection handler for server choice
        class ServerDropdown(discord.ui.Select):
            def __init__(self, server_options):
                super().__init__(
                    placeholder="Select a server to display...",
                    options=server_options,
                    min_values=1,
                    max_values=1
                )

            async def callback(self, interaction: discord.Interaction):
                """
                Handle server selection from dropdown menu.
                Creates permanent status embed in the channel for the selected server.
                """
                await interaction.response.defer(ephemeral=True)

                selected_server_id = self.values[0]
                server_data = bot.server_cache.get(selected_server_id)

                if not server_data:
                    await interaction.followup.send(
                        "❌ Selected server no longer available. Please try again.",
                        ephemeral=True
                    )
                    return

                server_name = server_data['attributes']['name']
                logger.info(f"User {interaction.user.name} selected server: {server_name}")

                try:
                    # Get current server status for embed creation
                    resources = await bot.pterodactyl_api.get_server_resources(selected_server_id)

                    # Delete old embed if it exists to prevent duplication
                    if selected_server_id in bot.embed_locations:
                        logger.debug(f"Found existing embed for {selected_server_id}, attempting to delete")
                        try:
                            old_location = bot.embed_locations[selected_server_id]
                            old_channel = bot.get_channel(int(old_location['channel_id']))
                            if old_channel:
                                old_message = await old_channel.fetch_message(int(old_location['message_id']))
                                await old_message.delete()
                                logger.debug(f"Deleted old embed for {selected_server_id}")
                        except discord.NotFound:
                            logger.debug(f"Old embed for {selected_server_id} already deleted")
                        except Exception as e:
                            # Best effort: a stale embed must not block posting the new one
                            logger.error(f"Failed to delete old embed: {str(e)}")

                    # Create and send new permanent status embed in channel
                    embed, view = await bot.get_server_status_embed(server_data, resources)
                    message = await interaction.channel.send(embed=embed, view=view)
                    await bot.track_new_embed(selected_server_id, message)

                    await interaction.followup.send(
                        f"✅ **{server_name}** status has been posted in {interaction.channel.mention}",
                        ephemeral=True
                    )
                    logger.info(f"Successfully posted status for {server_name}")

                except Exception as e:
                    logger.error(f"Failed to create status embed for {server_name}: {str(e)}")
                    await interaction.followup.send(
                        f"❌ Failed to create status embed for **{server_name}**: {str(e)}",
                        ephemeral=True
                    )

        # Create dropdown view with timeout for automatic cleanup
        class ServerDropdownView(discord.ui.View):
            def __init__(self, server_options, timeout=180):  # 3 minute timeout
                super().__init__(timeout=timeout)
                self.server_options = server_options
                self.add_item(ServerDropdown(server_options))

            async def on_timeout(self):
                # Nothing to delete: the dashboard message is ephemeral
                logger.debug("Server dropdown timed out and was automatically cleaned up")

        # Send the initial dashboard embed with dropdown (ephemeral)
        await interaction.followup.send(
            embed=stats_embed,
            view=ServerDropdownView(server_options),
            ephemeral=True
        )
        logger.info(f"Sent server status dashboard to {interaction.user.name} with {len(server_options)} servers")

    except Exception as e:
        logger.error(f"Server status command failed: {str(e)}")
        await interaction.followup.send(
            f"❌ Failed to load server status: {str(e)}",
            ephemeral=True
        )
|
||
|
||
@bot.tree.command(name="refresh_embeds", description="Refresh all server status embeds (admin only)")
async def refresh_embeds(interaction: discord.Interaction):
    """
    Slash command that rebuilds every tracked server status embed.

    Only usable from the allowed guild and by members with the administrator
    permission. All replies are ephemeral.

    Args:
        interaction: Discord interaction object representing the command invocation
    """
    guild_allowed = await check_allowed_guild(interaction)
    if not guild_allowed:
        return

    logger.info(f"Refresh embeds command invoked by {interaction.user.name}")
    await interaction.response.defer(ephemeral=True)

    # Require administrator permissions
    is_admin = interaction.user.guild_permissions.administrator
    if not is_admin:
        logger.warning(f"Unauthorized refresh attempt by {interaction.user.name}")
        denial = "You need administrator permissions to refresh all embeds."
        await interaction.followup.send(denial, ephemeral=True)
        return

    try:
        logger.info("Starting full embed refresh per admin request")
        deleted, created = await bot.refresh_all_embeds()
        summary = f"Refreshed all embeds. Deleted {deleted} old embeds, created {created} new ones."
        await interaction.followup.send(summary, ephemeral=True)
        logger.info(f"Embed refresh completed: {deleted} deleted, {created} created")
    except Exception as e:
        logger.error(f"Embed refresh failed: {str(e)}")
        failure = f"Failed to refresh embeds: {str(e)}"
        await interaction.followup.send(failure, ephemeral=True)
|
||
|
||
@bot.tree.command(name="purge_embeds", description="Permanently delete all server status embeds (admin only)")
async def purge_embeds(interaction: discord.Interaction):
    """
    Slash command to permanently purge all tracked server status embeds.

    For every entry in bot.embed_locations the command tries to delete the
    embed message, then removes the entry (and its previous-state record) from
    tracking regardless of the outcome. Afterwards the emptied tracking data is
    saved and a result summary replaces the progress message.

    Args:
        interaction: Discord interaction object representing the command invocation

    Workflow:
        - Validates guild and administrator permissions
        - Returns early if no embeds are tracked
        - Sends an ephemeral progress embed
        - Processes each tracked embed sequentially; exactly one of the
          deleted / not found / error counters is incremented per embed
        - Updates the progress embed every 5 embeds and after the last one
          (a failed progress update is logged and does not affect the counters)
        - Saves the cleared tracking data
        - Shows final statistics

    Safety:
        - Only affects tracked embeds (won't delete arbitrary messages)
        - Short delay between deletions to stay below Discord rate limits

    Returns:
        None: Sends follow-up messages with operation results
    """
    if not await check_allowed_guild(interaction):
        return

    logger.info(f"Purge embeds command invoked by {interaction.user.name}")
    await interaction.response.defer(ephemeral=True)

    # Require administrator permissions
    if not interaction.user.guild_permissions.administrator:
        logger.warning(f"Unauthorized purge attempt by {interaction.user.name}")
        await interaction.followup.send(
            "You need administrator permissions to purge all embeds.",
            ephemeral=True
        )
        return

    try:
        logger.info("Starting embed purge per admin request")

        # Variables to track purge statistics
        deleted_count = 0
        not_found_count = 0
        error_count = 0
        total_embeds = len(bot.embed_locations)

        if total_embeds == 0:
            await interaction.followup.send(
                "No embeds are currently being tracked. Nothing to purge.",
                ephemeral=True
            )
            return

        # Create progress embed
        progress_embed = discord.Embed(
            title="🔄 Purging Server Embeds",
            description=f"Processing {total_embeds} embeds...",
            color=discord.Color.orange()
        )
        progress_embed.add_field(name="Deleted", value="0", inline=True)
        progress_embed.add_field(name="Not Found", value="0", inline=True)
        progress_embed.add_field(name="Errors", value="0", inline=True)
        progress_embed.set_footer(text="This may take a while...")

        progress_message = await interaction.followup.send(embed=progress_embed, ephemeral=True)

        # Iterate over a snapshot because entries are removed while looping
        tracked = list(bot.embed_locations.items())
        for processed, (server_id, location) in enumerate(tracked, start=1):
            # Step 1: delete the message. Exactly one counter changes per embed.
            try:
                channel = bot.get_channel(int(location['channel_id']))
                if channel is None:
                    not_found_count += 1
                    logger.warning(f"Channel not found for server {server_id}")
                else:
                    message = await channel.fetch_message(int(location['message_id']))
                    await message.delete()
                    deleted_count += 1
                    logger.debug(f"Successfully purged embed for server {server_id}")
            except discord.NotFound:
                not_found_count += 1
                logger.debug(f"Embed for server {server_id} already deleted")
            except discord.Forbidden:
                error_count += 1
                logger.error(f"Permission denied when deleting embed for server {server_id}")
            except Exception as e:
                error_count += 1
                logger.error(f"Error deleting embed for server {server_id}: {str(e)}")
            finally:
                # Remove from tracking in every case so the saved file really is cleared
                bot.embed_locations.pop(server_id, None)
                bot.previous_states.pop(server_id, None)  # Also clean up state tracking

            # Step 2: report progress every 5 embeds or for the last one.
            # Kept outside the deletion try-block so a failed edit is not
            # counted as a purge error.
            if processed % 5 == 0 or processed == total_embeds:
                progress_embed.description = f"Processed {processed}/{total_embeds} embeds"
                progress_embed.set_field_at(0, name="Deleted", value=str(deleted_count), inline=True)
                progress_embed.set_field_at(1, name="Not Found", value=str(not_found_count), inline=True)
                progress_embed.set_field_at(2, name="Errors", value=str(error_count), inline=True)
                try:
                    await progress_message.edit(embed=progress_embed)
                except discord.HTTPException as e:
                    logger.warning(f"Could not update purge progress message: {str(e)}")

            # Small delay to avoid rate limits
            await asyncio.sleep(0.3)

        # Save the cleared embed locations
        await bot.save_embed_locations()

        # Create results embed
        result_embed = discord.Embed(
            title="✅ Embed Purge Complete",
            color=discord.Color.green(),
            timestamp=datetime.now()
        )
        result_embed.add_field(name="Total Tracked", value=str(total_embeds), inline=True)
        result_embed.add_field(name="✅ Successfully Deleted", value=str(deleted_count), inline=True)
        result_embed.add_field(name="❌ Already Missing", value=str(not_found_count), inline=True)
        result_embed.add_field(name="⚠️ Errors", value=str(error_count), inline=True)
        # total_embeds is non-zero here (checked above), so the division is safe
        result_embed.add_field(name="📊 Success Rate",
                               value=f"{((deleted_count + not_found_count) / total_embeds * 100):.1f}%",
                               inline=True)
        result_embed.set_footer(text="Embed tracking file has been cleared")

        await progress_message.edit(embed=result_embed)
        logger.info(f"Embed purge completed: {deleted_count} deleted, {not_found_count} not found, {error_count} errors")

    except Exception as e:
        logger.error(f"Embed purge failed: {str(e)}")
        await interaction.followup.send(
            f"❌ Failed to purge embeds: {str(e)}",
            ephemeral=True
        )
|
||
|
||
# ==============================================
|
||
# BOT EVENTS
|
||
# ==============================================
|
||
|
||
@bot.event
async def on_interaction(interaction: discord.Interaction):
    """
    Global interaction listener that notifies users of unauthorized guilds.

    Interactions from the allowed guild are ignored here. For any other guild
    an ephemeral notice is sent, unless the interaction was already answered
    (check_allowed_guild sends the same notice from within the commands).

    NOTE: this listener only sends the notice. It does not by itself stop a
    command or component callback from running, so commands still need their
    own guild check.
    """
    if interaction.guild_id == ALLOWED_GUILD_ID:
        return

    logger.debug(f"Ignoring interaction from unauthorized guild {interaction.guild_id}")

    # An interaction can only receive one initial response
    if interaction.response.is_done():
        return

    try:
        await interaction.response.send_message(
            "This bot is only available in a specific server.",
            ephemeral=True
        )
    except (discord.InteractionResponded, discord.HTTPException) as e:
        # Another handler answered between the check and the send
        logger.debug(f"Unauthorized guild notice was not sent: {str(e)}")
|
||
|
||
@bot.event
async def on_ready():
    """
    Log the connection and sync the application commands to the allowed guild.

    Global commands are copied to the guild before syncing. A sync failure is
    logged and otherwise ignored.
    """
    logger.info(f"Bot connected as {bot.user.name} (ID: {bot.user.id})")

    # Sync commands only to the allowed guild
    try:
        target_guild = discord.Object(id=ALLOWED_GUILD_ID)
        command_tree = bot.tree
        command_tree.copy_global_to(guild=target_guild)
        synced = await command_tree.sync(guild=target_guild)
        logger.info(f"Successfully synced {len(synced)} command(s) to guild {ALLOWED_GUILD_ID}: {[cmd.name for cmd in synced]}")
    except Exception as e:
        logger.error(f"Command sync failed: {str(e)}")
|
||
|
||
# ==============================================
|
||
# SYSTEM SIGNAL HANDLERS
|
||
# ==============================================
|
||
|
||
def handle_sigint(signum: int, frame: types.FrameType) -> None:
    """
    React to SIGINT (Ctrl+C) by logging and raising KeyboardInterrupt.

    Args:
        signum: The signal number passed by the signal machinery (unused)
        frame: Current stack frame (unused but required by the handler signature)

    Raises:
        KeyboardInterrupt: always, to unwind into the shutdown path
    """
    logger.info("Received SIGINT (Ctrl+C), initiating graceful shutdown...")
    raise KeyboardInterrupt()
|
||
|
||
def handle_sigterm(signum: int, frame: types.FrameType) -> None:
    """
    React to SIGTERM (container stop) by logging and raising KeyboardInterrupt.

    Args:
        signum: The signal number passed by the signal machinery (unused)
        frame: Current stack frame (unused but required by the handler signature)

    Raises:
        KeyboardInterrupt: always, so SIGTERM follows the same shutdown path as Ctrl+C
    """
    logger.info("Received SIGTERM (container stop), initiating graceful shutdown...")
    raise KeyboardInterrupt()
|
||
|
||
# ==============================================
|
||
# BOT STARTUP
|
||
# ==============================================
|
||
|
||
if __name__ == "__main__":
    # Main entry point for the bot application.
    #
    # Flow:
    #   1. Register SIGINT/SIGTERM handlers for graceful shutdown
    #   2. Run the bot with the Discord token (blocks until shutdown)
    #   3. Translate the outcome into an exit code: 0 for a normal or
    #      interrupted shutdown, 1 for a crash
    #   4. Exit exactly once, after the final log line
    #
    # The exit code is stored in a variable instead of calling sys.exit()
    # inside the handlers: a sys.exit(0) in a "finally" clause would replace
    # the SystemExit(1) raised by the crash handler and hide the failure.
    logger.info("Starting bot initialization")

    # Register signal handlers
    signal.signal(signal.SIGINT, handle_sigint)    # For Ctrl+C
    signal.signal(signal.SIGTERM, handle_sigterm)  # For container stop commands
    logger.info("System signal handlers registered")

    exit_code = 0
    try:
        bot.run(DISCORD_TOKEN)
    except KeyboardInterrupt:
        logger.info("Bot shutting down")
    except Exception as e:
        logger.error(f"Bot crashed: {str(e)}")
        exit_code = 1  # Report the crash to the caller / container runtime
    finally:
        logger.info("Bot shutdown complete")

    sys.exit(exit_code)
|