All checks were successful
Docker Build and Push / build-and-push (push) Successful in 1m9s
1157 lines
50 KiB
Python
1157 lines
50 KiB
Python
"""
|
|
Pterodactyl Discord Bot
|
|
A comprehensive Discord bot for managing Pterodactyl game servers through Discord.
|
|
|
|
Features:
|
|
- Server status monitoring with auto-updating embeds
|
|
- Power controls (start/stop/restart)
|
|
- Server address/port display
|
|
- Multi-channel embed support
|
|
- Automatic embed refresh system
|
|
- Role-based access control
|
|
- Single-guild restriction
|
|
- Extensive logging for all operations
|
|
"""
|
|
|
|
import discord
|
|
from discord.ext import commands, tasks
|
|
from discord import app_commands
|
|
import os
|
|
import sys
|
|
import signal
|
|
import types
|
|
import aiohttp
|
|
import asyncio
|
|
import json
|
|
import traceback
|
|
import logging
|
|
from logging.handlers import RotatingFileHandler
|
|
import configparser
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional, Tuple
|
|
from pathlib import Path
|
|
import generate_config
|
|
|
|
# ==============================================
|
|
# LOGGING SETUP
|
|
# ==============================================
|
|
|
|
# Log files are written to a "logs" directory that sits next to this module.
logs_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs')
os.makedirs(logs_dir, exist_ok=True)

# Both handlers render records with the same layout.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'

# The logger itself lets everything through; each handler applies its own threshold.
logger = logging.getLogger('pterodisbot')
logger.setLevel(logging.DEBUG)

# Rotating file handler: rolls over at 5 MiB and keeps 3 backups.
handler = RotatingFileHandler(
    filename=os.path.join(logs_dir, 'pterodisbot.log'),
    maxBytes=5 * 1024 * 1024,
    backupCount=3,
    encoding='utf-8',
)

# Console handler for real-time output, limited to INFO and above.
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)

for _log_handler in (handler, console_handler):
    _log_handler.setFormatter(logging.Formatter(_LOG_FORMAT))
    logger.addHandler(_log_handler)

logger.info("Initialized logging system with file and console output")
|
|
|
|
# ==============================================
|
|
# CONFIGURATION SETUP
|
|
# ==============================================
|
|
|
|
# generate_config.generate_config()
|
|
# logger.debug("Generated config.ini file using values from .env")
|
|
|
|
config = configparser.ConfigParser()

# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed. Without this check a missing config.ini would
# only surface later as "missing section" validation errors.
if not config.read('config.ini'):
    logger.warning("config.ini was not found or could not be read; configuration validation will fail")
|
|
|
|
# ==============================================
|
|
# CONFIGURATION VALIDATION
|
|
# ==============================================
|
|
|
|
class ConfigValidationError(Exception):
    """Raised when config.ini is missing required values or contains invalid ones."""
|
|
|
|
def validate_config():
    """
    Validate all required configuration values at startup.

    Reads the module-level ``config`` parser and checks that the
    [Pterodactyl] and [Discord] sections exist, that every required key has
    a non-empty value, and that present values are well formed (integer
    guild ID, API key prefixes, URL scheme). Every problem found is
    collected so they can all be reported at once.

    Format checks only run when a value is actually present. A missing
    section or key is reported once as "missing" rather than a second time
    as "invalid".

    Raises:
        ConfigValidationError: If any required value is missing or invalid.
            The message lists every problem found.
    """
    errors = []

    # Validate Pterodactyl section
    if not config.has_section('Pterodactyl'):
        errors.append("Missing [Pterodactyl] section in config.ini")
    else:
        for key in ('PanelURL', 'ClientAPIKey', 'ApplicationAPIKey'):
            if not config.get('Pterodactyl', key, fallback=None):
                errors.append(f"Missing required Pterodactyl config value: {key}")

    # Validate Discord section
    if not config.has_section('Discord'):
        errors.append("Missing [Discord] section in config.ini")
    else:
        for key in ('Token', 'AllowedGuildID'):
            if not config.get('Discord', key, fallback=None):
                errors.append(f"Missing required Discord config value: {key}")

    # Validate AllowedGuildID is a positive integer. This is skipped when the
    # value is absent or empty, because that case was already reported above.
    if config.get('Discord', 'AllowedGuildID', fallback=''):
        try:
            if config.getint('Discord', 'AllowedGuildID') <= 0:
                errors.append("AllowedGuildID must be a positive integer")
        except ValueError:
            errors.append("AllowedGuildID must be a valid integer")

    # Validate API keys have correct prefixes
    client_key = config.get('Pterodactyl', 'ClientAPIKey', fallback='')
    if client_key and not client_key.startswith('ptlc_'):
        errors.append("ClientAPIKey should start with 'ptlc_'")

    app_key = config.get('Pterodactyl', 'ApplicationAPIKey', fallback='')
    if app_key and not app_key.startswith('ptla_'):
        errors.append("ApplicationAPIKey should start with 'ptla_'")

    # Validate PanelURL carries an explicit scheme
    panel_url = config.get('Pterodactyl', 'PanelURL', fallback='')
    if panel_url and not panel_url.startswith(('http://', 'https://')):
        errors.append("PanelURL must start with http:// or https://")

    if errors:
        error_msg = "Configuration validation failed:\n- " + "\n- ".join(errors)
        logger.error(error_msg)
        raise ConfigValidationError(error_msg)

    logger.info("Configuration validation passed")
|
|
|
|
# ==============================================
|
|
# CONSTANTS (Updated with validation)
|
|
# ==============================================
|
|
|
|
# Fixed settings that do not come from config.ini.
REQUIRED_ROLE = "Game Server User"  # Discord role name checked before allowing button use
UPDATE_INTERVAL = 10  # Seconds between runs of the background status loop
EMBED_LOCATIONS_FILE = "./embed/embed_locations.json"  # JSON file tracking posted embeds

# Values read from config.ini. Validation runs first so that a bad file is
# reported with a full list of problems before any value is used.
try:
    validate_config()

    PTERODACTYL_URL = config.get('Pterodactyl', 'PanelURL')
    PTERODACTYL_CLIENT_API_KEY = config.get('Pterodactyl', 'ClientAPIKey')
    PTERODACTYL_APPLICATION_API_KEY = config.get('Pterodactyl', 'ApplicationAPIKey')
    DISCORD_TOKEN = config.get('Discord', 'Token')
    ALLOWED_GUILD_ID = config.getint('Discord', 'AllowedGuildID')

    logger.debug("Loaded and validated configuration values from config.ini")
