Files
pterodactyl-discord-bot/server_metrics_graphs.py
Eaven Kimura ce77639a47
All checks were successful
Docker Build and Push (Multi-architecture) / build-and-push (push) Successful in 33s
Add: Dynamic graph scaling for multi vCPU
2025-09-29 04:07:33 +00:00

472 lines
19 KiB
Python

"""
Server Metrics Graphs Module for Pterodactyl Discord Bot
This module provides graphing capabilities for server CPU and memory usage.
Generates line graphs as PNG images for embedding in Discord messages.
"""
import io
import logging
import math
from collections import deque
from datetime import datetime, timedelta
from typing import Any, Dict, Optional, Tuple

import matplotlib
matplotlib.use('Agg') # Use non-interactive backend for server environments
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
# Get the logger from the main bot module
logger = logging.getLogger('pterodisbot')
class ServerMetricsGraphs:
    """
    Manages CPU and memory usage graphs for an individual server.

    Features:
    - Stores the last MAX_DATA_POINTS samples (6, i.e. roughly 1 minute of
      history at the assumed 10-second sampling interval)
    - Generates PNG images of line graphs for Discord embedding
    - Automatic data rotation (FIFO queue capped at MAX_DATA_POINTS)
    - Separate tracking for CPU percentage and memory MB usage
    - Dynamic CPU scaling in 100% increments for multi-vCPU servers
    - Clean graph styling optimized for Discord dark theme
    """

    # Number of samples retained per server before the oldest is dropped.
    MAX_DATA_POINTS = 6
    # Sampling interval assumed when reporting the covered time span.
    SAMPLE_INTERVAL_SECONDS = 10
    # Absolute change (percentage points) between the oldest and newest
    # sample above which the CPU trend is no longer reported as 'stable'.
    CPU_TREND_THRESHOLD = 5
    # Same idea for memory, expressed in megabytes.
    MEMORY_TREND_THRESHOLD_MB = 50

    # Palette shared by every graph.
    _FIGURE_BG = '#2f3136'  # Discord dark theme background
    _AXES_BG = '#36393f'  # Slightly lighter for graph area
    _CPU_COLOR = '#7289da'
    _MEMORY_COLOR = '#43b581'
    _TEXT_COLOR = '#ffffff'

    def __init__(self, server_id: str, server_name: str):
        """
        Initialize metrics tracking for a server.

        Args:
            server_id: Pterodactyl server identifier
            server_name: Human-readable server name
        """
        self.server_id = server_id
        self.server_name = server_name
        # Bounded deque gives automatic FIFO rotation.
        # Each entry is a tuple: (timestamp, cpu_percent, memory_mb)
        self.data_points = deque(maxlen=self.MAX_DATA_POINTS)
        # Track if we have enough data for meaningful graphs (at least 2 points)
        self.has_sufficient_data = False
        logger.debug(f"Initialized metrics tracking for server {server_name} ({server_id})")

    def add_data_point(self, cpu_percent: float, memory_mb: float, timestamp: Optional[datetime] = None):
        """
        Add a new data point to the metrics history.

        Args:
            cpu_percent: Current CPU usage percentage
            memory_mb: Current memory usage in megabytes
            timestamp: Optional timestamp, defaults to current time
        """
        if timestamp is None:
            timestamp = datetime.now()
        # The deque drops the oldest entry by itself once it is full.
        self.data_points.append((timestamp, cpu_percent, memory_mb))
        self.has_sufficient_data = len(self.data_points) >= 2
        logger.debug(f"Added metrics data point for {self.server_name}: CPU={cpu_percent}%, Memory={memory_mb}MB")

    def _calculate_cpu_scale_limit(self, max_cpu_value: float) -> int:
        """
        Calculate appropriate CPU scale limit in 100% increments.

        Args:
            max_cpu_value: Maximum CPU value in the dataset

        Returns:
            Scale limit rounded up to nearest 100% increment (never below 100)
        """
        if max_cpu_value <= 100:
            return 100
        # e.g., 150% -> 200%, 250% -> 300%, 350% -> 400%
        return math.ceil(max_cpu_value / 100) * 100

    @staticmethod
    def _memory_axis_limit(memory_values: list) -> float:
        """Return the memory y-axis upper bound: peak plus 10% padding.

        Falls back to 1 when the peak is not positive so the axis never
        collapses to the degenerate range (0, 0).
        """
        peak = max(memory_values)
        return peak * 1.1 if peak > 0 else 1.0

    @staticmethod
    def _classify_trend(change: float, threshold: float) -> str:
        """Map a first-to-last delta onto 'increasing', 'decreasing' or 'stable'."""
        if abs(change) > threshold:
            return 'increasing' if change > 0 else 'decreasing'
        return 'stable'

    def _series(self) -> Tuple[list, list, list]:
        """Return (timestamps, cpu_values, memory_values) from one snapshot of the history."""
        points = list(self.data_points)
        return (
            [point[0] for point in points],
            [point[1] for point in points],
            [point[2] for point in points],
        )

    def _build_title(self, label: str, cpu_scale_limit: int) -> str:
        """Compose a graph title, appending a vCPU estimate when the CPU scale exceeds 100%."""
        title = f'{self.server_name} - {label}'
        if cpu_scale_limit > 100:
            estimated_vcpus = cpu_scale_limit // 100
            title += f' (~{estimated_vcpus} vCPU cores)'
        return title

    def _draw_series(self, ax, timestamps: list, values: list, color: str, ylabel: str, y_max: float) -> None:
        """Plot one metric as a filled line on ``ax`` with a 0..y_max y-axis."""
        ax.set_facecolor(self._AXES_BG)
        ax.plot(timestamps, values, color=color, linewidth=2.5, marker='o', markersize=4)
        ax.fill_between(timestamps, values, alpha=0.3, color=color)
        ax.set_ylabel(ylabel, color=self._TEXT_COLOR, fontsize=10)
        ax.set_ylim(0, y_max)

    def _draw_cpu_guides(self, ax, cpu_scale_limit: int) -> None:
        """Add dashed horizontal guide lines at every 100% step up to the scale limit."""
        for level in range(100, cpu_scale_limit + 1, 100):
            ax.axhline(y=level, color=self._TEXT_COLOR, alpha=0.2, linestyle='--', linewidth=0.8)

    def _format_time_axis(self, ax) -> None:
        """Label the x-axis with HH:MM:SS ticks every 20 seconds, rotated for readability."""
        ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))
        ax.xaxis.set_major_locator(mdates.SecondLocator(interval=20))
        plt.setp(ax.xaxis.get_majorticklabels(), rotation=45, ha='right',
                 color=self._TEXT_COLOR, fontsize=8)

    def _style_axes(self, ax) -> None:
        """Apply the shared tick, grid and spine styling."""
        ax.tick_params(colors=self._TEXT_COLOR, labelsize=8)
        ax.grid(True, alpha=0.3, color=self._TEXT_COLOR)
        ax.spines['bottom'].set_color(self._TEXT_COLOR)
        ax.spines['left'].set_color(self._TEXT_COLOR)
        ax.spines['top'].set_visible(False)
        ax.spines['right'].set_visible(False)

    def _render_png(self, fig) -> io.BytesIO:
        """Serialize ``fig`` to an in-memory PNG, rewound to the start."""
        # Tight layout to prevent label cutoff
        fig.tight_layout()
        img_buffer = io.BytesIO()
        # Save through the figure itself rather than pyplot's "current
        # figure" so the right figure is always the one written out.
        fig.savefig(img_buffer, format='png', facecolor=self._FIGURE_BG, edgecolor='none',
                    bbox_inches='tight', dpi=100)
        img_buffer.seek(0)
        return img_buffer

    def generate_cpu_graph(self) -> Optional[io.BytesIO]:
        """
        Generate a CPU usage line graph as a PNG image.

        Returns:
            BytesIO object containing PNG image data, or None if there is
            insufficient data or rendering fails (failures are logged).
        """
        if not self.has_sufficient_data:
            logger.debug(f"Insufficient data for CPU graph generation: {self.server_name}")
            return None
        fig = None
        try:
            timestamps, cpu_values, _ = self._series()
            cpu_scale_limit = self._calculate_cpu_scale_limit(max(cpu_values))
            # style.context restores rcParams on exit, so the dark theme does
            # not leak into other matplotlib users in the same process.
            with plt.style.context('dark_background'):
                fig, ax = plt.subplots(figsize=(8, 4), dpi=100)
                fig.patch.set_facecolor(self._FIGURE_BG)
                self._draw_series(ax, timestamps, cpu_values, self._CPU_COLOR,
                                  'CPU Usage (%)', cpu_scale_limit)
                self._draw_cpu_guides(ax, cpu_scale_limit)
                self._format_time_axis(ax)
                self._style_axes(ax)
                ax.set_title(self._build_title('CPU Usage', cpu_scale_limit),
                             color=self._TEXT_COLOR, fontsize=12, pad=20)
                img_buffer = self._render_png(fig)
            logger.debug(f"Generated CPU graph for {self.server_name} (scale: 0-{cpu_scale_limit}%)")
            return img_buffer
        except Exception as e:
            logger.exception(f"Failed to generate CPU graph for {self.server_name}: {e}")
            return None
        finally:
            # Close only the figure created here; closing 'all' would also
            # destroy figures that other code may still be rendering.
            if fig is not None:
                plt.close(fig)

    def generate_memory_graph(self) -> Optional[io.BytesIO]:
        """
        Generate a memory usage line graph as a PNG image.

        Returns:
            BytesIO object containing PNG image data, or None if there is
            insufficient data or rendering fails (failures are logged).
        """
        if not self.has_sufficient_data:
            logger.debug(f"Insufficient data for memory graph generation: {self.server_name}")
            return None
        fig = None
        try:
            timestamps, _, memory_values = self._series()
            memory_limit = self._memory_axis_limit(memory_values)
            with plt.style.context('dark_background'):
                fig, ax = plt.subplots(figsize=(8, 4), dpi=100)
                fig.patch.set_facecolor(self._FIGURE_BG)
                self._draw_series(ax, timestamps, memory_values, self._MEMORY_COLOR,
                                  'Memory Usage (MB)', memory_limit)
                self._format_time_axis(ax)
                self._style_axes(ax)
                ax.set_title(f'{self.server_name} - Memory Usage',
                             color=self._TEXT_COLOR, fontsize=12, pad=20)
                img_buffer = self._render_png(fig)
            logger.debug(f"Generated memory graph for {self.server_name}")
            return img_buffer
        except Exception as e:
            logger.exception(f"Failed to generate memory graph for {self.server_name}: {e}")
            return None
        finally:
            if fig is not None:
                plt.close(fig)

    def generate_combined_graph(self) -> Optional[io.BytesIO]:
        """
        Generate a combined CPU and memory usage graph as a PNG image.

        The CPU plot is stacked above the memory plot and both share the
        same time axis.

        Returns:
            BytesIO object containing PNG image data, or None if there is
            insufficient data or rendering fails (failures are logged).
        """
        if not self.has_sufficient_data:
            logger.debug(f"Insufficient data for combined graph generation: {self.server_name}")
            return None
        fig = None
        try:
            timestamps, cpu_values, memory_values = self._series()
            cpu_scale_limit = self._calculate_cpu_scale_limit(max(cpu_values))
            memory_limit = self._memory_axis_limit(memory_values)
            with plt.style.context('dark_background'):
                fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(8, 6), dpi=100, sharex=True)
                fig.patch.set_facecolor(self._FIGURE_BG)
                # CPU subplot
                self._draw_series(ax1, timestamps, cpu_values, self._CPU_COLOR,
                                  'CPU Usage (%)', cpu_scale_limit)
                self._draw_cpu_guides(ax1, cpu_scale_limit)
                ax1.set_title(self._build_title('Resource Usage', cpu_scale_limit),
                              color=self._TEXT_COLOR, fontsize=12)
                # Memory subplot
                self._draw_series(ax2, timestamps, memory_values, self._MEMORY_COLOR,
                                  'Memory (MB)', memory_limit)
                # Format time axis (only on bottom subplot; the x-axis is shared)
                self._format_time_axis(ax2)
                for ax in (ax1, ax2):
                    self._style_axes(ax)
                img_buffer = self._render_png(fig)
            logger.debug(f"Generated combined graph for {self.server_name} (CPU scale: 0-{cpu_scale_limit}%)")
            return img_buffer
        except Exception as e:
            logger.exception(f"Failed to generate combined graph for {self.server_name}: {e}")
            return None
        finally:
            if fig is not None:
                plt.close(fig)

    def get_data_summary(self) -> Dict[str, Any]:
        """
        Get summary statistics for the current data points.

        The returned dictionary always has the same keys, whether or not any
        data has been recorded, so callers can index it unconditionally.

        Returns:
            Dictionary with:
            - point_count: number of stored samples
            - has_data: True once at least two samples are stored
            - latest_cpu / latest_memory: newest sample values (0 when empty)
            - cpu_trend / memory_trend: 'increasing', 'decreasing' or 'stable',
              comparing the oldest and newest stored samples
            - cpu_scale_limit: CPU axis limit in 100% increments
            - estimated_vcpus: cpu_scale_limit // 100
            - time_span_minutes: point_count * SAMPLE_INTERVAL_SECONDS / 60
        """
        points = list(self.data_points)
        if not points:
            return {
                'point_count': 0,
                'has_data': False,
                'latest_cpu': 0,
                'latest_memory': 0,
                'cpu_trend': 'stable',
                'memory_trend': 'stable',
                'cpu_scale_limit': 100,
                'estimated_vcpus': 1,
                'time_span_minutes': 0.0,
            }

        _, first_cpu, first_memory = points[0]
        _, latest_cpu, latest_memory = points[-1]
        cpu_scale_limit = self._calculate_cpu_scale_limit(max(point[1] for point in points))

        # With a single sample first == latest, so both deltas are zero and
        # the trends come out as 'stable'.
        cpu_trend = self._classify_trend(latest_cpu - first_cpu, self.CPU_TREND_THRESHOLD)
        memory_trend = self._classify_trend(latest_memory - first_memory,
                                            self.MEMORY_TREND_THRESHOLD_MB)

        return {
            'point_count': len(points),
            'has_data': self.has_sufficient_data,
            'latest_cpu': latest_cpu,
            'latest_memory': latest_memory,
            'cpu_trend': cpu_trend,
            'memory_trend': memory_trend,
            'cpu_scale_limit': cpu_scale_limit,
            'estimated_vcpus': cpu_scale_limit // 100,
            'time_span_minutes': len(points) * self.SAMPLE_INTERVAL_SECONDS / 60,
        }
class ServerMetricsManager:
    """
    Global manager for all server metrics graphs.

    Handles:
    - Creation and cleanup of ServerMetricsGraphs instances
    - Bulk operations across all tracked servers
    - Memory management for graph storage
    """

    def __init__(self):
        """Initialize the metrics manager with no tracked servers."""
        # Maps server_id -> the ServerMetricsGraphs tracking that server.
        self.server_graphs: Dict[str, "ServerMetricsGraphs"] = {}
        logger.info("Initialized ServerMetricsManager")

    def get_or_create_server_graphs(self, server_id: str, server_name: str) -> "ServerMetricsGraphs":
        """
        Get existing ServerMetricsGraphs instance or create a new one.

        If the server is already tracked under a different name, the stored
        name is refreshed so later graph titles show the current name.

        Args:
            server_id: Pterodactyl server identifier
            server_name: Human-readable server name

        Returns:
            ServerMetricsGraphs instance for the specified server
        """
        graphs = self.server_graphs.get(server_id)
        if graphs is None:
            graphs = ServerMetricsGraphs(server_id, server_name)
            self.server_graphs[server_id] = graphs
            logger.debug(f"Created new metrics graphs for server {server_name}")
        elif server_name and graphs.server_name != server_name:
            # Server was renamed since tracking began; keep the history.
            graphs.server_name = server_name
        return graphs

    def add_server_data(self, server_id: str, server_name: str, cpu_percent: float, memory_mb: float):
        """
        Add data point to a server's metrics tracking.

        Tracking for the server is created on first use.

        Args:
            server_id: Pterodactyl server identifier
            server_name: Human-readable server name
            cpu_percent: Current CPU usage percentage
            memory_mb: Current memory usage in megabytes
        """
        self.get_or_create_server_graphs(server_id, server_name).add_data_point(cpu_percent, memory_mb)

    def remove_server(self, server_id: str):
        """
        Remove a server from metrics tracking.

        Unknown identifiers are ignored.

        Args:
            server_id: Pterodactyl server identifier to remove
        """
        if self.server_graphs.pop(server_id, None) is not None:
            logger.debug(f"Removed metrics tracking for server {server_id}")

    def get_server_graphs(self, server_id: str) -> Optional["ServerMetricsGraphs"]:
        """
        Get ServerMetricsGraphs instance for a specific server.

        Args:
            server_id: Pterodactyl server identifier

        Returns:
            ServerMetricsGraphs instance or None if not found
        """
        return self.server_graphs.get(server_id)

    def cleanup_old_servers(self, active_server_ids: list):
        """
        Remove tracking for servers that no longer exist.

        Args:
            active_server_ids: Currently active server IDs. Any iterable of
                IDs is accepted; it is consumed exactly once.
        """
        # Build the lookup once: constant-time membership tests, and safe for
        # one-shot iterables such as generators.
        active_ids = set(active_server_ids)
        # Collect first; the dict must not change size while being iterated.
        servers_to_remove = [
            server_id for server_id in self.server_graphs if server_id not in active_ids
        ]
        for server_id in servers_to_remove:
            self.remove_server(server_id)
        if servers_to_remove:
            logger.info(f"Cleaned up metrics for {len(servers_to_remove)} inactive servers")

    def get_summary(self) -> Dict[str, Any]:
        """
        Get summary of all tracked servers.

        Returns:
            Dictionary with:
            - total_servers: number of tracked servers
            - servers_with_data: servers whose has_sufficient_data flag is set
            - total_data_points: stored samples summed over all servers
        """
        tracked = list(self.server_graphs.values())
        return {
            'total_servers': len(tracked),
            'servers_with_data': sum(1 for graphs in tracked if graphs.has_sufficient_data),
            'total_data_points': sum(len(graphs.data_points) for graphs in tracked),
        }