except ConfigValidationError as e:
    # Validation problems: log at critical level and abort startup.
    logger.critical(f"Configuration error: {e}")
    raise
except Exception as e:
    # Anything else is unexpected; log it and let it propagate as well.
    logger.critical(f"Unexpected error loading configuration: {e}")
    raise
|
|
|
|
# ==============================================
|
|
# PTERODACTYL API CLASS
|
|
# ==============================================
|
|
|
|
class PterodactylAPI:
|
|
"""
|
|
Handles all interactions with the Pterodactyl Panel API.
|
|
Uses client API key for client endpoints and application API key for admin endpoints.
|
|
Provides methods for server management and monitoring.
|
|
"""
|
|
|
|
def __init__(self, panel_url: str, client_api_key: str, application_api_key: str):
|
|
"""
|
|
Initialize the Pterodactyl API client with both API keys.
|
|
|
|
Args:
|
|
panel_url: URL of the Pterodactyl panel (must include protocol)
|
|
client_api_key: API key for client endpoints (starts with ptlc_)
|
|
application_api_key: API key for application endpoints (starts with ptla_)
|
|
"""
|
|
self.panel_url = panel_url.rstrip('/')
|
|
self.client_api_key = client_api_key
|
|
self.application_api_key = application_api_key
|
|
self.session = None
|
|
self.lock = asyncio.Lock() # Prevents concurrent API access
|
|
logger.info("Initialized PterodactylAPI client with provided credentials")
|
|
|
|
async def initialize(self):
|
|
"""Initialize the aiohttp client session for API requests."""
|
|
self.session = aiohttp.ClientSession()
|
|
logger.debug("Created new aiohttp ClientSession")
|
|
|
|
async def close(self):
|
|
"""Cleanly close the aiohttp session when shutting down."""
|
|
if self.session and not self.session.closed:
|
|
await self.session.close()
|
|
logger.debug("Closed aiohttp ClientSession")
|
|
|
|
async def _request(self, method: str, endpoint: str, data: Optional[dict] = None, use_application_key: bool = False) -> dict:
|
|
"""
|
|
Make an authenticated request to the Pterodactyl API.
|
|
|
|
Args:
|
|
method: HTTP method (GET, POST, PUT, DELETE, etc.)
|
|
endpoint: API endpoint (e.g., 'application/servers')
|
|
data: Optional JSON payload for POST/PUT requests
|
|
use_application_key: Whether to use the application API key (admin endpoints)
|
|
|
|
Returns:
|
|
Dictionary containing API response or error information
|
|
|
|
Raises:
|
|
aiohttp.ClientError: For network-related issues
|
|
json.JSONDecodeError: If response cannot be parsed as JSON
|
|
"""
|
|
url = f"{self.panel_url}/api/{endpoint}"
|
|
api_key_type = "Application" if use_application_key else "Client"
|
|
logger.debug(f"Preparing {method} request to {endpoint} using {api_key_type} API key")
|
|
|
|
# Choose the appropriate API key
|
|
api_key = self.application_api_key if use_application_key else self.client_api_key
|
|
headers = {
|
|
"Authorization": f"Bearer {api_key}",
|
|
"Accept": "application/json",
|
|
"Content-Type": "application/json"
|
|
}
|
|
|
|
try:
|
|
async with self.lock:
|
|
logger.debug(f"Acquired lock for API request to {endpoint}")
|
|
async with self.session.request(
|
|
method,
|
|
url,
|
|
headers=headers,
|
|
json=data
|
|
) as response:
|
|
if response.status == 204: # No content
|
|
logger.debug(f"Received 204 No Content response from {endpoint}")
|
|
return {"status": "success"}
|
|
|
|
response_data = await response.json()
|
|
logger.debug(f"Received response from {endpoint} with status {response.status}")
|
|
|
|
if response.status >= 400:
|
|
error_msg = response_data.get('errors', [{}])[0].get('detail', 'Unknown error')
|
|
logger.error(f"API request to {endpoint} failed with status {response.status}: {error_msg}")
|
|
return {"status": "error", "message": error_msg}
|
|
|
|
return response_data
|
|
except Exception as e:
|
|
logger.error(f"Exception during API request to {endpoint}: {str(e)}")
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
async def get_servers(self) -> List[dict]:
|
|
"""
|
|
Get a list of all servers from the Pterodactyl panel.
|
|
Uses application API key as this is an admin endpoint.
|
|
|
|
Returns:
|
|
List of server dictionaries containing all server attributes
|
|
"""
|
|
logger.info("Fetching list of all servers from Pterodactyl panel")
|
|
response = await self._request("GET", "application/servers", use_application_key=True)
|
|
servers = response.get('data', [])
|
|
logger.info(f"Retrieved {len(servers)} servers from Pterodactyl panel")
|
|
return servers
|
|
|
|
async def get_server_resources(self, server_id: str) -> dict:
|
|
"""
|
|
Get resource usage for a specific server.
|
|
Uses client API key as this is a client endpoint.
|
|
|
|
Args:
|
|
server_id: The Pterodactyl server identifier
|
|
|
|
Returns:
|
|
Dictionary containing server resource usage and current state
|
|
"""
|
|
logger.debug(f"Fetching resource usage for server {server_id}")
|
|
try:
|
|
response = await self._request("GET", f"client/servers/{server_id}/resources")
|
|
if response.get('status') == 'error':
|
|
error_msg = response.get('message', 'Unknown error')
|
|
logger.error(f"Failed to get resources for server {server_id}: {error_msg}")
|
|
return {'attributes': {'current_state': 'offline'}}
|
|
|
|
state = response.get('attributes', {}).get('current_state', 'unknown')
|
|
logger.debug(f"Server {server_id} current state: {state}")
|
|
return response
|
|
except Exception as e:
|
|
logger.error(f"Exception getting resources for server {server_id}: {str(e)}")
|
|
return {'attributes': {'current_state': 'offline'}}
|
|
|
|
async def send_power_action(self, server_id: str, action: str) -> dict:
|
|
"""
|
|
Send a power action to a server (start/stop/restart).
|
|
Uses client API key as this is a client endpoint.
|
|
|
|
Args:
|
|
server_id: The Pterodactyl server identifier
|
|
action: Power action to send (start/stop/restart)
|
|
|
|
Returns:
|
|
Dictionary containing API response status
|
|
"""
|
|
valid_actions = ['start', 'stop', 'restart']
|
|
if action not in valid_actions:
|
|
logger.warning(f"Invalid power action attempted: {action}")
|
|
return {"status": "error", "message": f"Invalid action. Must be one of: {', '.join(valid_actions)}"}
|
|
|
|
logger.info(f"Sending {action} command to server {server_id}")
|
|
result = await self._request("POST", f"client/servers/{server_id}/power", {"signal": action})
|
|
|
|
if result.get('status') == 'success':
|
|
logger.info(f"Successfully executed {action} on server {server_id}")
|
|
else:
|
|
logger.error(f"Failed to execute {action} on server {server_id}: {result.get('message', 'Unknown error')}")
|
|
|
|
return result
|
|
|
|
async def get_server_details(self, server_id: str) -> dict:
|
|
"""
|
|
Get detailed server information including allocations.
|
|
Uses application API key as this is an admin endpoint.
|
|
|
|
Args:
|
|
server_id: The Pterodactyl server identifier
|
|
|
|
Returns:
|
|
Dictionary containing detailed server information
|
|
"""
|
|
logger.debug(f"Fetching detailed information for server {server_id}")
|
|
return await self._request("GET", f"application/servers/{server_id}", use_application_key=True)
|
|
|
|
async def get_server_allocations(self, server_id: str) -> dict:
|
|
"""
|
|
Get allocation information for a server (IP addresses and ports).
|
|
Uses application API key as this is an admin endpoint.
|
|
|
|
Args:
|
|
server_id: The Pterodactyl server identifier
|
|
|
|
Returns:
|
|
Dictionary containing server allocation information
|
|
"""
|
|
logger.debug(f"Fetching allocation information for server {server_id}")
|
|
return await self._request("GET", f"application/servers/{server_id}/allocations", use_application_key=True)
|
|
|
|
# ==============================================
|
|
# SERVER STATUS VIEW CLASS (Buttons and UI)
|
|
# ==============================================
|
|
|
|
class ServerStatusView(discord.ui.View):
    """
    Interactive Discord view containing server control buttons.

    Provides Start / Stop / Restart / Show Address buttons for one server.
    Every interaction is first checked against the allowed guild and the
    required role before a button callback runs.
    """

    def __init__(self, server_id: str, server_name: str, pterodactyl_api: PterodactylAPI, server_data: dict):
        """
        Initialize the server status view with control buttons.

        Args:
            server_id: The server's Pterodactyl identifier
            server_name: Human-readable server name
            pterodactyl_api: API client instance
            server_data: Full server data from Pterodactyl
        """
        super().__init__(timeout=None)  # Persistent view
        self.server_id = server_id
        self.server_name = server_name
        self.api = pterodactyl_api
        self.server_data = server_data
        logger.debug(f"Created ServerStatusView for {server_name} ({server_id})")

    async def interaction_check(self, interaction: discord.Interaction) -> bool:
        """
        Verify the interacting user has the required role and is in the allowed guild.

        Sends an ephemeral refusal message when the check fails.

        Args:
            interaction: Discord interaction object

        Returns:
            bool: True if authorized, False otherwise
        """
        # First check if interaction is from the allowed guild
        if interaction.guild_id != ALLOWED_GUILD_ID:
            logger.warning(f"Unauthorized interaction attempt from guild {interaction.guild_id}")
            await interaction.response.send_message(
                "This bot is only available in a specific server.",
                ephemeral=True
            )
            return False

        # Then check for required role
        logger.debug(f"Checking permissions for {interaction.user.name} on server {self.server_name}")
        has_role = any(role.name == REQUIRED_ROLE for role in interaction.user.roles)
        if not has_role:
            logger.warning(f"Permission denied for {interaction.user.name} - missing '{REQUIRED_ROLE}' role")
            await interaction.response.send_message(
                f"You don't have permission to control servers. You need the '{REQUIRED_ROLE}' role.",
                ephemeral=True
            )
            return False

        logger.debug(f"Permission granted for {interaction.user.name}")
        return True

    async def on_error(self, interaction: discord.Interaction, error: Exception, item: discord.ui.Item):
        """
        Handle errors in button interactions.

        Args:
            interaction: Discord interaction object
            error: Exception that occurred
            item: The UI item that triggered the error
        """
        logger.error(f"View error in {self.server_name} by {interaction.user.name}: {str(error)}")
        message = "An error occurred while processing your request."
        # Every button callback defers before doing any work, so the initial
        # response has usually been used by the time an error arrives here.
        # A second response.send_message would fail, so use the followup.
        if interaction.response.is_done():
            await interaction.followup.send(message, ephemeral=True)
        else:
            await interaction.response.send_message(message, ephemeral=True)

    async def _handle_power_button(self, interaction: discord.Interaction, action: str, progressive: str, past: str):
        """
        Shared implementation for the Start/Stop/Restart buttons.

        Args:
            interaction: Discord interaction object
            action: Power signal to send ('start', 'stop' or 'restart')
            progressive: Verb form used in the user-facing message (e.g. 'starting')
            past: Verb form used in the success log line (e.g. 'started')
        """
        logger.info(f"{action.capitalize()} button pressed for {self.server_name} by {interaction.user.name}")
        # Defer first so the interaction stays valid while the panel responds.
        await interaction.response.defer(ephemeral=True)
        result = await self.api.send_power_action(self.server_id, action)

        if result.get('status') == 'success':
            message = f"Server '{self.server_name}' is {progressive}..."
            logger.info(f"Successfully {past} server {self.server_name}")
        else:
            message = f"Failed to {action} server: {result.get('message', 'Unknown error')}"
            logger.error(f"Failed to {action} server {self.server_name}: {message}")

        await interaction.followup.send(message, ephemeral=True)

    @discord.ui.button(label="Start", style=discord.ButtonStyle.green, custom_id="start_button")
    async def start_button(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Send a start command to the server."""
        await self._handle_power_button(interaction, "start", "starting", "started")

    @discord.ui.button(label="Stop", style=discord.ButtonStyle.red, custom_id="stop_button")
    async def stop_button(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Send a stop command to the server."""
        await self._handle_power_button(interaction, "stop", "stopping", "stopped")

    @discord.ui.button(label="Restart", style=discord.ButtonStyle.blurple, custom_id="restart_button")
    async def restart_button(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Send a restart command to the server."""
        await self._handle_power_button(interaction, "restart", "restarting", "restarted")

    @discord.ui.button(label="Show Address", style=discord.ButtonStyle.grey, custom_id="show_address_button")
    async def show_address_button(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Show server's default allocation IP and port using client API."""
        logger.info(f"Show Address button pressed for {self.server_name} by {interaction.user.name}")
        try:
            await interaction.response.defer(ephemeral=True)
            logger.debug(f"Fetching server details for {self.server_id}")

            # Get server details using client API
            server_details = await self.api._request(
                "GET",
                f"client/servers/{self.server_id}",
                use_application_key=False
            )

            if server_details.get('status') == 'error':
                error_msg = server_details.get('message', 'Unknown error')
                logger.error(f"Failed to get server details for {self.server_id}: {error_msg}")
                raise ValueError(error_msg)

            attributes = server_details.get('attributes', {})
            relationships = attributes.get('relationships', {})
            allocations = relationships.get('allocations', {}).get('data', [])

            if not allocations:
                logger.warning(f"No allocations found for server {self.server_id}")
                raise ValueError("No allocations found for this server")

            # Find the default allocation (is_default=True)
            default_allocation = next(
                (alloc for alloc in allocations
                 if alloc.get('attributes', {}).get('is_default', False)),
                allocations[0]  # Fallback to first allocation if no default found
            )

            allocation_attrs = default_allocation.get('attributes', {})
            # The alias can be present but null; fall back to the raw IP so
            # the embed never shows "None" as the address.
            ip_alias = allocation_attrs.get('ip_alias') or allocation_attrs.get('ip') or 'Unknown'
            port = str(allocation_attrs.get('port', 'Unknown'))

            logger.debug(f"Retrieved connection info for {self.server_id}: {ip_alias}:{port}")

            # Create and send embed
            embed = discord.Embed(
                title=f"{self.server_name} Connection Info",
                color=discord.Color.blue(),
                description=f"Server ID: `{self.server_id}`"
            )
            embed.add_field(name="Address", value=f"`{ip_alias}`", inline=True)
            embed.add_field(name="Port", value=f"`{port}`", inline=True)

            await interaction.followup.send(embed=embed, ephemeral=True)
            logger.info(f"Displayed connection info for {self.server_name}")

        except Exception as e:
            logger.error(f"Failed to show address for {self.server_name}: {str(e)}")
            await interaction.followup.send(
                "⚠️ Failed to get connection info. The server may not have any ports allocated.",
                ephemeral=True
            )
|
|
|
|
# ==============================================
|
|
# MAIN BOT CLASS
|
|
# ==============================================
|
|
|
|
class PterodactylBot(commands.Bot):
|
|
"""
|
|
Main bot class for Pterodactyl server management.
|
|
Handles Discord interactions, embed management, and background tasks.
|
|
Manages server status embeds and user commands.
|
|
"""
|
|
|
|
def __init__(self, *args, **kwargs):
    """
    Create the bot and set up its in-memory state.

    All positional and keyword arguments are forwarded unchanged to
    ``commands.Bot``. The API client is created later, in ``setup_hook``.
    """
    super().__init__(*args, **kwargs)

    # Pterodactyl API client; stays None until setup_hook runs.
    self.pterodactyl_api = None

    # server identifier -> server data from Pterodactyl
    self.server_cache: Dict[str, dict] = {}

    # server identifier -> {'channel_id': ..., 'message_id': ...}; both IDs
    # are stored as strings.
    self.embed_locations: Dict[str, Dict[str, str]] = {}

    # server identifier -> (state, cpu_usage) seen on the previous cycle,
    # used to detect changes.
    self.previous_states: Dict[str, Tuple[str, float]] = {}

    # Prevents concurrent updates.
    self.update_lock = asyncio.Lock()

    # File used to store embed locations.
    self.embed_storage_path = Path(EMBED_LOCATIONS_FILE)

    logger.info("Initialized PterodactylBot instance with state tracking")
|
|
|
|
async def setup_hook(self):
    """
    Prepare the bot's dependencies while it is starting up.

    In order: creates and initializes the Pterodactyl API client, loads the
    saved embed locations, and starts the background status update loop.
    """
    logger.info("Running bot setup hook")

    # Step 1: API client (constructed first, then given its HTTP session).
    self.pterodactyl_api = PterodactylAPI(
        panel_url=PTERODACTYL_URL,
        client_api_key=PTERODACTYL_CLIENT_API_KEY,
        application_api_key=PTERODACTYL_APPLICATION_API_KEY,
    )
    await self.pterodactyl_api.initialize()
    logger.info("Initialized Pterodactyl API client")

    # Step 2: previously saved embed locations.
    await self.load_embed_locations()

    # Step 3: background update loop.
    self.update_status.start()
    logger.info("Started background status update task")
|
|
|
|
async def load_embed_locations(self):
    """
    Load saved embed locations from the JSON storage file.

    Leaves ``self.embed_locations`` untouched when the file is missing,
    cannot be read or parsed, or does not contain a JSON object. Failures
    are logged, never raised.
    """
    logger.debug("Attempting to load embed locations from storage")
    if not self.embed_storage_path.exists():
        logger.info("No existing embed locations file found")
        return

    try:
        with open(self.embed_storage_path, 'r', encoding='utf-8') as f:
            loaded = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        logger.error(f"Failed to load embed locations: {str(e)}")
        return

    # The rest of the bot iterates this mapping with .items(). Reject any
    # other JSON type here rather than crashing later in the update loop.
    if not isinstance(loaded, dict):
        logger.error(f"Failed to load embed locations: expected a JSON object, got {type(loaded).__name__}")
        return

    self.embed_locations = loaded
    logger.info(f"Loaded {len(self.embed_locations)} embed locations from storage")
|
|
|
|
async def save_embed_locations(self):
    """
    Save current embed locations to the JSON storage file.

    Creates the storage file's parent directory if it does not exist yet.
    Failures are logged, never raised.
    """
    logger.debug("Attempting to save embed locations to storage")
    try:
        # The storage file lives in a subdirectory that may not exist on a
        # fresh install; without this every save would fail on open().
        self.embed_storage_path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.embed_storage_path, 'w', encoding='utf-8') as f:
            json.dump(self.embed_locations, f, indent=2)
        logger.debug("Successfully saved embed locations to disk")
    except Exception as e:
        logger.error(f"Failed to save embed locations: {str(e)}")
|
|
|
|
async def refresh_all_embeds(self) -> Tuple[int, int]:
    """
    Perform a complete refresh of all server status embeds.

    For every cached server that already has a tracked embed, a new embed is
    posted in the same channel. The old embed messages are then deleted and
    the tracking file is rewritten. New embeds are created before old ones
    are removed to prevent duplication.

    If a replacement could not be posted for a server (channel not found or
    the send failed), its old embed is left in place and stays tracked, so
    a transient failure does not lose the embed.

    Returns:
        Tuple of (deleted_count, created_count) - number of embeds processed.
        (0, 0) when the panel reports no servers or nothing was recreated.

    Raises:
        Exception: Any unexpected error is logged and re-raised.
    """
    logger.info("Starting full refresh of all server embeds")
    async with self.update_lock:
        try:
            await asyncio.sleep(1)  # Initial delay

            # Get current server list if cache is empty
            if not self.server_cache:
                logger.debug("Server cache empty, fetching fresh server list")
                servers = await self.pterodactyl_api.get_servers()
                if not servers:
                    logger.warning("No servers found in Pterodactyl panel")
                    return 0, 0

                self.server_cache = {server['attributes']['identifier']: server for server in servers}
                logger.info(f"Populated server cache with {len(servers)} servers")

            # Create new embeds in temporary storage
            new_embeds = {}
            # Old locations kept because no replacement could be posted
            kept_embeds = {}
            created_count = 0
            skipped_count = 0

            for server_id, server_data in self.server_cache.items():
                # Skip if we don't have an existing location to recreate in
                old_location = self.embed_locations.get(server_id)
                if old_location is None:
                    skipped_count += 1
                    continue

                channel_id = old_location['channel_id']
                channel = self.get_channel(int(channel_id))
                if not channel:
                    logger.warning(f"Channel {channel_id} not found for server {server_id}")
                    kept_embeds[server_id] = old_location
                    continue

                try:
                    logger.debug(f"Creating new embed for server {server_id}")
                    # Get current server status
                    resources = await self.pterodactyl_api.get_server_resources(server_id)

                    # Create new embed
                    embed, view = await self.get_server_status_embed(server_data, resources)
                    message = await channel.send(embed=embed, view=view)

                    # Store in temporary location
                    new_embeds[server_id] = {
                        'channel_id': str(channel.id),
                        'message_id': str(message.id)
                    }
                    created_count += 1
                    logger.info(f"Created new embed for server {server_data['attributes']['name']}")

                    await asyncio.sleep(1)  # Rate limit protection
                except Exception as e:
                    logger.error(f"Failed to create new embed for server {server_id}: {str(e)}")
                    # Keep the old embed only if the new one was never posted.
                    if server_id not in new_embeds:
                        kept_embeds[server_id] = old_location

            logger.info(f"Created {created_count} new embeds, skipped {skipped_count} servers")

            # Only proceed if we created at least one new embed
            if not new_embeds:
                logger.warning("No new embeds created during refresh")
                return 0, 0

            # Now delete old embeds
            deleted_count = 0
            not_found_count = 0

            for server_id, location in list(self.embed_locations.items()):
                # An embed without a replacement stays where it is.
                if server_id in kept_embeds:
                    continue
                try:
                    channel = self.get_channel(int(location['channel_id']))
                    if channel:
                        try:
                            message = await channel.fetch_message(int(location['message_id']))
                            await message.delete()
                            deleted_count += 1
                            logger.debug(f"Deleted old embed for server {server_id}")
                            await asyncio.sleep(0.5)  # Rate limit protection
                        except discord.NotFound:
                            not_found_count += 1
                            logger.debug(f"Old embed for server {server_id} already deleted")
                        except Exception as e:
                            logger.error(f"Failed to delete old embed for server {server_id}: {str(e)}")
                except Exception as e:
                    logger.error(f"Error processing old embed for server {server_id}: {str(e)}")

            logger.info(f"Deleted {deleted_count} old embeds, {not_found_count} already missing")

            # Update storage: new locations plus the old ones that were kept
            self.embed_locations = {**kept_embeds, **new_embeds}
            await self.save_embed_locations()

            return deleted_count, created_count

        except Exception as e:
            logger.error(f"Critical error during embed refresh: {str(e)}")
            raise
|
|
|
|
async def track_new_embed(self, server_id: str, message: discord.Message):
    """
    Record where a server's embed was posted and persist the record.

    Any existing entry for the same server is overwritten.

    Args:
        server_id: The server's Pterodactyl identifier
        message: Discord message containing the embed
    """
    channel_id = message.channel.id
    logger.debug(f"Tracking new embed for server {server_id} in channel {channel_id}")

    # IDs are stored as strings.
    location = dict(channel_id=str(channel_id), message_id=str(message.id))
    self.embed_locations[server_id] = location

    await self.save_embed_locations()
|
|
|
|
async def get_server_status_embed(self, server_data: dict, resources: dict) -> Tuple[discord.Embed, ServerStatusView]:
    """
    Create a status embed and view for a server.

    The embed shows the server name, power state, identifier and suspension
    status. CPU, memory, disk and network figures are added only while the
    server is running. Byte counts are divided by 1024**2 and labelled "MB".

    Args:
        server_data: Server information from Pterodactyl
        resources: Current resource usage data

    Returns:
        Tuple of (embed, view) objects ready for display
    """
    attributes = server_data.get('attributes', {})
    identifier = attributes.get('identifier', 'unknown')
    name = attributes.get('name', 'Unknown Server')
    # Use "or" rather than a .get() default so the placeholder also applies
    # when the key is present but empty or null.
    description = attributes.get('description') or 'No description available'
    logger.debug(f"Building status embed for server {name} ({identifier})")

    # Parse resource data
    resource_attributes = resources.get('attributes', {})
    # A null state is treated as offline instead of crashing on .title().
    current_state = (resource_attributes.get('current_state') or 'offline').title()
    is_running = current_state.lower() == "running"
    is_suspended = attributes.get('suspended', False)

    # Create embed with appropriate color based on status
    embed = discord.Embed(
        title=f"{name} - {current_state}",
        description=description,
        color=discord.Color.blue() if is_running else discord.Color.red(),
        timestamp=datetime.now()
    )

    embed.add_field(name="Server ID", value=identifier, inline=True)
    embed.add_field(
        name="Status",
        value="⛔ Suspended" if is_suspended else "✅ Active",
        inline=True
    )

    # Add resource usage if server is running
    if is_running:
        usage = resource_attributes.get('resources', {})
        bytes_per_mb = 1024 ** 2

        cpu_usage = round(usage.get('cpu_absolute', 0), 2)
        memory_usage = round(usage.get('memory_bytes', 0) / bytes_per_mb, 2)
        disk_usage = round(usage.get('disk_bytes', 0) / bytes_per_mb, 2)
        network_rx = round(usage.get('network_rx_bytes', 0) / bytes_per_mb, 2)
        network_tx = round(usage.get('network_tx_bytes', 0) / bytes_per_mb, 2)

        embed.add_field(name="CPU Usage", value=f"{cpu_usage}%", inline=True)
        embed.add_field(name="Memory Usage", value=f"{memory_usage} MB", inline=True)
        embed.add_field(name="Disk Usage", value=f"{disk_usage} MB", inline=True)
        embed.add_field(name="Network", value=f"⬇️ {network_rx} MB / ⬆️ {network_tx} MB", inline=False)

    embed.set_footer(text="Last updated")

    # Create interactive view with control buttons
    view = ServerStatusView(
        server_id=identifier,
        server_name=name,
        pterodactyl_api=self.pterodactyl_api,
        server_data=server_data
    )

    logger.debug(f"Successfully built status components for {name}")
    return embed, view
|
|
|
|
@tasks.loop(seconds=UPDATE_INTERVAL)
async def update_status(self):
    """
    Background task that refreshes tracked server status embeds.

    An embed is only edited when one of the following holds:
    1. No previous state is recorded for the server (first check)
    2. The power state differs from the recorded one
    3. The server is running and CPU usage moved by more than 50
       (absolute difference) from the recorded value

    Servers that do not meet any condition are skipped, which keeps the
    number of Discord message edits low. Recorded state is only
    refreshed after a successful embed edit, so the CPU comparison is
    always made against the value shown in the last posted embed.
    """
    logger.info("Starting optimized server status update cycle")
    async with self.update_lock:  # Ensure only one update runs at a time
        try:
            # Fetch current server list from Pterodactyl
            servers = await self.pterodactyl_api.get_servers()
            if not servers:
                logger.warning("No servers found in Pterodactyl panel during update")
                return

            # Update our local cache with fresh server data
            self.server_cache = {
                server['attributes']['identifier']: server for server in servers
            }
            logger.debug(f"Updated server cache with {len(servers)} servers")

            # Variables to track our update statistics
            update_count = 0   # Successful updates
            error_count = 0    # Failed updates
            missing_count = 0  # Missing embeds
            skipped_count = 0  # Servers that didn't need updates

            # Iterate over a snapshot: entries may be popped from
            # self.embed_locations while we loop.
            for server_id, location in list(self.embed_locations.items()):
                # Skip if server no longer exists in Pterodactyl
                if server_id not in self.server_cache:
                    logger.warning(f"Server {server_id} not found in cache, skipping update")
                    continue

                server_data = self.server_cache[server_id]
                server_name = server_data['attributes']['name']

                try:
                    logger.debug(f"Checking status for server {server_name} ({server_id})")

                    # Get current server resource usage
                    resources = await self.pterodactyl_api.get_server_resources(server_id)
                    attributes = resources.get('attributes', {})
                    current_state = attributes.get('current_state', 'offline')
                    cpu_usage = round(attributes.get('resources', {}).get('cpu_absolute', 0), 2)

                    # Retrieve previous recorded state and CPU usage
                    prev_state, prev_cpu = self.previous_states.get(server_id, (None, 0))

                    # DECISION LOGIC: Should we update the embed?
                    needs_update = False

                    # 1. First time we're seeing this server. This must be
                    #    tested before the state comparison, otherwise a
                    #    missing previous state is reported as a power
                    #    state change from None.
                    if prev_state is None:
                        logger.debug(f"First check for {server_name}, performing initial update")
                        needs_update = True

                    # 2. Power state changed
                    elif current_state != prev_state:
                        logger.debug(f"Power state changed for {server_name}: {prev_state} -> {current_state}")
                        needs_update = True

                    # 3. Significant CPU change (only if server is running)
                    elif current_state == 'running' and abs(cpu_usage - prev_cpu) > 50:
                        logger.debug(f"Significant CPU change for {server_name}: {prev_cpu}% -> {cpu_usage}%")
                        needs_update = True

                    # PERFORM UPDATE IF NEEDED
                    if needs_update:
                        # Generate fresh embed and view components
                        embed, view = await self.get_server_status_embed(server_data, resources)

                        # Get the channel where this server's embed lives
                        channel = self.get_channel(int(location['channel_id']))
                        if not channel:
                            logger.warning(f"Channel {location['channel_id']} not found for server {server_id}")
                            continue

                        # Fetch and update the existing message
                        message = await channel.fetch_message(int(location['message_id']))
                        await message.edit(embed=embed, view=view)
                        update_count += 1
                        logger.debug(f"Updated status for {server_name}")

                        # Update our state tracking with new values
                        self.previous_states[server_id] = (current_state, cpu_usage)
                    else:
                        # No significant changes detected
                        skipped_count += 1
                        logger.debug(f"No changes detected for {server_name}, skipping update")

                except discord.NotFound:
                    # Embed message was deleted - clean up our tracking
                    logger.warning(f"Embed for server {server_id} not found, removing from tracking")
                    self.embed_locations.pop(server_id, None)
                    self.previous_states.pop(server_id, None)  # Also clean up state tracking
                    missing_count += 1
                    await self.save_embed_locations()
                except Exception as e:
                    logger.error(f"Failed to update status for server {server_id}: {str(e)}")
                    error_count += 1

                # Small delay between servers to avoid rate limits
                await asyncio.sleep(0.5)

            # Log summary of this update cycle
            logger.info(
                f"Update cycle complete: "
                f"{update_count} updated, "
                f"{skipped_count} skipped, "
                f"{missing_count} missing, "
                f"{error_count} errors"
            )

        except Exception as e:
            # Task-level failure: include the traceback so the cause can be
            # diagnosed from the log file.
            logger.error(f"Error in update_status task: {str(e)}", exc_info=True)
            # If something went wrong, wait before retrying
            await asyncio.sleep(5)
|
|
|
|
@update_status.before_loop
async def before_update_status(self):
    """
    Prepare for the first run of the status update task.

    Waits until the bot is ready, then performs one full embed refresh.
    A failure of that initial refresh is logged instead of being raised,
    because an exception leaving this hook would prevent the update loop
    from running its first iteration.
    """
    logger.debug("Waiting for bot readiness before starting update task")
    await self.wait_until_ready()
    try:
        await self.refresh_all_embeds()
    except Exception as e:
        logger.error(
            f"Initial embed refresh failed, starting update task anyway: {str(e)}"
        )
|
|
|
|
@update_status.after_loop
async def after_update_status(self):
    """Log the reason the status update task stopped, if one is reported."""
    task = self.update_status

    # Cancellation takes precedence over the failure check.
    if task.is_being_cancelled():
        logger.info("Server status update task was cancelled")
        return

    if task.failed():
        logger.error("Server status update task failed")
|
|
|
|
async def close(self):
    """
    Cleanup when bot is shutting down.

    Saves the embed locations, closes the Pterodactyl API client if one
    is set, and finally closes the bot itself. Each cleanup step is
    isolated so that a failure in one step is logged and does not
    prevent the remaining steps from running.
    """
    logger.info("Bot shutdown initiated - performing cleanup")
    try:
        try:
            await self.save_embed_locations()
        except Exception as e:
            logger.error(f"Failed to save embed locations during shutdown: {str(e)}")

        try:
            if self.pterodactyl_api:
                await self.pterodactyl_api.close()
        except Exception as e:
            logger.error(f"Failed to close Pterodactyl API client during shutdown: {str(e)}")
    finally:
        # Always close the underlying bot, whatever happened above.
        await super().close()
|
|
|
|
# ==============================================
|
|
# DISCORD COMMANDS
|
|
# ==============================================
|
|
|
|
# Gateway intents: the library defaults plus message content.
intents = discord.Intents.default()
intents.message_content = True

# Module-level bot instance used by the command and event decorators.
bot = PterodactylBot(
    command_prefix="!",
    intents=intents,
)
|
|
|
|
async def check_allowed_guild(interaction: discord.Interaction) -> bool:
    """
    Verify that an interaction is coming from the allowed guild.

    When the interaction comes from any other guild, a warning is logged
    and an ephemeral rejection notice is sent, unless the interaction has
    already been responded to.

    Args:
        interaction: Discord interaction object

    Returns:
        bool: True if interaction is allowed, False otherwise
    """
    if interaction.guild_id == ALLOWED_GUILD_ID:
        return True

    logger.warning(f"Command attempted from unauthorized guild {interaction.guild_id} by {interaction.user.name}")

    # The global on_interaction handler sends the same notice for the same
    # interaction. An interaction can only be responded to once, so skip
    # the send if a response exists and tolerate losing the race.
    if not interaction.response.is_done():
        try:
            await interaction.response.send_message(
                "This bot is only available in a specific server.",
                ephemeral=True
            )
        except (discord.InteractionResponded, discord.HTTPException) as e:
            logger.debug(f"Could not send unauthorized guild notice: {str(e)}")
    return False
|
|
|
|
@bot.tree.command(name="server_status", description="Show the status of a game server")
@app_commands.describe(
    server_name="The name of the server to check",
    channel="The channel to post the status in (defaults to current channel)"
)
async def server_status(
    interaction: discord.Interaction,
    server_name: str,
    channel: Optional[discord.TextChannel] = None
):
    """
    Slash command to display a server's status embed.

    Looks the server up by name (case-insensitive) in the bot's server
    cache, deletes the previously tracked embed for it if one exists, and
    posts a fresh status embed in the target channel.

    Args:
        interaction: Discord interaction that invoked the command
        server_name: Name of the server to show
        channel: Channel to post in; the invoking channel when omitted
    """
    # Check if interaction is from allowed guild
    if not await check_allowed_guild(interaction):
        return

    logger.info(f"Server status command invoked by {interaction.user.name} for '{server_name}'")
    # Defer ephemerally: the visibility of the reply that resolves a
    # deferred interaction is decided by the defer call, so a plain
    # defer() would make the confirmation/error replies below public even
    # though they are sent with ephemeral=True.
    await interaction.response.defer(ephemeral=True)

    # Use specified channel or current channel
    target_channel = channel or interaction.channel
    logger.debug(f"Target channel: {target_channel.name} ({target_channel.id})")

    # Find the server by name in cache
    logger.debug(f"Searching cache for server '{server_name}'")
    wanted_name = server_name.lower()
    server_id, server_data = next(
        (
            (s_id, s_data)
            for s_id, s_data in bot.server_cache.items()
            if s_data['attributes']['name'].lower() == wanted_name
        ),
        (None, None),
    )

    if not server_data:
        logger.warning(f"Server '{server_name}' not found in cache")
        await interaction.followup.send(f"Server '{server_name}' not found.", ephemeral=True)
        return

    logger.info(f"Found server {server_name} ({server_id}), fetching resources")

    try:
        # Get current server status
        resources = await bot.pterodactyl_api.get_server_resources(server_id)

        # Delete old embed if it exists (best effort: failures are logged)
        if server_id in bot.embed_locations:
            logger.debug(f"Found existing embed for {server_id}, attempting to delete")
            try:
                old_location = bot.embed_locations[server_id]
                old_channel = bot.get_channel(int(old_location['channel_id']))
                if old_channel:
                    try:
                        old_message = await old_channel.fetch_message(int(old_location['message_id']))
                        await old_message.delete()
                        logger.debug(f"Deleted old embed for {server_id}")
                    except discord.NotFound:
                        logger.debug(f"Old embed for {server_id} already deleted")
            except Exception as e:
                logger.error(f"Failed to delete old embed: {str(e)}")

        # Create and send new embed
        logger.info(f"Creating new embed for {server_name} in {target_channel.name}")
        embed, view = await bot.get_server_status_embed(server_data, resources)
        message = await target_channel.send(embed=embed, view=view)
        await bot.track_new_embed(server_id, message)
    except Exception as e:
        # Without this the deferred interaction would never receive a
        # reply when the panel request or the channel send fails.
        logger.error(f"Failed to post status for {server_name}: {str(e)}")
        await interaction.followup.send(
            f"Failed to post server status: {str(e)}",
            ephemeral=True
        )
        return

    await interaction.followup.send(
        f"Server status posted in {target_channel.mention}",
        ephemeral=True
    )
    logger.info(f"Successfully posted status for {server_name}")
|
|
|
|
@bot.tree.command(name="refresh_embeds", description="Refresh all server status embeds (admin only)")
async def refresh_embeds(interaction: discord.Interaction):
    """
    Slash command that rebuilds every tracked server status embed.

    Only usable from the allowed guild and by members holding the
    administrator permission. The outcome (counts of deleted and created
    embeds, or the failure reason) is reported back ephemerally.
    """
    guild_ok = await check_allowed_guild(interaction)
    if not guild_ok:
        return

    invoker = interaction.user
    logger.info(f"Refresh embeds command invoked by {invoker.name}")
    await interaction.response.defer(ephemeral=True)

    # Require administrator permissions
    is_admin = invoker.guild_permissions.administrator
    if not is_admin:
        logger.warning(f"Unauthorized refresh attempt by {invoker.name}")
        await interaction.followup.send(
            "You need administrator permissions to refresh all embeds.",
            ephemeral=True,
        )
        return

    try:
        logger.info("Starting full embed refresh per admin request")
        removed, posted = await bot.refresh_all_embeds()
        summary = f"Refreshed all embeds. Deleted {removed} old embeds, created {posted} new ones."
        await interaction.followup.send(summary, ephemeral=True)
        logger.info(f"Embed refresh completed: {removed} deleted, {posted} created")
    except Exception as e:
        reason = str(e)
        logger.error(f"Embed refresh failed: {reason}")
        await interaction.followup.send(
            f"Failed to refresh embeds: {reason}",
            ephemeral=True,
        )
|
|
|
|
# ==============================================
|
|
# BOT EVENTS
|
|
# ==============================================
|
|
|
|
@bot.event
async def on_interaction(interaction: discord.Interaction):
    """
    Global interaction handler that rejects interactions from other guilds.

    Interactions from the allowed guild are left untouched. For any other
    guild an ephemeral rejection notice is sent, unless the interaction
    has already been responded to.
    """
    if interaction.guild_id == ALLOWED_GUILD_ID:
        return

    logger.debug(f"Ignoring interaction from unauthorized guild {interaction.guild_id}")

    # An interaction can only be responded to once. A command-level guild
    # check may answer the same interaction, so skip the send when a
    # response already exists and tolerate losing the race.
    if interaction.response.is_done():
        return
    try:
        await interaction.response.send_message(
            "This bot is only available in a specific server.",
            ephemeral=True
        )
    except (discord.InteractionResponded, discord.HTTPException) as e:
        logger.debug(f"Could not send unauthorized guild notice: {str(e)}")
|
|
|
|
@bot.event
async def on_ready():
    """
    Called when the bot successfully connects to Discord.

    Syncs the application commands to the allowed guild. The ready event
    can be dispatched more than once during a process lifetime, so the
    sync is performed only until it has succeeded once; a failed sync is
    retried on the next ready event.
    """
    logger.info(f"Bot connected as {bot.user.name} (ID: {bot.user.id})")

    if getattr(bot, "_guild_commands_synced", False):
        logger.debug("Commands already synced, skipping sync")
        return

    try:
        # Sync commands only to the allowed guild
        guild = discord.Object(id=ALLOWED_GUILD_ID)
        bot.tree.copy_global_to(guild=guild)
        synced = await bot.tree.sync(guild=guild)
        # Mark as done only after a successful sync.
        bot._guild_commands_synced = True
        logger.info(f"Successfully synced {len(synced)} command(s) to guild {ALLOWED_GUILD_ID}: {[cmd.name for cmd in synced]}")
    except Exception as e:
        logger.error(f"Command sync failed: {str(e)}")
|
|
|
|
# ==============================================
|
|
# SYSTEM SIGNAL HANDLERS
|
|
# ==============================================
|
|
|
|
def handle_sigint(signum: int, frame: Optional[types.FrameType]) -> None:
    """
    Handle SIGINT signals (Ctrl+C) by initiating graceful shutdown.

    Args:
        signum: The signal number (signal.SIGINT)
        frame: Current stack frame, or None (unused but required by the
            signal handler signature)

    Raises:
        KeyboardInterrupt: Always, after logging the received signal.
    """
    logger.info("Received SIGINT (Ctrl+C), initiating graceful shutdown...")
    raise KeyboardInterrupt
|
|
|
|
def handle_sigterm(signum: int, frame: Optional[types.FrameType]) -> None:
    """
    Handle SIGTERM signals (container stop) by initiating graceful shutdown.

    Args:
        signum: The signal number (signal.SIGTERM)
        frame: Current stack frame, or None (unused but required by the
            signal handler signature)

    Raises:
        KeyboardInterrupt: Always, after logging the received signal.
    """
    logger.info("Received SIGTERM (container stop), initiating graceful shutdown...")
    raise KeyboardInterrupt
|
|
|
|
# ==============================================
|
|
# BOT STARTUP
|
|
# ==============================================
|
|
|
|
if __name__ == "__main__":
    # Main entry point for the bot application.
    #
    # Flow:
    # 1. Register SIGINT/SIGTERM handlers for graceful shutdown
    # 2. Start the bot with the Discord token
    # 3. Handle interrupts and unexpected exceptions
    # 4. Exit with 0 on a clean shutdown, 1 after a crash
    logger.info("Starting bot initialization")

    # Register signal handlers
    signal.signal(signal.SIGINT, handle_sigint)    # For Ctrl+C
    signal.signal(signal.SIGTERM, handle_sigterm)  # For container stop commands
    logger.info("System signal handlers registered")

    # The exit status is decided here and applied once after the try
    # block. Calling sys.exit() inside `finally` would replace the exit
    # status chosen by the crash handler, so a crash would exit with 0.
    exit_code = 0
    try:
        bot.run(DISCORD_TOKEN)
    except KeyboardInterrupt:
        logger.info("Bot shutting down")
    except Exception as e:
        logger.error(f"Bot crashed: {str(e)}")
        exit_code = 1  # Exit with error code for crash
    finally:
        logger.info("Bot shutdown complete")

    sys.exit(exit_code)
|